Routing DSL
In addition to the Core Server API, Akka HTTP provides a very flexible “Routing DSL” for elegantly defining RESTful web services. It picks up where the low-level API leaves off and offers much of the higher-level functionality of typical web servers or frameworks, such as deconstruction of URIs, content negotiation, and static content serving.
It is recommended to read the Implications of the streaming nature of Request/Response Entities section, as it explains the underlying full-stack streaming concepts, which may be unexpected if you come from a background of HTTP servers that are not “streaming first”.
Minimal Example
This is a complete, very basic Akka HTTP application relying on the Routing DSL:
- Scala
    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.server.Directives._
    import akka.stream.ActorMaterializer
    import scala.io.StdIn

    object WebServer {
      def main(args: Array[String]): Unit = {

        implicit val system = ActorSystem("my-system")
        implicit val materializer = ActorMaterializer()
        // needed for the future flatMap/onComplete in the end
        implicit val executionContext = system.dispatcher

        val route =
          path("hello") {
            get {
              complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
            }
          }

        val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

        println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
        StdIn.readLine() // let it run until user presses return
        bindingFuture
          .flatMap(_.unbind()) // trigger unbinding from the port
          .onComplete(_ => system.terminate()) // and shutdown when done
      }
    }
- Java
    import akka.NotUsed;
    import akka.actor.ActorSystem;
    import akka.http.javadsl.ConnectHttp;
    import akka.http.javadsl.Http;
    import akka.http.javadsl.ServerBinding;
    import akka.http.javadsl.model.HttpRequest;
    import akka.http.javadsl.model.HttpResponse;
    import akka.http.javadsl.server.AllDirectives;
    import akka.http.javadsl.server.Route;
    import akka.stream.ActorMaterializer;
    import akka.stream.javadsl.Flow;

    import java.util.concurrent.CompletionStage;

    public class HttpServerMinimalExampleTest extends AllDirectives {

      public static void main(String[] args) throws Exception {
        // boot up server using the route as defined below
        ActorSystem system = ActorSystem.create("routes");

        final Http http = Http.get(system);
        final ActorMaterializer materializer = ActorMaterializer.create(system);

        // In order to access all directives we need an instance where the routes are defined.
        HttpServerMinimalExampleTest app = new HttpServerMinimalExampleTest();

        final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.createRoute().flow(system, materializer);
        final CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow,
            ConnectHttp.toHost("localhost", 8080), materializer);

        System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop...");
        System.in.read(); // let it run until user presses return

        binding
            .thenCompose(ServerBinding::unbind) // trigger unbinding from the port
            .thenAccept(unbound -> system.terminate()); // and shutdown when done
      }

      private Route createRoute() {
        return concat(
            path("hello", () ->
                get(() ->
                    complete("<h1>Say hello to akka-http</h1>"))));
      }
    }
It starts an HTTP server on localhost and replies to GET requests to /hello with a simple response.
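To try the server from code rather than a browser, a client along the following lines could be used. This is only a sketch built on the JDK's java.net.http client (Java 11 or later), not part of Akka HTTP; the class name HelloClientSketch and the helper helloRequest are hypothetical, and it assumes the example server is running on localhost:8080.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class HelloClientSketch {

    // Builds a GET request for the /hello route of the example server (hypothetical helper).
    static HttpRequest helloRequest(String host, int port) {
        return HttpRequest.newBuilder()
            .uri(URI.create("http://" + host + ":" + port + "/hello"))
            .GET()
            .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumes the minimal example server is already running locally on port 8080.
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
            client.send(helloRequest("localhost", 8080), HttpResponse.BodyHandlers.ofString());
        // Print the status code followed by the response body.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Requests to any other path, or with a method other than GET, are not matched by the example route and are rejected by the server.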
The following example uses an experimental feature whose API is subject to change in future releases of Akka HTTP. For further information about this marker, see The @DoNotInherit and @ApiMayChange markers in the Akka documentation.
To help start a server, Akka HTTP provides an experimental helper class called HttpApp. This is the same example as before, rewritten using HttpApp:
- Scala
    import akka.http.scaladsl.model.{ ContentTypes, HttpEntity }
    import akka.http.scaladsl.server.HttpApp
    import akka.http.scaladsl.server.Route

    // Server definition
    object WebServer extends HttpApp {
      override def routes: Route =
        path("hello") {
          get {
            complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
          }
        }
    }

    // Starting the server
    WebServer.startServer("localhost", 8080)
- Java
    import akka.http.javadsl.server.HttpApp;
    import akka.http.javadsl.server.Route;

    // Server definition
    class MinimalHttpApp extends HttpApp {
      @Override
      protected Route routes() {
        return path("hello", () ->
            get(() ->
                complete("<h1>Say hello to akka-http</h1>")
            )
        );
      }
    }

    // Starting the server
    final MinimalHttpApp myServer = new MinimalHttpApp();
    myServer.startServer("localhost", 8080);
See HttpApp Bootstrap for more details about setting up a server using this approach.
Longer Example
The following is an Akka HTTP route definition that tries to show off a few features. The resulting service does not really do anything useful, but its definition should give you a feel for what an actual API definition with the Routing DSL looks like:
import akka.actor.{ ActorRef, ActorSystem }
import akka.http.scaladsl.coding.Deflate
import akka.http.scaladsl.marshalling.ToResponseMarshaller
import akka.http.scaladsl.model.StatusCodes.MovedPermanently
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout
import scala.concurrent.ExecutionContext

// types used by the API routes
type Money = Double // only for demo purposes, don't try this at home!
type TransactionResult = String
case class User(name: String)
case class Order(email: String, amount: Money)
case class Update(order: Order)
case class OrderItem(i: Int, os: Option[String], s: String)

// marshalling would usually be derived automatically using libraries
implicit val orderUM: FromRequestUnmarshaller[Order] = ???
implicit val orderM: ToResponseMarshaller[Order] = ???
implicit val orderSeqM: ToResponseMarshaller[Seq[Order]] = ???
implicit val timeout: Timeout = ??? // for actor asks
implicit val ec: ExecutionContext = ???
implicit val mat: ActorMaterializer = ???
implicit val sys: ActorSystem = ???

// backend entry points
def myAuthenticator: Authenticator[User] = ???
def retrieveOrdersFromDB: Seq[Order] = ???
def myDbActor: ActorRef = ???
def processOrderRequest(id: Int, complete: Order => Unit): Unit = ???

val route = concat(
  path("orders") {
    authenticateBasic(realm = "admin area", myAuthenticator) { user =>
      concat(
        get {
          encodeResponseWith(Deflate) {
            complete {
              // marshal custom object with in-scope marshaller
              retrieveOrdersFromDB
            }
          }
        },
        post {
          // decompress gzipped or deflated requests if required
          decodeRequest {
            // unmarshal with in-scope unmarshaller
            entity(as[Order]) { order =>
              complete {
                // ... write order to DB
                "Order received"
              }
            }
          }
        })
    }
  },
  // extract URI path element as Int
  pathPrefix("order" / IntNumber) { orderId =>
    concat(
      pathEnd {
        concat(
          (put | parameter('method ! "put")) {
            // form extraction from multipart or www-url-encoded forms
            formFields(('email, 'total.as[Money])).as(Order) { order =>
              complete {
                // complete with serialized Future result
                (myDbActor ? Update(order)).mapTo[TransactionResult]
              }
            }
          },
          get {
            // debugging helper
            logRequest("GET-ORDER") {
              // use in-scope marshaller to create completer function
              completeWith(instanceOf[Order]) { completer =>
                // custom
                processOrderRequest(orderId, completer)
              }
            }
          })
      },
      path("items") {
        get {
          // parameters to case class extraction
          parameters(('size.as[Int], 'color ?, 'dangerous ? "no"))
            .as(OrderItem) { orderItem =>
              // ... route using case class instance created from
              // required and optional query parameters
            }
        }
      })
  },
  pathPrefix("documentation") {
    // optionally compresses the response with Gzip or Deflate
    // if the client accepts compressed responses
    encodeResponse {
      // serve up static content from a JAR resource
      getFromResourceDirectory("docs")
    }
  },
  path("oldApi" / Remaining) { pathRest =>
    redirect("http://oldapi.example.com/" + pathRest, MovedPermanently)
  }
)
Interaction with Akka Typed
Akka Typed has been ready for production since Akka 2.5.22. Akka HTTP, however, still uses the untyped ActorSystem. The following example demonstrates how to use Akka HTTP and Akka Typed together within the same application.
We will create a small web server responsible for recording build jobs with their state and duration, querying jobs by id and status, and clearing the job history.
First, let’s define the Behavior that will act as a repository for the build job information. This isn’t strictly needed for our sample; it just gives us an actual actor to interact with:
- Scala
    import akka.actor.typed.{ ActorRef, Behavior }
    import akka.actor.typed.scaladsl.Behaviors

    object JobRepository {

      // Definition of a build job and its possible status values
      sealed trait Status
      object Successful extends Status
      object Failed extends Status

      final case class Job(id: Long, projectName: String, status: Status, duration: Long)
      final case class Jobs(jobs: Seq[Job])

      // Trait defining successful and failure responses
      sealed trait Response
      case object OK extends Response
      final case class KO(reason: String) extends Response

      // Trait and its implementations representing all possible messages that can be sent to this Behavior
      sealed trait Command
      final case class AddJob(job: Job, replyTo: ActorRef[Response]) extends Command
      final case class GetJobById(id: Long, replyTo: ActorRef[Option[Job]]) extends Command
      final case class GetJobByStatus(status: Status, replyTo: ActorRef[Seq[Job]]) extends Command
      final case class ClearJobs(replyTo: ActorRef[Response]) extends Command

      // This behavior handles all possible incoming messages and keeps the state in the function parameter
      def apply(jobs: Map[Long, Job] = Map.empty): Behavior[Command] = Behaviors.receiveMessage {
        case AddJob(job, replyTo) if jobs.contains(job.id) =>
          replyTo ! KO("Job already exists")
          Behaviors.same
        case AddJob(job, replyTo) =>
          replyTo ! OK
          JobRepository(jobs.+(job.id -> job))
        case GetJobById(id, replyTo) =>
          replyTo ! jobs.get(id)
          Behaviors.same
        case ClearJobs(replyTo) =>
          replyTo ! OK
          JobRepository(Map.empty)
      }
    }
Then, let’s define the JSON marshallers and unmarshallers for the HTTP routes:
- Scala
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
    import spray.json.DefaultJsonProtocol
    import spray.json.DeserializationException
    import spray.json.JsString
    import spray.json.JsValue
    import spray.json.RootJsonFormat

    trait JsonSupport extends SprayJsonSupport {
      // import the default encoders for primitive types (Int, String, Lists etc)
      import DefaultJsonProtocol._
      import JobRepository._

      implicit object StatusFormat extends RootJsonFormat[Status] {
        def write(status: Status): JsValue = status match {
          case Failed     => JsString("Failed")
          case Successful => JsString("Successful")
        }

        def read(json: JsValue): Status = json match {
          case JsString("Failed")     => Failed
          case JsString("Successful") => Successful
          case _                      => throw new DeserializationException("Status unexpected")
        }
      }

      implicit val jobFormat = jsonFormat4(Job)

      implicit val jobsFormat = jsonFormat1(Jobs)
    }
The next step is to define the Route that will communicate with the previously defined behavior and handle all its possible responses:
- Scala
    import akka.actor.typed.{ ActorRef, ActorSystem }
    import akka.util.Timeout
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.model.StatusCodes
    import akka.http.scaladsl.server.Route

    import scala.concurrent.duration._
    import scala.concurrent.Future

    class JobRoutes(buildJobRepository: ActorRef[JobRepository.Command])(implicit system: ActorSystem[_]) extends JsonSupport {

      import akka.actor.typed.scaladsl.AskPattern._

      // asking someone requires a timeout and a scheduler, if the timeout hits without response
      // the ask is failed with a TimeoutException
      implicit val timeout: Timeout = 3.seconds
      // implicit scheduler only needed in 2.5
      // in 2.6 having an implicit typed ActorSystem in scope is enough
      implicit val scheduler = system.scheduler

      lazy val theJobRoutes: Route =
        pathPrefix("jobs") {
          concat(
            pathEnd {
              concat(
                post {
                  entity(as[JobRepository.Job]) { job =>
                    val operationPerformed: Future[JobRepository.Response] =
                      buildJobRepository.ask(JobRepository.AddJob(job, _))
                    onSuccess(operationPerformed) {
                      case JobRepository.OK         => complete("Job added")
                      case JobRepository.KO(reason) => complete(StatusCodes.InternalServerError -> reason)
                    }
                  }
                },
                delete {
                  val operationPerformed: Future[JobRepository.Response] =
                    buildJobRepository.ask(JobRepository.ClearJobs(_))
                  onSuccess(operationPerformed) {
                    case JobRepository.OK         => complete("Jobs cleared")
                    case JobRepository.KO(reason) => complete(StatusCodes.InternalServerError -> reason)
                  }
                }
              )
            },
            (get & path(LongNumber)) { id =>
              val maybeJob: Future[Option[JobRepository.Job]] =
                buildJobRepository.ask(JobRepository.GetJobById(id, _))
              rejectEmptyResponse {
                complete(maybeJob)
              }
            }
          )
        }
    }
Finally, we create a Behavior that bootstraps the web server and use it as the root behavior of our actor system:
- Scala
    import akka.actor.typed.{ ActorSystem, Behavior, PostStop }
    import akka.actor.typed.scaladsl.Behaviors
    import akka.actor.typed.scaladsl.adapter._
    import akka.stream.ActorMaterializer
    import akka.http.scaladsl.Http.ServerBinding
    import akka.http.scaladsl.Http

    import scala.concurrent.{ ExecutionContextExecutor, Future }
    import scala.util.{ Success, Failure }

    object Server {

      sealed trait Message
      private final case class StartFailed(cause: Throwable) extends Message
      private final case class Started(binding: ServerBinding) extends Message
      case object Stop extends Message

      def apply(host: String, port: Int): Behavior[Message] = Behaviors.setup { ctx =>

        implicit val system = ctx.system
        // http doesn't know about akka typed so provide untyped system
        implicit val untypedSystem: akka.actor.ActorSystem = ctx.system.toClassic
        // implicit materializer only required in Akka 2.5
        // in 2.6 having an implicit classic or typed ActorSystem in scope is enough
        implicit val materializer: ActorMaterializer = ActorMaterializer()(ctx.system.toClassic)
        implicit val ec: ExecutionContextExecutor = ctx.system.executionContext

        val buildJobRepository = ctx.spawn(JobRepository(), "JobRepository")
        val routes = new JobRoutes(buildJobRepository)

        val serverBinding: Future[Http.ServerBinding] =
          Http.apply().bindAndHandle(routes.theJobRoutes, host, port)
        ctx.pipeToSelf(serverBinding) {
          case Success(binding) => Started(binding)
          case Failure(ex)      => StartFailed(ex)
        }

        def running(binding: ServerBinding): Behavior[Message] =
          Behaviors.receiveMessagePartial[Message] {
            case Stop =>
              ctx.log.info(
                "Stopping server http://{}:{}/",
                binding.localAddress.getHostString,
                binding.localAddress.getPort)
              Behaviors.stopped
          }.receiveSignal {
            case (_, PostStop) =>
              binding.unbind()
              Behaviors.same
          }

        def starting(wasStopped: Boolean): Behaviors.Receive[Message] =
          Behaviors.receiveMessage[Message] {
            case StartFailed(cause) =>
              throw new RuntimeException("Server failed to start", cause)
            case Started(binding) =>
              ctx.log.info(
                "Server online at http://{}:{}/",
                binding.localAddress.getHostString,
                binding.localAddress.getPort)
              if (wasStopped) ctx.self ! Stop
              running(binding)
            case Stop =>
              // we got a stop message but haven't completed starting yet,
              // we cannot stop until starting has completed
              starting(wasStopped = true)
          }

        starting(wasStopped = false)
      }
    }

    def main(args: Array[String]): Unit = {
      val system: ActorSystem[Server.Message] =
        ActorSystem(Server("localhost", 8080), "BuildJobsServer")
    }
Note that the akka.actor.typed.ActorSystem is converted with toClassic, which comes from import akka.actor.typed.scaladsl.adapter._. If you are using a version earlier than Akka 2.5.26, this conversion method is named toUntyped.
Dynamic Routing Example
Because the routes are evaluated for each request, it is possible to make changes at runtime. Please note that every access may happen on a separate thread, so any shared mutable state must be thread safe.
The following is an Akka HTTP route definition that allows adding new mock endpoints, or updating existing ones, with associated request-response pairs at runtime.
- Scala
    case class MockDefinition(path: String, requests: Seq[JsValue], responses: Seq[JsValue])
    implicit val format = jsonFormat3(MockDefinition)

    @volatile var state = Map.empty[String, Map[JsValue, JsValue]]

    // fixed route to update state
    val fixedRoute: Route = post {
      pathSingleSlash {
        entity(as[MockDefinition]) { mock =>
          val mapping = mock.requests.zip(mock.responses).toMap
          state = state + (mock.path -> mapping)
          complete("ok")
        }
      }
    }

    // dynamic routing based on current state
    val dynamicRoute: Route = ctx => {
      val routes = state.map {
        case (segment, responses) =>
          post {
            path(segment) {
              entity(as[JsValue]) { input =>
                complete(responses.get(input))
              }
            }
          }
      }
      concat(routes.toList: _*)(ctx)
    }

    val route = fixedRoute ~ dynamicRoute
- Java
    final private Map<String, Map<JsonNode, JsonNode>> state = new ConcurrentHashMap<>();

    private Route createRoute() {
      // fixed route to update state
      Route fixedRoute = post(() ->
          pathSingleSlash(() ->
              entity(Jackson.unmarshaller(MockDefinition.class), mock -> {
                Map<JsonNode, JsonNode> mappings = new HashMap<>();
                int size = Math.min(mock.getRequests().size(), mock.getResponses().size());
                for (int i = 0; i < size; i++) {
                  mappings.put(mock.getRequests().get(i), mock.getResponses().get(i));
                }
                state.put(mock.getPath(), mappings);
                return complete("ok");
              })
          )
      );

      // dynamic routing based on current state
      Route dynamicRoute = post(() ->
          state.entrySet().stream().map(mock ->
              path(mock.getKey(), () ->
                  entity(Jackson.unmarshaller(JsonNode.class), input ->
                      complete(StatusCodes.OK, mock.getValue().get(input), Jackson.marshaller())
                  )
              )
          ).reduce(reject(), Route::orElse)
      );

      return concat(fixedRoute, dynamicRoute);
    }

    private static class MockDefinition {
      private final String path;
      private final List<JsonNode> requests;
      private final List<JsonNode> responses;

      public MockDefinition(@JsonProperty("path") String path,
                            @JsonProperty("requests") List<JsonNode> requests,
                            @JsonProperty("responses") List<JsonNode> responses) {
        this.path = path;
        this.requests = requests;
        this.responses = responses;
      }

      public String getPath() {
        return path;
      }

      public List<JsonNode> getRequests() {
        return requests;
      }

      public List<JsonNode> getResponses() {
        return responses;
      }
    }
For example, let’s say we send a POST request to / with the following body:
{
  "path": "test",
  "requests": [
    {"id": 1},
    {"id": 2}
  ],
  "responses": [
    {"amount": 1000},
    {"amount": 2000}
  ]
}
A subsequent POST request to /test with body {"id": 1} will be answered with {"amount": 1000}.
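The pairing and lookup behaviour described above can be sketched in plain Java, independently of Akka HTTP. The class MockRegistrySketch and its register and lookup methods are hypothetical names chosen for illustration. For simplicity, this sketch compares request bodies as exact strings, whereas the route above compares parsed JSON values.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

final class MockRegistrySketch {

    // path -> (request body -> response body); a ConcurrentHashMap is used
    // because requests may be handled on different threads.
    private final Map<String, Map<String, String>> state = new ConcurrentHashMap<>();

    // Pairs requests with responses by position; an unpaired tail is dropped.
    void register(String path, List<String> requests, List<String> responses) {
        Map<String, String> mapping = new HashMap<>();
        int size = Math.min(requests.size(), responses.size());
        for (int i = 0; i < size; i++) {
            mapping.put(requests.get(i), responses.get(i));
        }
        // Publish an immutable snapshot so readers never see a half-built map.
        state.put(path, Map.copyOf(mapping));
    }

    // Returns the registered response, or empty if the path or body is unknown.
    Optional<String> lookup(String path, String requestBody) {
        return Optional.ofNullable(state.get(path)).map(m -> m.get(requestBody));
    }
}
```

Registering the same path again replaces the previous mapping as a whole, which mirrors how the fixed route overwrites the entry for an existing path.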
Handling HTTP Server failures in the High-Level API
Failures may occur in various situations while initialising or running an Akka HTTP server. By default, Akka logs all of these failures. Sometimes, however, you may want to react to a failure beyond just logging it, for example by shutting down the actor system or by explicitly notifying an external monitoring endpoint.
Bind failures
The server might, for example, be unable to bind to the given port, either because the port is already taken by another application or because the port is privileged (i.e. only usable by root). In this case the “binding future” will fail immediately, and we can react to it by listening on the completion of the Future (Scala) or CompletionStage (Java):
- Scala
    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.Http.ServerBinding
    import akka.http.scaladsl.server.Directives._
    import akka.stream.ActorMaterializer

    import scala.concurrent.Future

    object WebServer {
      def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem()
        implicit val materializer = ActorMaterializer()
        // needed for the future foreach in the end
        implicit val executionContext = system.dispatcher

        val handler = get {
          complete("Hello world!")
        }

        // let's say the OS won't allow us to bind to 80.
        val (host, port) = ("localhost", 80)
        val bindingFuture: Future[ServerBinding] =
          Http().bindAndHandle(handler, host, port)

        bindingFuture.failed.foreach { ex =>
          // log through the actor system's logging adapter
          system.log.error(ex, "Failed to bind to {}:{}!", host, port)
        }
      }
    }
- Java
    import akka.NotUsed;
    import akka.actor.ActorSystem;
    import akka.http.javadsl.ConnectHttp;
    import akka.http.javadsl.ServerBinding;
    import akka.http.javadsl.model.HttpRequest;
    import akka.http.javadsl.model.HttpResponse;
    import akka.http.javadsl.server.Route;
    import akka.http.javadsl.Http;
    import akka.stream.ActorMaterializer;
    import akka.stream.javadsl.Flow;

    import java.io.IOException;
    import java.util.concurrent.CompletionStage;

    public class HighLevelServerBindFailureExample {
      public static void main(String[] args) throws IOException {
        // boot up server using the route as defined below
        final ActorSystem system = ActorSystem.create();
        final ActorMaterializer materializer = ActorMaterializer.create(system);

        final HighLevelServerExample app = new HighLevelServerExample();
        final Route route = app.createRoute();

        final Flow<HttpRequest, HttpResponse, NotUsed> handler = route.flow(system, materializer);
        final CompletionStage<ServerBinding> binding = Http.get(system).bindAndHandle(handler,
            ConnectHttp.toHost("127.0.0.1", 8080), materializer);

        binding.exceptionally(failure -> {
          System.err.println("Something very bad happened! " + failure.getMessage());
          system.terminate();
          return null;
        });

        system.terminate();
      }
    }
For a lower-level overview of the kinds of failures that can happen, and for more fine-grained control over them, refer to the Handling HTTP Server failures in the Low-Level API documentation.
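The pattern of reacting to a failed binding can be tried without Akka HTTP. The following sketch uses a plain java.net.ServerSocket as a stand-in for the server binding; the class BindFailureSketch and the helpers bind and describe are hypothetical and only mimic the shape of a CompletionStage-based bind call.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class BindFailureSketch {

    // Hypothetical stand-in for a bind call: attempts to bind to the loopback
    // address and reports the outcome through a CompletionStage.
    static CompletionStage<ServerSocket> bind(int port) {
        CompletableFuture<ServerSocket> result = new CompletableFuture<>();
        try {
            result.complete(new ServerSocket(port, 50, InetAddress.getLoopbackAddress()));
        } catch (IOException e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    // Turns either outcome into a message. A real application might shut down
    // the actor system or notify a monitoring endpoint in the failure branch.
    static String describe(CompletionStage<ServerSocket> binding) {
        return binding
            .handle((socket, failure) -> failure == null
                ? "bound to port " + socket.getLocalPort()
                : "bind failed: " + failure.getMessage())
            .toCompletableFuture()
            .join();
    }

    public static void main(String[] args) throws IOException {
        // Occupy a free port first, then try to bind to the same port again.
        try (ServerSocket occupied = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            System.out.println(describe(bind(occupied.getLocalPort())));
        }
    }
}
```

Using handle rather than chaining exceptionally after another stage keeps the original exception unwrapped, so its message can be reported directly.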
Failures and exceptions inside the Routing DSL
Exception handling within the Routing DSL is done by providing ExceptionHandlers, which are documented in depth in the Exception Handling section of the documentation. You can use them to transform exceptions into HttpResponses with appropriate error codes and human-readable failure descriptions.
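The idea of mapping exceptions to error codes and readable descriptions can be illustrated in plain Java. This is not the ExceptionHandler API; the ErrorResponse type, the toResponse function, and the particular choice of status codes are hypothetical and serve only to show the kind of mapping such a handler would perform.

```java
import java.util.NoSuchElementException;

final class ExceptionMappingSketch {

    // Hypothetical, simplified response: an HTTP status code plus a message.
    static final class ErrorResponse {
        final int status;
        final String message;

        ErrorResponse(int status, String message) {
            this.status = status;
            this.message = message;
        }
    }

    // Maps known exception types to client errors and everything else to 500.
    static ErrorResponse toResponse(Throwable failure) {
        if (failure instanceof IllegalArgumentException) {
            return new ErrorResponse(400, "Bad request: " + failure.getMessage());
        } else if (failure instanceof NoSuchElementException) {
            return new ErrorResponse(404, "Not found: " + failure.getMessage());
        } else {
            // Do not leak internal details for unexpected failures.
            return new ErrorResponse(500, "Internal server error");
        }
    }
}
```

The catch-all branch matters in practice: an unexpected exception should still produce a well-formed response rather than expose a stack trace to the client.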
File uploads
For high-level directives to handle uploads, see the FileUploadDirectives.
A simple file upload, for example from a browser form with a file input, can be handled by accepting a Multipart.FormData entity. Note that the body parts arrive as a Source rather than being available all at once, and so does the payload of each individual body part, so you will need to consume those streams both for the file and for the form fields.
Here is a simple example that just dumps the uploaded file into a temporary file on disk, collects some form fields, and saves an entry to a fictitious database:
- Scala
    val uploadVideo =
      path("video") {
        entity(as[Multipart.FormData]) { formData =>

          // collect all parts of the multipart as they arrive into a map
          val allPartsF: Future[Map[String, Any]] = formData.parts.mapAsync[(String, Any)](1) {

            case b: BodyPart if b.name == "file" =>
              // stream into a file as its chunks arrive and return a future
              // of the file where it got stored
              val file = File.createTempFile("upload", "tmp")
              b.entity.dataBytes.runWith(FileIO.toPath(file.toPath)).map(_ =>
                (b.name -> file))

            case b: BodyPart =>
              // collect form field values
              b.toStrict(2.seconds).map(strict =>
                (b.name -> strict.entity.data.utf8String))

          }.runFold(Map.empty[String, Any])((map, tuple) => map + tuple)

          val done = allPartsF.map { allParts =>
            // You would have some better validation/unmarshalling here
            db.create(Video(
              file = allParts("file").asInstanceOf[File],
              title = allParts("title").asInstanceOf[String],
              author = allParts("author").asInstanceOf[String]))
          }

          // when processing has finished create a response for the user
          onSuccess(allPartsF) { allParts =>
            complete {
              "ok!"
            }
          }
        }
      }
- Java
    import static akka.http.javadsl.server.Directives.complete;
    import static akka.http.javadsl.server.Directives.entity;
    import static akka.http.javadsl.server.Directives.onSuccess;
    import static akka.http.javadsl.server.Directives.path;

    path("video", () ->
        entity(Unmarshaller.entityToMultipartFormData(), formData -> {
          // collect all parts of the multipart as they arrive into a map
          final CompletionStage<Map<String, Object>> allParts =
              formData.getParts().mapAsync(1, bodyPart -> {
                if ("file".equals(bodyPart.getName())) {
                  // stream into a file as its chunks arrive and return a CompletionStage
                  // of the file where it got stored
                  final File file = File.createTempFile("upload", "tmp");
                  return bodyPart.getEntity().getDataBytes()
                      .runWith(FileIO.toPath(file.toPath()), materializer)
                      .thenApply(ignore ->
                          new Pair<String, Object>(bodyPart.getName(), file)
                      );
                } else {
                  // collect form field values
                  return bodyPart.toStrict(2 * 1000, materializer)
                      .thenApply(strict ->
                          new Pair<String, Object>(bodyPart.getName(),
                              strict.getEntity().getData().utf8String())
                      );
                }
              }).runFold(new HashMap<String, Object>(), (acc, pair) -> {
                acc.put(pair.first(), pair.second());
                return acc;
              }, materializer);

          // simulate a DB call
          final CompletionStage<Void> done = allParts.thenCompose(map ->
              // You would have some better validation/unmarshalling here
              DB.create((File) map.get("file"),
                  (String) map.get("title"),
                  (String) map.get("author")
              ));

          // when processing has finished create a response for the user
          return onSuccess(allParts, x -> complete("ok!"));
        })
    );
You can transform the uploaded files as they arrive rather than storing them in a temporary file as in the previous example. In this example we accept any number of .csv files, parse them into lines, and split each line before sending it to an actor for further processing:
- Scala
    val splitLines = Framing.delimiter(ByteString("\n"), 256)

    val csvUploads =
      path("metadata" / LongNumber) { id =>
        entity(as[Multipart.FormData]) { formData =>
          val done: Future[Done] = formData.parts.mapAsync(1) {
            case b: BodyPart if b.filename.exists(_.endsWith(".csv")) =>
              b.entity.dataBytes
                .via(splitLines)
                .map(_.utf8String.split(",").toVector)
                .runForeach(csv =>
                  metadataActor ! MetadataActor.Entry(id, csv))
            case _ => Future.successful(Done)
          }.runWith(Sink.ignore)

          // when processing has finished create a response for the user
          onSuccess(done) { _ =>
            complete {
              "ok!"
            }
          }
        }
      }
- Java
    import static akka.http.javadsl.server.Directives.complete;
    import static akka.http.javadsl.server.Directives.entity;
    import static akka.http.javadsl.server.Directives.onComplete;
    import static akka.http.javadsl.server.Directives.path;

    Route csvUploads() {
      final Flow<ByteString, ByteString, NotUsed> splitLines =
          Framing.delimiter(ByteString.fromString("\n"), 256);

      return path(segment("metadata").slash(longSegment()), id ->
          entity(Unmarshaller.entityToMultipartFormData(), formData -> {

            final CompletionStage<Done> done = formData.getParts().mapAsync(1, bodyPart ->
                bodyPart.getFilename().filter(name -> name.endsWith(".csv")).map(ignored ->
                    bodyPart.getEntity().getDataBytes()
                        .via(splitLines)
                        .map(bs -> bs.utf8String().split(","))
                        .runForeach(csv ->
                                metadataActor.tell(new Entry(id, csv), ActorRef.noSender()),
                            materializer)
                ).orElseGet(() ->
                    // in case the uploaded file is not a CSV
                    CompletableFuture.completedFuture(Done.getInstance()))
            ).runWith(Sink.ignore(), materializer);

            // when processing has finished create a response for the user
            return onComplete(() -> done, ignored -> complete("ok!"));
          })
      );
    }
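What the framing and splitting steps do to the data can be seen in isolation with a small, non-streaming sketch. The class CsvLinesSketch and its parse method are hypothetical. Unlike the streaming version, this sketch needs the whole payload in memory and measures line length in characters rather than bytes, so it illustrates the transformation only and is not a replacement for the stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CsvLinesSketch {

    // Splits the payload on '\n', rejects lines longer than maxLineLength,
    // and splits every remaining line on ','.
    static List<List<String>> parse(String payload, int maxLineLength) {
        List<List<String>> rows = new ArrayList<>();
        for (String line : payload.split("\n")) {
            if (line.length() > maxLineLength) {
                throw new IllegalArgumentException(
                    "line longer than " + maxLineLength + " characters");
            }
            rows.add(Arrays.asList(line.split(",")));
        }
        return rows;
    }

    public static void main(String[] args) {
        // Each row is printed as a list of its comma-separated fields.
        parse("name,size\nreport,42\n", 256).forEach(System.out::println);
    }
}
```

The length limit plays the same role as the maximum frame length of 256 in the example above: it bounds how much data is buffered while looking for the next line break.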
Configuring Server-side HTTPS
For detailed documentation about configuring and using HTTPS on the server side, refer to Server-Side HTTPS Support.