深入 Source

Source

Source是一组流(Streams)处理步骤,有一个打开的输出端口。Source可以包含任意数量已连接的内部源(Source)和转换操作(Flow)。Source可以通过asPublisher函数转换为 Reactive Streams 协议等价的Publisher

final class Source[+Out, +Mat](
    override val traversalBuilder: LinearTraversalBuilder,
    override val shape: SourceShape[Out])
    extends FlowOpsMat[Out, Mat]
    with Graph[SourceShape[Out], Mat]

Source的类签名有两个类型参数,它们都是协变的。

  • Out:输出元素类型
  • Mat:物化值类型,物化值可用于记录Source的内部状态或操作记录等。比如:FileIO.fromPath这个Source的物化值记录了从文件里实际读取到的字符数。

Source通过类构造器实现了Graph接口的traversalBuildershape两个参数,其中shape限制了必须为一个SourceShape[Out]类型。

Source还实现了FlowOpsMat特质,使得Source具有了一系列的viato(及它们的变体)函数和丰富和流程转换函数(Flow操作符)。

  • via:用于连接Flow,它将一个流处理过程与当前Source连接,并返回另一个Source,其中Flow的输出端口将作为新Source的输出端口
  • viaMat:相对via多了第二个curry参数,combine指定保留哪边的物化值。via实际上相当于:viaMat(....)(Keep.left)

    def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(
          combine: (Mat, Mat2) => Mat3): Source[T, Mat3]
    
  • to:用于连接SinkSink将从上游发送的元素都聚合到一起并处理。当一个Source连接了Sink后,即形成了一个已闭合的可运行图( RunnableGraph ),我们可以调用RunnableGraphrun函数来实际运行它。
  • toMat:相对to多了第二个curry参数,combine指定保留哪边的物化值。to实际上相当于:toMat(....)(Keep.left)

    toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(
          combine: (Mat, Mat2) => Mat3): RunnableGraph[Mat3]
    

有关combine函数的更多内容请见: 物化与物化值#Keep

通常我们都不会直接构造Source,而是通过Source的伴身对象提供了各类工具函数来创建。有关Source伴身对象的常用工具函数请参阅: 创建 Source 的常用函数

SourceShape

final case class SourceShape[+T](out: Outlet[T @uncheckedVariance]) extends Shape {
  override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq
  override val outlets: immutable.Seq[Outlet[_]] = out :: Nil

  override def deepCopy(): SourceShape[T] = SourceShape(out.carbonCopy())
}

SourceShape使用了final做修饰,这就确定了Source的形状(shape)只能为SourceShape,而且限制为没有输入端口,只有一个输出端口的形状;唯一可定义的地方就是它的输出端口发送出数据的类型。

Source 怎么生产数据?

Source是怎么生产数据并发送到下游的呢?是在GraphStageLogic里调用push函数将数据推送到下游的。GraphStageLogic用于定义图处理实际逻辑,它需要通过GraphStageWithMaterializedValue抽像类提供的方法创建,而这人抽像类继承了Graph特质。

通常我们不会直接使用Graph来构建自己的图结构,而是会使用GraphStageWithMaterializedValue(或它的某个子类,接下来统称它们为 GraphStage )。 GraphStage 是一个可重复使用的流处理操作图(a reusable graph stream processing operator),常用的 GraphStage 有两个:

  • GraphStageWithMaterializedValue:有物化值的操作图,这样的图构造的 Source 签名类似:Source[Out, Mat]
  • GraphStage:不需要物化值的操作图,这样的图构造出的 Source 签名类似:Source[Out, NotUsed]
abstract class GraphStageWithMaterializedValue[+S <: Shape, +M]
    extends Graph[S, M] {
  @throws(classOf[Exception])
  def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M)

  protected def initialAttributes: Attributes = Attributes.none
}

abstract class GraphStage[S <: Shape] 
    extends GraphStageWithMaterializedValue[S, NotUsed] {
  final override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, NotUsed) =
    (createLogic(inheritedAttributes), NotUsed)

  @throws(classOf[Exception])
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic
}

GraphStageWithMaterializedValue有一个抽像方法待实现,它返回图处理逻辑和物化值的元组,类型为:(GraphStageLogic, M)GraphStage继承了GraphStageWithMaterializedValue,它实现了createLogicAndMaterializedValue方法并将物化值固定为NotUsed,同时提供createLogic供实现类创建图处理逻辑。

回到GraphStageLogicpush函数,通过它将数据元素发送到指定的输出端口。通常我们可以在onPull事件响应函数里调用它,onPull函数将由下游通过pull函数触发。

final protected def push[T](out: Outlet[T], elem: T): Unit

向指定的输出端口发射数据元素,在pull事件到达之前调用此方法两次将失败,在任何时间只能有一个未完成的推送请求。方法isAvailable可用于检查输出端口是否已准备好被推送。

FileSource 实例讲解

一般不会通过new的方式直接创建一个Source出来,而是通过调用Source.fromGraph从一个预定义好的图创建,如:

def fromPath(
    f: Path, 
    chunkSize: Int, 
    startPosition: Long): Source[ByteString, Future[IOResult]] =
  Source
    .fromGraph(new FileSource(f, chunkSize, startPosition))
    .withAttributes(DefaultAttributes.fileSource)

这里通过Akka Streams自带的FileSource讲述Source图的定义过程。FileSource通过自定文件创建一个异步的文件读取源(Source)。

final case class IOResult(count: Long)

private[akka] final class FileSource(
      path: Path,
      chunkSize: Int,
      startPosition: Long)
    extends GraphStageWithMaterializedValue[SourceShape[ByteString], Future[IOResult]] {
  require(chunkSize > 0, "chunkSize must be greater than 0")
  val out = Outlet[ByteString]("FileSource.out")

  override val shape = SourceShape(out)
  // ....
}

FileSource要保存从文件实际读取字符数,所有它通过继承GraphStageWithMaterializedValue将计数通过物化值向下游传递。同时这个计数值不能阻塞整个流处理过程,所以物化值类型为:Future[IOResult]

Source[Out, Mat]的类型签名只有输出类型,也许你会奇怪它实际要处理的数据源来自哪里?看到这里FileSource(path: Path, ....)的构造函数签名即可明白,它实际要处理的数据源就是path指定的文件路径,通常在实现自己的Source时,我们都要继承Graph的某个抽像子类,再在主构造函数里传入它要处理的实际数据源。

Note

IOResultcount变量是文件读取的位置(字节),实际读取文件字节数需要通过count - startPosition来获得,因为有可能并不是从文件头开始读取。

createLogicAndMaterializedValue

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = {
    val ioResultPromise = Promise[IOResult]()
    val logic: GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
      val buffer = ByteBuffer.allocate(chunkSize)
      val maxReadAhead = inheritedAttributes.get[InputBuffer](InputBuffer(16, 16)).max
      var channel: FileChannel = _
      var position = startPosition
      var eofEncountered = false
      var availableChunks: Vector[ByteString] = Vector.empty[ByteString]

      setHandler(out, this)
      // ....
    }
    (logic, ioResultPromise.future)
  }

变量logicioResultPromise,将作为createLogicAndMaterializedValue函数的返回值。login匿名实现了从path文件读取数据并发送到下游的功能逻辑,在图成功或失败完成时将position(读取到文件的位置偏移处(字节))

GraphStageLogic 详解

图属性变量

val buffer = ByteBuffer.allocate(chunkSize)
val maxReadAhead = inheritedAttributes.get[InputBuffer](InputBuffer(16, 16)).max
var channel: FileChannel = _
var position = startPosition
var eofEncountered = false
var availableChunks: Vector[ByteString] = Vector.empty[ByteString]
  • buffer:每次从文件里读取数据块缓存,读取的数据块将追加到availableChunks
  • maxReadAheadavailableChunks长度,限制最多可向前读取的最大次数
  • channel:低层FileChannel
  • position:当前文件读取打针位置
  • eofEncoutered:是否读到文件尾
  • availableChunks:缓存的未处理数据块

图的初始化

  override def preStart(): Unit = {
    try {
      // this is a bit weird but required to keep existing semantics
      if (!Files.exists(path)) throw new NoSuchFileException(path.toString)

      require(!Files.isDirectory(path), s"Path '$path' is a directory")
      require(Files.isReadable(path), s"Missing read permission for '$path'")

      channel = FileChannel.open(path, StandardOpenOption.READ)
      channel.position(position)
    } catch {
      case ex: Exception =>
        ioResultPromise.trySuccess(IOResult(position, Failure(ex)))
        throw ex
    }
  }

校验path指定的文件是否存在、是否可读,并以指定的偏移量位置打开 FileChannel

onPull

  override def onPull(): Unit = {
    if (availableChunks.size < maxReadAhead && !eofEncountered)
      availableChunks = readAhead(maxReadAhead, availableChunks)
    //if already read something and try
    if (availableChunks.nonEmpty) {
      emitMultiple(out, availableChunks.iterator, () => if (eofEncountered) success() else setHandler(out, handler))
      availableChunks = Vector.empty[ByteString]
    } else if (eofEncountered) success()
  }
  1. 当收到下游拉取数据请求时,通过readAhead函数从channel中读取尽量多的数据块
  2. availableChunks不为空,通过emitMultiple函数高效的将多个可用数据块 push 到下游。
final protected def emitMultiple[T](out: Outlet[T], elems: Iterator[T], andThen: () => Unit): Unit

emitMultiple函数可以简化需要发射多个元素的工作,它将在多个元素发射完成后恢复原有的处理程序(OutHandler),这样就不需要自己手动管理多个元素的发射状态。

readAhead,读取数据

  private def success(): Unit = {
    completeStage()
    ioResultPromise.trySuccess(IOResult(position, Success(Done)))
  }
  /** BLOCKING I/O READ */
  @tailrec def readAhead(maxChunks: Int, chunks: Vector[ByteString]): Vector[ByteString] =
    if (chunks.size < maxChunks && !eofEncountered) {
      val readBytes = try channel.read(buffer, position)
      catch {
        case NonFatal(ex) =>
          failStage(ex)
          ioResultPromise.trySuccess(IOResult(position, Failure(ex)))
          throw ex
      }

      if (readBytes > 0) {
        buffer.flip()
        position += readBytes
        val newChunks = chunks :+ ByteString.fromByteBuffer(buffer)
        buffer.clear()

        if (readBytes < chunkSize) {
          eofEncountered = true
          newChunks
        } else readAhead(maxChunks, newChunks)
      } else {
        eofEncountered = true
        chunks
      }
    } else chunks

readAhead是一个尾递归函数,编译器在编译时会将其优化成循环调用,这样可避免栈溢出和性能问题。readAhead首先判断chunks是否未满或还有文件可读,若是则进行文件数据读取,否直接返回chunks。当channel.read返回的实际读出字节数readBytes大于0且不小于chunkSize,代表文件还有数据可继续读取,这时递归readAhead函数;否则设置 eofEncouteredtrue并返回newChunks

onDownstreamFinish

  override def onDownstreamFinish(cause: Throwable): Unit = {
    cause match {
      case _: SubscriptionWithCancelException.NonFailureCancellation =>
        success()
      case ex =>
        ioResultPromise.tryFailure(
          new IOOperationIncompleteException("Downstream failed before reaching file end", position, ex))
        completeStage()
    }
  }

当下游取消(cancel)时,会触发onDownstreamFinish函数,并通过cause参数告知下游取消时的异常,在此也完成当前Source

postStop

  override def postStop(): Unit = {
    ioResultPromise.trySuccess(IOResult(position, Success(Done)))
    if ((channel ne null) && channel.isOpen) channel.close()
  }

在图停止后做清理工作,关闭打开的文件。

小结

完整FileSource源码见:https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala

在此文档中发现错误?该页面的源代码可以在 这里 找到。欢迎随时编辑并提交Pull Request。