物化与物化值

Materializer(物化器)

物化器负责将流蓝图(RunnableGraph)转换成可运行的流(通过在内部创建actor来异步执行)。通常来说首选系统范围的物化器,不需要我们手动创建它。

有两种方式来选择系统范围的物化器:

  1. 通过隐式转换函数从ActorSystem获取,akka.actor.typed.ActorSystemakka.actor.ActorSystem都支持。

    implicit val system: ActorSystem[Nothing]
    
  2. 通过SystemMaterializerAkka扩展。

    // untyped ActorSystem
    SystemMaterializer(system).materializer
    // typed ActorSystem
    import akka.actor.typed.scaladsl.adapter._
    SystemMaterializer(system.toClassic).materializer 
    

Materialized value(物化值)

当一个可运行图(RunnableGraph)被执行(run),它将返回一个值(结果)。而这个值被称为 Materialized Value (物化值)。物化值可以是Sink处理后的结果,也可以是Source内部记录的统计数据……。

Source[OUT, Mat]
Flow[IN, OUT, Mat]
Sink[IN, Mat]

这里的OUTIN是在流处理过程中流通的每个数据元素的类型,整个流处结束后最终返回的结果是从SourceFlowSink各部分的Mat参数类型里选择的。SourceFlowSink每个的最后一个类型参数Mat就是物化值。带Mat后缀的操作函数提供了combine函数参数来选择要保留的物化值,如:viaMattoMat

Source的物化值一般用于记录数据源的内部状态;Flow的物化值通常都会将上游的物化值向下传递(Keep.left),也可以调用其它Keep函数里自定义向下传递的物化值;Sink的物化值通常用于汇总上游发送的数据。

def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(
      combine: (Mat, Mat2) => Mat3): Source[T, Mat3]
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(
      combine: (Mat, Mat2) => Mat3): RunnableGraph[Mat3]

Keep

object Keep定义了4个便捷函数来选择保留图执行过程的哪个物化值,通常我们通过选择其中一个来作为调用viaMattoMat函数时传递给combine参数的值。

object Keep {
  private val _left = (l: Any, _: Any) => l
  private val _right = (_: Any, r: Any) => r
  private val _both = (l: Any, r: Any) => (l, r)
  private val _none = (_: Any, _: Any) => NotUsed

  def left[L, R]: (L, R) => L = _left.asInstanceOf[(L, R) => L]
  def right[L, R]: (L, R) => R = _right.asInstanceOf[(L, R) => R]
  def both[L, R]: (L, R) => (L, R) = _both.asInstanceOf[(L, R) => (L, R)]
  def none[L, R]: (L, R) => NotUsed = _none.asInstanceOf[(L, R) => NotUsed]
}

下面代码,source的物化值保留了已读取文件的字符数,sink的物化值保存了实际入写文件的字节数。

val source: Source[ByteString, Future[IOResult]] =
  FileIO.fromPath(Paths.get("/tmp/file.txt"))
val sink: Sink[ByteString, Future[IOResult]] =
  FileIO.toPath(Paths.get("/tmp/file2.txt"))
val graph: Source[ByteString, Future[IOResult]] = source // (1)
  .via(Framing.delimiter(ByteString("\n"), 8192))
  .filterNot(_.isEmpty)

val readsF: Future[IOResult] = // (2)
  graph.toMat(sink)(Keep.left).run() // Same: graph.to(sink).run()
val writesF: Future[IOResult] = // (3)
  graph.toMat(sink)(Keep.right).run() // Same: graph.runWith(sink)
val (leftF, rightF) = graph.toMat(sink)(Keep.both).run() // (4)
val notUsed: NotUsed = graph.toMat(sink)(Keep.none).run() // (5)
  1. Source上的via和所有转换操作(不带Mat的)默认都会保留 左边 的物化值,这样source的物化值(记录从文件已读取字节数)就作为graph的物化值被传递;
  2. graph被调用方作为 左边 ,参数sink作为 右边Keep.left保留左边的物化值:记录从文件已读取字节数;
  3. graph被调用方作为 左边 ,参数sink作为 右边Keep.right保留右边的物化值:记录已写入文件字节数;
  4. 通过Keep.both同时保留两边的物化值,作为一个 tuple 被返回;
  5. 不保留任何物化值。只在不需要关心流的任务结束和状态时使用。
在此文档中发现错误?该页面的源代码可以在 这里 找到。欢迎随时编辑并提交Pull Request。