Source Streaming

Akka HTTP supports completing a request with an Akka Source[T, _] (Source<T, ?> in the Java DSL), which makes it easy to build and consume end-to-end streaming APIs that apply back pressure throughout the entire stack.

It is possible to complete requests with a raw Source[ByteString, _] (Source<ByteString, ?> in the Java DSL). Often, however, it is more convenient to stream element by element and let Akka HTTP handle the rendering internally, for example as a JSON array or as a CSV stream (where each element is followed by a newline).

The following sections show how to use the JSON Streaming infrastructure, but the general hints apply to any kind of element-by-element streaming.

JSON Streaming

JSON Streaming refers to sending a (possibly infinite) stream of elements as independent JSON objects in a continuous HTTP request or response. The elements are most often separated by newlines, but they do not have to be: concatenating elements side by side or emitting a “very long” JSON array are other options.
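To make the three framings mentioned above concrete, here is a minimal plain-Java sketch (no Akka involved). The class and method names are hypothetical, and the elements are assumed to be JSON objects that have already been serialized to strings.

```java
import java.util.List;

// Hypothetical helper: shows how the same elements look on the wire
// under the three framings described in the text.
final class WireFormatsSketch {

    // One (possibly very long) JSON array: [el,el,el]
    static String asJsonArray(List<String> elements) {
        return "[" + String.join(",", elements) + "]";
    }

    // Newline-separated objects: every element is followed by "\n"
    static String asLines(List<String> elements) {
        StringBuilder out = new StringBuilder();
        for (String element : elements) {
            out.append(element).append('\n');
        }
        return out.toString();
    }

    // Objects concatenated side by side, with no separator at all
    static String concatenated(List<String> elements) {
        return String.join("", elements);
    }
}
```

Only the first form is a single valid JSON document; the other two are sequences of valid documents that the receiver has to split itself.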

The examples below use the Tweet case class (JavaTweet in the Java examples) as the model, defined as:

Scala
case class Tweet(uid: Int, txt: String)
Java
private static final class JavaTweet {
  private int id;
  private String message;

  public JavaTweet(int id, String message) {
    this.id = id;
    this.message = message;
  }

  public int getId() {
    return id;
  }

  public void setId(int id) {
    this.id = id;
  }

  public void setMessage(String message) {
    this.message = message;
  }

  public String getMessage() {
    return message;
  }
}

As always with spray-json, we provide our marshaller and unmarshaller instances as implicit values, using the jsonFormat## method to generate them statically (here jsonFormat2, since Tweet has two fields):

Scala
object MyTweetJsonProtocol
  extends akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
  with spray.json.DefaultJsonProtocol {

  implicit val tweetFormat = jsonFormat2(Tweet.apply)
}

Responding with JSON Streams

In this example we implement an API representing an infinite stream of tweets, very much like Twitter’s Streaming API.

First, in the Scala DSL, we need some additional marshalling infrastructure that can marshal to and from an Akka Streams Source[T, _]. One trait containing the needed marshallers is SprayJsonSupport, which uses spray-json (a high-performance JSON parser library) and is shipped as part of Akka HTTP in the akka-http-spray-json module.

Once the general infrastructure is prepared, we import our model’s marshallers, generated by spray-json (Step 1), and enable JSON Streaming by making an implicit EntityStreamingSupport instance available (Step 2). Akka HTTP pre-packages JSON and CSV entity streaming support, but it is simple to add your own if you would like to stream a different content type (for example plists or protobuf).

In the Java DSL, we likewise need some marshalling infrastructure that can marshal to and from an Akka Streams Source<T, ?>. Here we use the Jackson helper class from akka-http-jackson (a separate library that you should add as a dependency if you want to use Jackson with Akka HTTP).

First we enable JSON Streaming by creating an EntityStreamingSupport instance (Step 1).

The default mode of rendering a Source is to represent it as a JSON array. If you want to change this representation, for example to use Twitter-style newline-separated JSON objects, you can do so by configuring the support trait accordingly.

Step 1.1 demonstrates how to customise the rendering. The Java snippet below installs a compact-array renderer; newline-separated rendering is configured the same way, by supplying a framing flow that appends a ByteString consisting of a single newline character to each ByteString in the stream. Although the newline-separated format is not valid JSON, it is popular because parsing it is relatively simple: clients only need to find the newlines and apply JSON unmarshalling to each complete line.

The final step is simply to complete the request with a Source of tweets:

Scala
// [1] import "my protocol", for marshalling Tweet objects:
import MyTweetJsonProtocol._

// [2] pick a Source rendering support trait:
// Note that the default support renders the Source as JSON Array
implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

val route =
  path("tweets") {
    // [3] simply complete a request with a source of tweets:
    val tweets: Source[Tweet, NotUsed] = getTweets
    complete(tweets)
  }

// tests ------------------------------------------------------------
val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`))
val AcceptXml = Accept(MediaRange(MediaTypes.`text/xml`))

Get("/tweets").withHeaders(AcceptJson) ~> route ~> check {
  responseAs[String] shouldEqual
    """[""" +
    """{"txt":"#Akka rocks!","uid":1},""" +
    """{"txt":"Streaming is so hot right now!","uid":2},""" +
    """{"txt":"You cannot enter the same river twice.","uid":3}""" +
    """]"""
}

// endpoint can only marshal Json, so it will *reject* requests for application/xml:
Get("/tweets").withHeaders(AcceptXml) ~> route ~> check {
  handled should ===(false)
  rejection should ===(UnacceptedResponseContentTypeRejection(Set(ContentTypes.`application/json`)))
}
Java
import static akka.http.javadsl.server.Directives.completeOKWithSource;
import static akka.http.javadsl.server.Directives.get;
import static akka.http.javadsl.server.Directives.parameter;
import static akka.http.javadsl.server.Directives.path;


// Step 1: Enable JSON streaming
// we're not using this in the example, but it's the simplest way to start:
// The default rendering is a JSON array: `[el, el, el , ...]`
final JsonEntityStreamingSupport jsonStreaming = EntityStreamingSupport.json();

// Step 1.1: Enable and customise how we'll render the JSON, as a compact array:
final ByteString start = ByteString.fromString("[");
final ByteString between = ByteString.fromString(",");
final ByteString end = ByteString.fromString("]");
final Flow<ByteString, ByteString, NotUsed> compactArrayRendering =
  Flow.of(ByteString.class).intersperse(start, between, end);

final JsonEntityStreamingSupport compactJsonSupport = EntityStreamingSupport.json()
  .withFramingRendererFlow(compactArrayRendering);


// Step 2: implement the route
final Route responseStreaming = path("tweets", () ->
  get(() ->
    parameter(StringUnmarshallers.INTEGER, "n", n -> {
      final Source<JavaTweet, NotUsed> tws =
        Source.repeat(new JavaTweet(12, "Hello World!")).take(n);

      // Step 3: call complete* with your source, marshaller, and stream rendering mode
      return completeOKWithSource(tws, Jackson.marshaller(), compactJsonSupport);
    })
  )
);
// tests:
final TestRoute routes = testRoute(tweets());

// test happy path
final Accept acceptApplication = Accept.create(MediaRanges.create(MediaTypes.APPLICATION_JSON));
routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptApplication))
  .assertStatusCode(200)
  .assertEntity("[{\"id\":12,\"message\":\"Hello World!\"},{\"id\":12,\"message\":\"Hello World!\"}]");

// test responses to potential errors
final Accept acceptText = Accept.create(MediaRanges.ALL_TEXT);
routes.run(HttpRequest.GET("/tweets?n=3").addHeader(acceptText))
  .assertStatusCode(StatusCodes.NOT_ACCEPTABLE) // 406
  .assertEntity("Resource representation is only available with these types:\napplication/json");
// tests --------------------------------------------
final TestRoute routes = testRoute(csvTweets());

// test happy path
final Accept acceptCsv = Accept.create(MediaRanges.create(MediaTypes.TEXT_CSV));
routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptCsv))
  .assertStatusCode(200)
  .assertEntity("12,Hello World!\n" +
    "12,Hello World!\n");

// test responses to potential errors
final Accept acceptText = Accept.create(MediaRanges.ALL_APPLICATION);
routes.run(HttpRequest.GET("/tweets?n=3").addHeader(acceptText))
  .assertStatusCode(StatusCodes.NOT_ACCEPTABLE) // 406
  .assertEntity("Resource representation is only available with these types:\ntext/csv; charset=UTF-8");
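Step 1.1 in the Java snippet above builds its framing with intersperse(start, between, end). The following plain-Java sketch illustrates the idea on an Iterator instead of an Akka Flow: tokens are produced lazily, so even an infinite source can be rendered incrementally as an array. It is an analogy under stated assumptions, not Akka's implementation, and IntersperseSketch is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical, Akka-free illustration of intersperse(start, between, end).
final class IntersperseSketch {

    static Iterator<String> intersperse(Iterator<String> elements,
                                        String start, String between, String end) {
        return new Iterator<String>() {
            // Tokens that are ready to be emitted; the start token goes first.
            private final Deque<String> buffer = new ArrayDeque<>(List.of(start));
            private boolean first = true;
            private boolean ended = false;

            // Pull at most one upstream element, and only when the buffer is empty.
            private void fill() {
                if (!buffer.isEmpty() || ended) return;
                if (elements.hasNext()) {
                    if (!first) buffer.add(between);
                    buffer.add(elements.next());
                    first = false;
                } else {
                    buffer.add(end);
                    ended = true;
                }
            }

            @Override
            public boolean hasNext() {
                fill();
                return !buffer.isEmpty();
            }

            @Override
            public String next() {
                fill();
                if (buffer.isEmpty()) throw new NoSuchElementException();
                return buffer.poll();
            }
        };
    }
}
```

Passing "[", "," and "]" yields the compact array framing; the upstream iterator is never read further ahead than the token currently being emitted.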

The reason the EntityStreamingSupport has to be enabled explicitly is that you may want to configure how the stream is rendered. The next section discusses this in depth.

Customising response rendering mode

Since it is not always possible to say directly and confidently how a stream of T should look on the wire, the EntityStreamingSupport traits come into play and allow fine-tuning the stream’s rendered representation.

For example, in the case of JSON Streaming, there is no single standard for rendering the response. Some APIs prefer to render multiple JSON objects line by line (Twitter’s streaming APIs, for example), while others simply return very large arrays, which can be streamed as well.

Akka defaults to the second one (streaming a JSON array), as it is valid JSON, and clients that do not expect a streaming API can still consume it in a naive way if they want to.

The line-by-line approach, however, is also quite popular even though the response as a whole is not valid JSON. Its simplicity for client-side parsing is a strong argument for picking this format for your streaming APIs. Below we demonstrate how to reconfigure the support trait to render the JSON line by line.

Scala
import MyTweetJsonProtocol._

// Configure the EntityStreamingSupport to render the elements as:
// {"example":42}
// {"example":43}
// ...
// {"example":1000}
val newline = ByteString("\n")

implicit val jsonStreamingSupport = EntityStreamingSupport.json()
  .withFramingRenderer(Flow[ByteString].map(bs => bs ++ newline))

val route =
  path("tweets") {
    // [3] simply complete a request with a source of tweets:
    val tweets: Source[Tweet, NotUsed] = getTweets
    complete(tweets)
  }

// tests ------------------------------------------------------------
val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`))

Get("/tweets").withHeaders(AcceptJson) ~> route ~> check {
  responseAs[String] shouldEqual
    """{"txt":"#Akka rocks!","uid":1}""" + "\n" +
    """{"txt":"Streaming is so hot right now!","uid":2}""" + "\n" +
    """{"txt":"You cannot enter the same river twice.","uid":3}""" + "\n"
}
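The client-side simplicity of the line-by-line format can be sketched in plain Java as follows. This is an illustrative assumption about what a client might do, not part of Akka HTTP: the class name is hypothetical, and the actual JSON parsing of each line is left to whatever library the client uses (represented here by a handler callback).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical client-side helper for newline-separated JSON responses.
final class LineDelimitedClientSketch {

    // Reads the body line by line and hands every non-blank line to the handler.
    // Returns the number of frames that were handled.
    static int forEachFrame(InputStream body, Consumer<String> handler) {
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(body, StandardCharsets.UTF_8))) {
            int frames = 0;
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isBlank()) continue;   // tolerate empty keep-alive lines
                handler.accept(line);           // a real client would parse JSON here
                frames++;
            }
            return frames;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because each line is a complete JSON document, the client never needs an incremental JSON parser; it only needs to find the newlines.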

Another interesting feature is parallel marshalling. Since marshalling can potentially take a long time, it is possible to marshal multiple elements of the stream in parallel. This is a configuration option on EntityStreamingSupport and is set like this:

Scala
import MyTweetJsonProtocol._
implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
  EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

path("tweets") {
  val tweets: Source[Tweet, NotUsed] = getTweets
  complete(tweets)
}

The mode shown above preserves the ordering of the Source’s elements, which may sometimes be a required property, for example when streaming a strictly ordered dataset. Sometimes, though, strict order does not matter for the data being streamed, which lets us use an unordered rendering.

This unordered rendering can be enabled via a configuration option, as shown below. Effectively, it allows Akka HTTP’s marshalling infrastructure to marshal up to parallelism elements concurrently and to emit whichever one finishes marshalling first into the HttpResponse:

Scala
import MyTweetJsonProtocol._
implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
  EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = true)

path("tweets" / "unordered") {
  val tweets: Source[Tweet, NotUsed] = getTweets
  complete(tweets)
}

This potentially lets us render elements into the HttpResponse faster, since it avoids “head-of-line blocking” when an element at the front of the stream takes a long time to marshal while the elements after it are very quick to marshal.
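The difference between the ordered and unordered modes can be illustrated with a small plain-Java sketch built on a thread pool. It only mimics the emission order and is not how Akka HTTP implements parallel marshalling; the class, the marshal method and its parameters are hypothetical names that mirror the parallelism and unordered options.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical illustration of ordered vs unordered parallel marshalling.
final class ParallelMarshallingSketch {

    static <T> List<String> marshal(List<T> elements, Function<T, String> marshaller,
                                    int parallelism, boolean unordered) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            CompletionService<String> finished = new ExecutorCompletionService<>(pool);
            List<Future<String>> submitted = new ArrayList<>();
            for (T element : elements) {
                Callable<String> task = () -> marshaller.apply(element);
                submitted.add(finished.submit(task));
            }
            List<String> emitted = new ArrayList<>();
            for (Future<String> inSourceOrder : submitted) {
                // unordered: emit whichever element finished marshalling first;
                // ordered: wait for the next element in source order, even if
                // later elements are already done (head-of-line blocking).
                Future<String> next = unordered ? finished.take() : inSourceOrder;
                emitted.add(next.get());
            }
            return emitted;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With a marshaller that is slow for the first element only, the ordered mode still emits that element first, while the unordered mode is free to emit the quick elements ahead of it.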

Consuming JSON Streaming uploads

Sometimes a client sends a streaming request. For example, an embedded device initiates a connection with the server and feeds it measurement data one line at a time.

In this example, we want to consume this data from the request entity in a streaming fashion and also apply back pressure to the underlying TCP connection should the server be unable to cope with the rate of incoming data. Back pressure is applied automatically thanks to Akka Streams.

Scala
case class Measurement(id: String, value: Int)

object MyMeasurementJsonProtocol
  extends akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
  with spray.json.DefaultJsonProtocol {

  implicit val measurementFormat = jsonFormat2(Measurement.apply)
}
Java
private static final class Measurement {
  private String id;
  private int value;

  public Measurement(String id, int value) {
    this.id = id;
    this.value = value;
  }

  public String getId() {
    return id;
  }

  public void setId(String id) {
    this.id = id;
  }

  public void setValue(int value) {
    this.value = value;
  }

  public int getValue() {
    return value;
  }
}

final Unmarshaller<ByteString, Measurement> Measurements = Jackson.byteStringUnmarshaller(Measurement.class);
Scala
// [1] import "my protocol", for unmarshalling Measurement objects:
import MyMeasurementJsonProtocol._

// [2] enable Json Streaming
implicit val jsonStreamingSupport = EntityStreamingSupport.json()

// prepare your persisting logic here
val persistMetrics = Flow[Measurement]

val route =
  path("metrics") {
    // [3] extract Source[Measurement, _]
    entity(asSourceOf[Measurement]) { measurements =>
      // alternative syntax:
      // entity(as[Source[Measurement, NotUsed]]) { measurements =>
      val measurementsSubmitted: Future[Int] =
        measurements
          .via(persistMetrics)
          .runFold(0) { (cnt, _) => cnt + 1 }

      complete {
        measurementsSubmitted.map(n => Map("msg" -> s"""Total metrics received: $n"""))
      }
    }
  }

// tests ------------------------------------------------------------
// uploading an array or newline separated values works out of the box
val data = HttpEntity(
  ContentTypes.`application/json`,
  """
    |{"id":"temp","value":32}
    |{"id":"temp","value":31}
    |
  """.stripMargin)

Post("/metrics", entity = data) ~> route ~> check {
  status should ===(StatusCodes.OK)
  responseAs[String] should ===("""{"msg":"Total metrics received: 2"}""")
}

// the FramingWithContentType will reject any content type that it does not understand:
val xmlData = HttpEntity(
  ContentTypes.`text/xml(UTF-8)`,
  """|<data id="temp" value="32"/>
     |<data id="temp" value="31"/>""".stripMargin)

Post("/metrics", entity = xmlData) ~> route ~> check {
  handled should ===(false)
  rejection should ===(
    UnsupportedRequestContentTypeRejection(
      Set(ContentTypes.`application/json`),
      Some(ContentTypes.`text/xml(UTF-8)`)))
}
Java
import static akka.http.javadsl.server.Directives.complete;
import static akka.http.javadsl.server.Directives.entityAsSourceOf;
import static akka.http.javadsl.server.Directives.extractMaterializer;
import static akka.http.javadsl.server.Directives.onComplete;
import static akka.http.javadsl.server.Directives.post;

final Route incomingStreaming = path("metrics", () ->
  post(() ->
    extractMaterializer(mat -> {
        final JsonEntityStreamingSupport jsonSupport = EntityStreamingSupport.json();

        return entityAsSourceOf(Measurements, jsonSupport, sourceOfMeasurements -> {
          final CompletionStage<Integer> measurementCount = sourceOfMeasurements.runFold(0, (acc, measurement) -> acc + 1, mat);
          return onComplete(measurementCount, c -> complete("Total number of measurements: " + c));
        });
      }
    )
  )
);
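The back pressure mentioned above rests on demand signalling: the producer may only emit as many elements as the consumer has asked for. The sketch below shows that principle with the JDK's java.util.concurrent.Flow interfaces in a deliberately simplified, synchronous form. It is not Akka Streams code and does not implement the full Reactive Streams rules; MeasurementPublisher and SlowConsumer are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Simplified, synchronous illustration of demand-driven back pressure.
final class DemandSketch {

    // Hypothetical publisher: hands out measurements only when asked.
    static final class MeasurementPublisher implements Flow.Publisher<String> {
        private final Iterator<String> source;
        int emitted = 0; // how many elements have left the publisher so far

        MeasurementPublisher(List<String> measurements) {
            this.source = measurements.iterator();
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean finished = false;

                @Override
                public void request(long n) {
                    // Emit at most n elements, never more than was requested.
                    for (long i = 0; i < n && !finished; i++) {
                        if (source.hasNext()) {
                            emitted++;
                            subscriber.onNext(source.next());
                        } else {
                            finished = true;
                            subscriber.onComplete();
                        }
                    }
                }

                @Override
                public void cancel() {
                    finished = true;
                }
            });
        }
    }

    // Hypothetical slow consumer: signals demand only when pull(n) is called.
    static final class SlowConsumer implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        boolean completed = false;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { this.subscription = s; }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new RuntimeException(t); }
        @Override public void onComplete() { completed = true; }

        void pull(long n) { subscription.request(n); }
    }
}
```

Until the consumer calls pull, nothing leaves the publisher. In the HTTP case the same lack of demand eventually means the server stops reading from the socket, which is how the slowdown reaches the sending client over TCP.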

Simple CSV streaming example

Akka HTTP provides another EntityStreamingSupport out of the box, namely csv (comma-separated values). For completeness, we demonstrate its usage in the snippet below. As you’ll notice, switching between streaming modes is fairly simple: you only have to make sure that an implicit Marshaller of the requested type is available and that the streaming support operates on the same Content-Type as the rendered values. Otherwise, you’ll see an error at runtime saying that the marshaller did not expose the expected content type, so the streaming response cannot be rendered.

Scala
// [1] provide a marshaller to ByteString
implicit val tweetAsCsv = Marshaller.strict[Tweet, ByteString] { t =>
  Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () => {
    val txt = t.txt.replaceAll(",", ".")
    val uid = t.uid
    ByteString(List(uid, txt).mkString(","))
  })
}

// [2] enable csv streaming:
implicit val csvStreaming = EntityStreamingSupport.csv()

val route =
  path("tweets") {
    val tweets: Source[Tweet, NotUsed] = getTweets
    complete(tweets)
  }

// tests ------------------------------------------------------------
val AcceptCsv = Accept(MediaRange(MediaTypes.`text/csv`))

Get("/tweets").withHeaders(AcceptCsv) ~> route ~> check {
  responseAs[String] shouldEqual
    "1,#Akka rocks!" + "\n" +
    "2,Streaming is so hot right now!" + "\n" +
    "3,You cannot enter the same river twice." + "\n"
}
Java
import static akka.http.javadsl.server.Directives.completeWithSource;
import static akka.http.javadsl.server.Directives.get;
import static akka.http.javadsl.server.Directives.parameter;
import static akka.http.javadsl.server.Directives.path;

// Render each tweet as one CSV line: "<id>,<message>"
final Marshaller<JavaTweet, ByteString> renderAsCsv =
  Marshaller.withFixedContentType(ContentTypes.TEXT_CSV_UTF8, t ->
    ByteString.fromString(t.getId() + "," + t.getMessage())
  );

// Enable CSV streaming
final CsvEntityStreamingSupport csvSupport = EntityStreamingSupport.csv();

final Route responseStreaming = path("tweets", () ->
  get(() ->
    parameter(StringUnmarshallers.INTEGER, "n", n -> {
      final Source<JavaTweet, NotUsed> tws =
        Source.repeat(new JavaTweet(12, "Hello World!")).take(n);
      return completeWithSource(tws, renderAsCsv, csvSupport);
    })
  )
);
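The marshallers above deal with commas in a simple way: the Scala one replaces them with dots, and the Java one writes the message verbatim. If messages may contain commas, quotes or newlines, one alternative is RFC 4180 style quoting. The sketch below is not taken from the original example; CsvQuotingSketch and its methods are hypothetical names.

```java
// Hypothetical helper: RFC 4180 style quoting for a single CSV line.
final class CsvQuotingSketch {

    // Quotes a field only when it contains a comma, a double quote or a line break;
    // embedded double quotes are escaped by doubling them.
    static String field(String raw) {
        boolean needsQuoting = raw.contains(",") || raw.contains("\"")
            || raw.contains("\n") || raw.contains("\r");
        if (!needsQuoting) return raw;
        return "\"" + raw.replace("\"", "\"\"") + "\"";
    }

    // Renders one tweet as "<id>,<message>" without a trailing newline,
    // since the streaming support is what separates the rendered elements.
    static String line(int id, String message) {
        return id + "," + field(message);
    }
}
```

A marshaller could call line(t.getId(), t.getMessage()) in place of the plain string concatenation.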

Implementing custom EntityStreamingSupport traits

The EntityStreamingSupport infrastructure is open for extension and not bound to any single format, content type, or marshalling library. The provided JSON support does not rely on spray-json directly, but uses Marshaller[T, ByteString] (Marshaller<T, ByteString> in the Java DSL) instances, which can be provided using any JSON marshalling library (such as Circe, Jawn or Play JSON).

To implement a custom support trait, extend the EntityStreamingSupport abstract class and implement all of its methods. It is best to use the existing implementations as a guideline.

Supporting custom content types

In order to marshal into custom content types, both a Marshaller that can handle that content type and an EntityStreamingSupport of a matching content type are required.

Refer to the complete example below, which shows how to configure a custom marshaller and change the entity streaming support’s content type to be compatible. This is an area that would benefit from additional type safety, which we hope to add in a future release.

Scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.common.{ EntityStreamingSupport, JsonEntityStreamingSupport }
import akka.http.scaladsl.model.{ HttpEntity, _ }
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.marshalling.{ Marshaller, ToEntityMarshaller }
import akka.stream.scaladsl.Source
import spray.json.DefaultJsonProtocol

import scala.io.StdIn
import scala.util.Random

final case class User(name: String, id: String)

trait UserProtocol extends DefaultJsonProtocol {

  import spray.json._

  implicit val userFormat = jsonFormat2(User)

  val `vnd.example.api.v1+json` =
    MediaType.applicationWithFixedCharset("vnd.example.api.v1+json", HttpCharsets.`UTF-8`)
  val ct = ContentType.apply(`vnd.example.api.v1+json`)

  implicit def userMarshaller: ToEntityMarshaller[User] = Marshaller.oneOf(
    Marshaller.withFixedContentType(`vnd.example.api.v1+json`) { user =>
      HttpEntity(`vnd.example.api.v1+json`, user.toJson.compactPrint)
    })
}

object ApiServer extends App with UserProtocol {
  implicit val system = ActorSystem("api")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
    .withContentType(ct)
    .withParallelMarshalling(parallelism = 10, unordered = false)

  // (fake) async database query api
  def dummyUser(id: String) = User(s"User $id", id.toString)

  def fetchUsers(): Source[User, NotUsed] = Source.fromIterator(() => Iterator.fill(10000) {
    val id = Random.nextInt()
    dummyUser(id.toString)
  })

  val route =
    pathPrefix("users") {
      get {
        complete(fetchUsers())
      }
    }

  val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

  println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
  StdIn.readLine()
  bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())
}
Java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.common.EntityStreamingSupport;
import akka.http.javadsl.marshalling.Marshaller;
import akka.http.javadsl.model.*;
import akka.http.javadsl.server.AllDirectives;
import akka.http.javadsl.server.Route;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;

import java.util.Random;
import java.util.stream.Stream;

public class JsonStreamingFullExample extends AllDirectives {

    public Route createRoute() {
        final MediaType.WithFixedCharset mediaType =
                MediaTypes.applicationWithFixedCharset("vnd.example.api.v1+json", HttpCharsets.UTF_8);

        final ContentType.WithFixedCharset contentType = ContentTypes.create(mediaType);

        final Marshaller<User, RequestEntity> userMarshaller =
                Marshaller.withFixedContentType(contentType, (User user) -> HttpEntities.create(contentType, user.toJson()));

        final EntityStreamingSupport jsonStreamingSupport = EntityStreamingSupport.json()
                .withContentType(contentType)
                .withParallelMarshalling(10, false);

        return get(() ->
                pathPrefix("users", () ->
                        completeOKWithSource(fetchUsers(), userMarshaller, jsonStreamingSupport)
                )
        );
    }

    private Source<User, NotUsed> fetchUsers() {
        final Random rnd = new Random();
        return Source.fromIterator(() -> Stream.generate(rnd::nextInt).map(this::dummyUser).limit(10000).iterator());
    }

    private User dummyUser(int id) {
        return new User(id, "User " + id);
    }

    static final class User {
        int id;
        String name;

        User(int id, String name) {
            this.id = id;
            this.name = name;
        }

        String toJson() {
            return "{\"id\":\"" + id + "\", \"name\":\"" + name + "\"}";
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create();
        final JsonStreamingFullExample app = new JsonStreamingFullExample();
        final Http http = Http.get(system);
        final ActorMaterializer materializer = ActorMaterializer.create(system);

        final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.createRoute().flow(system, materializer);
        http.bindAndHandle(routeFlow, ConnectHttp.toHost("localhost", 8080), materializer);
    }
}

Consuming streaming JSON on client-side

To consume such streaming APIs, for example ones with JSON responses, refer to the Consuming JSON Streaming style APIs documentation in the JSON support section.
