访问文件

Akka Streams内置了 FileIO 工具库,可对文件以流的方式进行读、写。

FileIO提供了toPathfromPath两个函数。toPath是一个Sink,它接收上游流过来的元素(ByteString)并将其写入(追加)文件;fromPath是一个Source,它将按下游的(要求)从文件读取数据。

private val LINE_SEPARATOR = ByteString("\n")
private val file = Files.createTempFile("cookbook", "txt")
private val TAKE_SIZE = 100

FileIO.toPath 写数据到文件

这里构造了0到99(包含)个数字,并按一行一个的方式写入文件。

val f = Source
  .fromIterator(() => Iterator.from(0))
  .map(n => ByteString(n.toString))
  .take(TAKE_SIZE)
  .intersperse(ByteString.empty, LINE_SEPARATOR, LINE_SEPARATOR)
  .runWith(FileIO.toPath(file))
val ioResult = f.futureValue
ioResult.count should be > 0L

intersperse转换函数接受3个参数,分别在流开始前、每个元素后、流结束后添加一个指定值,就类型集合类型上的mkString(start: String, sep: String, end: String): String函数一样。这里通过此函数实现了在每写一个元素后将换行符也写入文件功能。

其实也可以在map(n => ByteString(n.toString))将数字转换成字符串时直接把换行符给附加上去,就像这样:map(n => ByteString(n.toString) ++ LINE_SEPARATOR)

Warning

需要注意Source.repeat(....).take(....)这里的take函数,在这个例子里是不可或缺的,若忘记限制repeat流的长度,则整个流将无限调用下去,直到写满你的磁盘。

当然,在这里会被futureValue的超时异常而终止测试的执行,最终很有可能不会写满你的磁盘。

FileIO.toPath在多个重载版本,在以未指定options参数时方式调用时,options的默认值为:Set(WRITE, TRUNCATE_EXISTING, CREATE),它以写入的方式打开文件,同时若文件已存在则清空内容,不存在则创建。toPath完整版本函数签名如下,startPosition指定了写入指针的初始偏移量(字节):

def toPath(f: Path, options: Set[OpenOption], startPosition: Long): Sink[ByteString, Future[IOResult]]

FileIO.fromPath 从文件读数据

使用Framing.delimiter按指定的分隔标准从文件流读取数据(元素)。

val f = FileIO
  .fromPath(file)
  .via(Framing.delimiter(LINE_SEPARATOR, 8192))
  .filterNot(_.isEmpty)
  .map(bytes => bytes.utf8String.toInt)
  .runWith(Sink.seq)
val ints = f.futureValue
ints should be(0 until 100)

FileIO.fromPath也有重载版本,其完整版本函数签名如下:

def fromPath(f: Path, chunkSize: Int, startPosition: Long): Source[ByteString, Future[IOResult]]
  • chunkSize:每次从文件里指定字节的块(缓冲)大小(字节)
  • startPosition:指定读取指针偏移量(字节)
Warning

使用Framing.delimiter从文件流里读取数据时需要注意一个问题,若文件不以你指定的分隔值结尾将会抛出异常:Caused by: akka.stream.scaladsl.Framing$FramingException: Stream finished but there was a truncated final frame in the buffer。当流读到文件末尾还未能找到指定的分隔值而不能结束分帧(framing)操作,而这时上游已经发送了完成( Finish )信号,而Framing还有未完成的buffer则会抛出此异常。

你将intersperse的第3个参数指定为ByteString.empty再次运行测试,就可重现这个问题。

在此文档中发现错误?该页面的源代码可以在 这里 找到。欢迎随时编辑并提交Pull Request。