服务器-发送事件支持
Server-Sent Events Support
Server-Sent Events (SSE) is a lightweight and standardized protocol for pushing notifications from a HTTP server to a client. In contrast to WebSocket, which offers bi-directional communication, SSE only allows for one-way communication from the server to the client. If that’s all you need, SSE has the advantages to be much simpler, to rely on HTTP only and to offer retry semantics on broken connections by the browser.
服务器-发送事件(SSE)是一个轻量级、标准化 协议,用于从 HTTP 服务器推送通知到客户端。 与 WebSocket 相反,它提供双向通信,而 SSE 只允许从服务器到客户端一种通信方式。如果这就是你需要的,SSE 的优势在于更简单,它只依赖 HTTP, 并且在浏览器断开的连接上提供重试语义。
According to the SSE specification clients can request an event stream from the server via HTTP. The server responds with the media type text/event-stream
which has the fixed character encoding UTF-8 and keeps the response open to send events to the client when available. Events are textual structures which carry fields and are terminated by an empty line, e.g.
根据 SSE 规范,客户端可以通过 HTTP 从服务器请求一个事件流。 服务器用具有固定字符编码 UTF-8 的媒体类型 text/event-stream
应答,并且保持响应打开,在可用时发送事件到客户端。 事件是文本结构,携带字段并以以空行结尾。例如:
data: { "username": "John Doe" }
event: added
id: 42
data: another event
Clients can optionally signal the last seen event to the server via the Last-Event-ID
LastEventId
header, e.g. after a reconnect.
客户端可以选择通过 Last-Event-ID
LastEventId
头域最后看到的事件发送到服务器,例如:重新连接后。
译注:客户端可以保存 Last-Event-ID
LastEventId
的值,在重连后以此恢复正常处理。
Model
模型
Akka HTTP represents event streams as Source<ServerSentEvent, ?>
Source[ServerSentEvent, _]
where ServerSentEvent
ServerSentEvent
is a case class with the following read-only properties:
Akka HTTP 维护事件流为 Source<ServerSentEvent, ?>
Source[ServerSentEvent, _]
,其中 ServerSentEvent
ServerSentEvent
是一个 case 类,具有下面的只读属性:
data: String
String data
– the actual payload, may span multiple lines- 真实的负荷,可能跨多行
eventType: Option[String]
Optional<String> type
– optional qualifier, e.g. “added”, “removed”, etc.- 可选的修饰,例如:
added
、removed
等
- 可选的修饰,例如:
id: Option[String]
Optional<String> id
– optional identifier- 可选的标识符
retry: Option[Int]
OptionalInt retry
– optional reconnection delay in milliseconds- 可选的重新连接延迟(毫秒)
In accordance to the SSE specification Akka HTTP also provides the Last-Event-ID
LastEventId
header and the text/event-stream
TEXT_EVENT_STREAM
media type.
Server-side usage: marshalling
服务器端使用方法:编组
In order to respond to a HTTP request with an event stream, you have to bring the implicit ToResponseMarshaller[Source[ServerSentEvent, \_]]
defined by EventStreamMarshalling
EventStreamMarshalling
into the scope defining the respective routeuse the EventStreamMarshalling.toEventStream
marshaller:
为了使用事件流应答一个 HTTP 请求,你需要 提供由 EventStreamMarshalling
EventStreamMarshalling
定义的隐式转换 ToResponseMarshaller[Source[ServerSentEvent, \_]]
到各自路由的作用域范围使用 EventStreamMarshalling.toEventStream
编组:
- Scala
-
import akka.NotUsed import akka.stream.scaladsl.Source import akka.http.scaladsl.Http import akka.http.scaladsl.unmarshalling.Unmarshal import akka.http.scaladsl.model.sse.ServerSentEvent import scala.concurrent.duration._ import java.time.LocalTime import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME def route: Route = { import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._ path("events") { get { complete { Source .tick(2.seconds, 2.seconds, NotUsed) .map(_ => LocalTime.now()) .map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time))) .keepAlive(1.second, () => ServerSentEvent.heartbeat) } } } }
- Java
-
final List<ServerSentEvent> events = new ArrayList<>(); events.add(ServerSentEvent.create("1")); events.add(ServerSentEvent.create("2")); final Route route = completeOK(Source.from(events), EventStreamMarshalling.toEventStream());
Client-side usage: unmarshalling
客户端使用方法:解组
In order to unmarshal an event stream as Source<ServerSentEvent, ?>
Source[ServerSentEvent, _]
, you have to bring the implicit FromEntityUnmarshaller[Source[ServerSentEvent, _]]
defined by EventStreamUnmarshalling
EventStreamUnmarshalling
into scopeuse the EventStreamUnmarshalling.fromEventsStream
unmarshaller:
为了解组一个事件流为 Source<ServerSentEvent, ?>
Source[ServerSentEvent, _]
,你需要 提供由 EventStreamUnmarshalling
EventStreamUnmarshalling
定义的隐式转换 FromEntityUnmarshaller[Source[ServerSentEvent, _]]
到作用域范围使用 EventStreamUnmarshalling.fromEventsStream
解组:
- Scala
-
import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._ Http() .singleRequest(Get("http://localhost:8000/events")) .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]]) .foreach(_.runForeach(println))
- Java
-
List<ServerSentEvent> unmarshalledEvents = EventStreamUnmarshalling.fromEventsStream(system) .unmarshal(entity, system.dispatcher(), mat) .thenCompose(source -> source.runWith(Sink.seq(), mat)) .toCompletableFuture() .get(3000, TimeUnit.SECONDS);
Notice that if you are looking for a resilient way to permanently subscribe to an event stream, Alpakka provides the EventSource connector which reconnects automatically with the id of the last seen event.
注意,如果你正在寻找永久订阅事件流的弹性方法,那么 Alpakka 提供的 EventSource 连接器可以使用最后看到的事件 id 自动重连。