Streams 分组

grouped

通过grouped函数可将上游发送的元素按指定个数分组,这在很场景下可作为一个优化策略。比如:批量写入数据库。

"grouped" in {
  val list = Source.fromIterator(() => Iterator.from(0)).grouped(100).take(2).runWith(Sink.seq).futureValue
  list should be(Vector(0 until 100, 100 until 200))
}

groupedWithin

若上游长时间没有元素被发送,很有可能下游将被永久的挂在那里,形成一种假死的状态。这时可通过groupedWithin函数传递一个超时时间,它将在指定的分组数量或超时时间两者之一达到时形成一个分组并将批量数据传给下游。

"groupedWithin" in {
  val f = Source
    .fromIterator(() => Iterator.from(0))
    .throttle(5, 500.millis)
    .groupedWithin(100, 1.seconds)
    .take(2)
    .runWith(Sink.seq)
  val list = Await.result(f, 3.seconds)
  list should not be Vector(0 until 100, 100 until 200)
}
在此文档中发现错误?该页面的源代码可以在 这里 找到。欢迎随时编辑并提交Pull Request。