PowerApi

Akka gRPC是基于Akka HTTP实现的,怎样才可以访问HTTP Header呢?我们基于HTTP Header可以实现一些增强功能,比如: 调用链跟踪认证鉴权 等。这非常的简单,在sbt配置里添加akkaGrpcCodeGeneratorSettings += "server_power_apis"即可,这样生成的Akka gRPC自动生成的代码会额外提供XxxxPowerApi结尾的服务接口。

trait GreeterServicePowerApi extends GreeterService {
  
  def sayHello(
    in: greeter.HelloRequest,
    metadata: Metadata): scala.concurrent.Future[greeter.HelloReply]
  
  def itKeepsTalking(
    in: akka.stream.scaladsl.Source[greeter.HelloRequest, akka.NotUsed],
    metadata: Metadata): scala.concurrent.Future[greeter.HelloReply]
  
  def itKeepsReplying(
    in: greeter.HelloRequest,
    metadata: Metadata): akka.stream.scaladsl.Source[greeter.HelloReply, akka.NotUsed]
  
  def streamHellos(
    in: akka.stream.scaladsl.Source[greeter.HelloRequest, akka.NotUsed],
    metadata: Metadata): akka.stream.scaladsl.Source[greeter.HelloReply, akka.NotUsed]
  
  override def sayHello(in: greeter.HelloRequest): scala.concurrent.Future[greeter.HelloReply] = throw new GrpcServiceException(Status.UNIMPLEMENTED)
  
  override def itKeepsTalking(in: akka.stream.scaladsl.Source[greeter.HelloRequest, akka.NotUsed]): scala.concurrent.Future[greeter.HelloReply] = throw new GrpcServiceException(Status.UNIMPLEMENTED)
  
  override def itKeepsReplying(in: greeter.HelloRequest): akka.stream.scaladsl.Source[greeter.HelloReply, akka.NotUsed] = throw new GrpcServiceException(Status.UNIMPLEMENTED)
  
  override def streamHellos(in: akka.stream.scaladsl.Source[greeter.HelloRequest, akka.NotUsed]): akka.stream.scaladsl.Source[greeter.HelloReply, akka.NotUsed] = throw new GrpcServiceException(Status.UNIMPLEMENTED)
}

可以看到生成了GreeterServicePowerApi接口,它继承了GreeterService,并且默认的4个服务都已经有了默认实现:throw new GrpcServiceException(Status.UNIMPLEMENTED);取而代之的是4个新的重载函数,它们都多了一个Metadata参数。Metadata接口定义如下:

@DoNotInherit trait Metadata {
  def getText(key: String): Option[String]
  def getBinary(key: String): Option[ByteString]
  def asMap: Map[String, List[MetadataEntry]]
}

class MetadataImpl(headers: immutable.Seq[HttpHeader] = immutable.Seq.empty) extends Metadata {
  // ....
}

其实Metadata保存的就是HTTP Header,通过它的实现类MetadataImpl构造函数需要HttpHeader列表来初始化既可看出。它提供了getTextgetBinaryasMap方法提供了gRPC服务元数据(HTTP Header)的访问接口。

通过Akka gRPC生成的服务句柄类(GreeterServicePowerApiHandler),可以清晰的知道Akka gRPC是怎么创建Metadata的。

def partial(
    implementation: GreeterServicePowerApi,
    prefix: String = GreeterService.name,
    eHandler: ActorSystem => PartialFunction[Throwable, io.grpc.Status] = GrpcExceptionHandler.defaultMapper
  )(implicit mat: Materializer, system: ActorSystem): PartialFunction[HttpRequest, scala.concurrent.Future[HttpResponse]] = {
  implicit val ec: ExecutionContext = mat.executionContext
  import GreeterService.Serializers._

  def handle(request: HttpRequest, method: String): scala.concurrent.Future[HttpResponse] = method match {
    case "SayHello" =>
      val responseCodec = Codecs.negotiate(request)
      val metadata = new MetadataImpl(request.headers)
      GrpcMarshalling.unmarshal(request)(HelloRequestSerializer, mat)
        .flatMap(implementation.sayHello(_, metadata))
        .map(e => GrpcMarshalling.marshal(e, eHandler)(HelloReplySerializer, mat, responseCodec, system))
    
    case "ItKeepsTalking" =>
     // ....
  }
  // ....
}

case "SayHello" =>模式匹配既是构造Metadata和执行SayHellogRPC服务的代码逻辑。val metadata = new MetadataImpl(request.headers)一行代码通过request.headers构造了MetadataImpl对象。

在此文档中发现错误?该页面的源代码可以在 这里 找到。欢迎随时编辑并提交Pull Request。