服务器-发送事件支持

Server-Sent Events Support

Server-Sent Events (SSE) is a lightweight and standardized protocol for pushing notifications from a HTTP server to a client. In contrast to WebSocket, which offers bi-directional communication, SSE only allows for one-way communication from the server to the client. If that’s all you need, SSE has the advantages to be much simpler, to rely on HTTP only and to offer retry semantics on broken connections by the browser.

服务器-发送事件(SSE)是一个轻量级、标准化 协议,用于从 HTTP 服务器推送通知到客户端。 与 WebSocket 相反,它提供双向通信,而 SSE 只允许从服务器到客户端一种通信方式。如果这就是你需要的,SSE 的优势在于更简单,它只依赖 HTTP, 并且在浏览器断开的连接上提供重试语义。

According to the SSE specification clients can request an event stream from the server via HTTP. The server responds with the media type text/event-stream which has the fixed character encoding UTF-8 and keeps the response open to send events to the client when available. Events are textual structures which carry fields and are terminated by an empty line, e.g.

根据 SSE 规范,客户端可以通过 HTTP 从服务器请求一个事件流。 服务器用具有固定字符编码 UTF-8 的媒体类型 text/event-stream 应答,并且保持响应打开,在可用时发送事件到客户端。 事件是文本结构,携带字段并以以空行结尾。例如:

data: { "username": "John Doe" }
event: added
id: 42

data: another event

Clients can optionally signal the last seen event to the server via the Last-Event-IDLastEventId header, e.g. after a reconnect.

客户端可以选择通过 Last-Event-IDLastEventId 头域最后看到的事件发送到服务器,例如:重新连接后。

译注:客户端可以保存 Last-Event-IDLastEventId 的值,在重连后以此恢复正常处理。

Model

模型

Akka HTTP represents event streams as Source<ServerSentEvent, ?>Source[ServerSentEvent, _] where ServerSentEventServerSentEvent is a case class with the following read-only properties:

Akka HTTP 维护事件流为 Source<ServerSentEvent, ?>Source[ServerSentEvent, _],其中 ServerSentEventServerSentEvent 是一个 case 类,具有下面的只读属性:

  • data: StringString data – the actual payload, may span multiple lines
    • 真实的负荷,可能跨多行
  • eventType: Option[String]Optional<String> type – optional qualifier, e.g. “added”, “removed”, etc.
    • 可选的修饰,例如:addedremoved
  • id: Option[String]Optional<String> id – optional identifier
    • 可选的标识符
  • retry: Option[Int]OptionalInt retry – optional reconnection delay in milliseconds
    • 可选的重新连接延迟(毫秒)

In accordance to the SSE specification Akka HTTP also provides the Last-Event-IDLastEventId header and the text/event-streamTEXT_EVENT_STREAM media type.

Server-side usage: marshalling

服务器端使用方法:编组

In order to respond to a HTTP request with an event stream, you have to bring the implicit ToResponseMarshaller[Source[ServerSentEvent, \_]] defined by EventStreamMarshallingEventStreamMarshalling into the scope defining the respective routeuse the EventStreamMarshalling.toEventStream marshaller:

为了使用事件流应答一个 HTTP 请求,你需要 提供由 EventStreamMarshallingEventStreamMarshalling 定义的隐式转换 ToResponseMarshaller[Source[ServerSentEvent, \_]] 到各自路由的作用域范围使用 EventStreamMarshalling.toEventStream 编组:

Scala
import akka.NotUsed
import akka.stream.scaladsl.Source

import akka.http.scaladsl.Http
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.model.sse.ServerSentEvent
import scala.concurrent.duration._

import java.time.LocalTime
import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME

def route: Route = {
  import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

  path("events") {
    get {
      complete {
        Source
          .tick(2.seconds, 2.seconds, NotUsed)
          .map(_ => LocalTime.now())
          .map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
          .keepAlive(1.second, () => ServerSentEvent.heartbeat)
      }
    }
  }
}
Java
final List<ServerSentEvent> events = new ArrayList<>();
events.add(ServerSentEvent.create("1"));
events.add(ServerSentEvent.create("2"));
final Route route = completeOK(Source.from(events), EventStreamMarshalling.toEventStream());

Client-side usage: unmarshalling

客户端使用方法:解组

In order to unmarshal an event stream as Source<ServerSentEvent, ?>Source[ServerSentEvent, _], you have to bring the implicit FromEntityUnmarshaller[Source[ServerSentEvent, _]] defined by EventStreamUnmarshallingEventStreamUnmarshalling into scopeuse the EventStreamUnmarshalling.fromEventsStream unmarshaller:

为了解组一个事件流为 Source<ServerSentEvent, ?>Source[ServerSentEvent, _] ,你需要 提供由 EventStreamUnmarshallingEventStreamUnmarshalling 定义的隐式转换 FromEntityUnmarshaller[Source[ServerSentEvent, _]] 到作用域范围使用 EventStreamUnmarshalling.fromEventsStream 解组:

Scala
import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._

Http()
  .singleRequest(Get("http://localhost:8000/events"))
  .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
  .foreach(_.runForeach(println))
Java
List<ServerSentEvent> unmarshalledEvents =
        EventStreamUnmarshalling.fromEventsStream(system)
                .unmarshal(entity, system.dispatcher(), mat)
                .thenCompose(source -> source.runWith(Sink.seq(), mat))
                .toCompletableFuture()
                .get(3000, TimeUnit.SECONDS);

Notice that if you are looking for a resilient way to permanently subscribe to an event stream, Alpakka provides the EventSource connector which reconnects automatically with the id of the last seen event.

注意,如果你正在寻找永久订阅事件流的弹性方法,那么 Alpakka 提供的 EventSource 连接器可以使用最后看到的事件 id 自动重连。

在此文档中发现错误?该页面的源代码可以在 这里 找到。欢迎随时编辑并提交 Pull Request。