Typed Actor 风格

本文将讨论各种使用Typed Actor的风格,并尝试着建议一种行之有效的风格作为大家在应用Akka时的参考。

Note

任何时候,在准备应用Akka前都应该想一想是否有更简单的方式?如果有,则不要使用Akka!

直接构造 Behavior

通过Behaviors上的各个便捷函数定义行为,是使用Typed Actor最直观和简单的方式。

object Hello {
  def apply(): Behavior[Nothing] = Behaviors.receiveMessage { msg =>
    // Do business logic.
    Behaviors.same
  }
}

context.spawn(Hello(), "hello")

在一个object里通过apply定义行为只是一个惯例,你并不需要每次都这样。比如,你可以在spawn函数里直接构造一个行为。

context.spawn(
  Behaviors.receiveMessage { msg => 
    // Do business logic.
    Behaviors.same
  },
  "hello")

这在构建一个临时actor时有用,但通常这种情况下直接使用Future可能会更好。

在object里定义actor行为,object名字相当于对这个行为的所有实例(spawn创建以后的ActorRef[T])进行了类型命名(同一业务类型的actor)。同时,在object里定义这个actor能处理的消息是一个不错的对消息进行隔离的地方。

object Hello {
  trait Command
  case object Start extends Command
  final case class Question(message: String, replyTo: ActorRef[Reply]) extends Command
  case object StopOrder extends Command

  trait Reply
  final case class Answer(message: String) extends Reply

  def apply(dataSourceFactory: DataSourceFactory): Behavior[Command] =
    Behaviors.setup { context =>
      Behaviors.withTimers { timers =>
        new Hello(dataSourceFactory, context, timers).init()
      }
    }
}

使用函数嵌套

Scala中函数是第一类的,所以我们可以在代码块中直接定义函数,这样就可以将actor的不同行为定义在函数类部。

private case object InternalInit extends Command
def apply(dataSourceFactory: DataSourceFactory): Behavior[Command] = {
  Behaviors.setup { context =>
    Behaviors.withTimers { timers =>
      import context.executionContext
      context.self ! InternalInit

      def init(): Future[AutoCloseable] = Future {
        context.log.debug("开始异步初始化外部资源,idle……")
        dataSourceFactory.createDataSource()
      }

      def idle(): Behavior[Command] = Behaviors.withStash(1024) { stash =>
        var ds: AutoCloseable = null
        Behaviors.receiveMessage {
          case InternalInit =>
            context.pipeToSelf(init()) {
              case Success(value) =>
                ds = value
                Start
              case Failure(e) =>
                context.log.error(s"Init error: $e")
                StopOrder
            }
            Behaviors.same
          case Start =>
            context.log.debug("外部资源初始化完成,active……")
            // 切换行为active前,回放所有已stash消息
            stash.unstashAll(active(ds))
          case StopOrder =>
            if (stash.nonEmpty) {
              var messages = List.empty[Command]
              stash.foreach(messages ::= _)
              loggingStashedMessages(messages)
            }
            Behaviors.stopped
          case msg =>
            // 在 Start 前,stash进入的消息
            stash.stash(msg)
            Behaviors.same
        }
      }

      def active(ds: AutoCloseable): Behavior[Command] =
        Behaviors
          .receiveMessage[Command] {
            case Question(message, replyTo) =>
              replyTo ! Answer(s"You say is $message.")
              Behaviors.same

            // 其它业务 case
            // ...

            case StopOrder =>
              Behaviors.stopped
          }
          .receiveSignal {
            case (_, PreRestart) =>
              cleanup(ds)
              Behaviors.same
            case (_, PostStop) =>
              cleanup(ds)
              Behaviors.same
          }

      def loggingStashedMessages(messages: List[Command]): Unit = {
        // 对未处理的stash消息记录日志
      }

      def cleanup(ds: AutoCloseable): Unit = {
        if (null != ds) {
          ds.close()
        }
      }

      idle()
    }
  }
}

使用类

当actor的逻辑比较复杂时,比如:有多种行为、需要保存状态……函数嵌套的方式使一个函数实现非常的痈肿,代码缩进层次变多……这时,推荐使用类的方式来实现行为(Behavior):

class Hello private (
    dataSourceFactory: DataSourceFactory,
    context: ActorContext[Hello.Command],
    timers: TimerScheduler[Hello.Command]) {
  import Hello._
  import context.executionContext

  private var ds: AutoCloseable = _

  def init(): Behavior[Command] = {
    val future = Future {
      context.log.debug("开始异步初始化外部资源,idle……")
      ds = dataSourceFactory.createDataSource()
    }

    context.pipeToSelf(future) {
      case Success(_) => Start
      case Failure(exception) =>
        context.log.error(s"Init error: $exception")
        StopOrder
    }

    idle()
  }

  def idle(): Behavior[Command] = Behaviors.withStash(1024) { stash =>
    Behaviors.receiveMessage {
      case Start =>
        context.log.debug("外部资源初始化完成,active……")
        // 切换行为active前,回放所有已stash消息
        stash.unstashAll(active())
      case StopOrder =>
        if (stash.nonEmpty) {
          var messages = List.empty[Command]
          stash.foreach(messages ::= _)
          loggingStashedMessages(messages)
        }
        Behaviors.stopped
      case msg =>
        // 在 Start 前,stash进入的消息
        stash.stash(msg)
        Behaviors.same
    }
  }

  def active(): Behavior[Command] =
    Behaviors
      .receiveMessage[Command] {
        case Question(message, replyTo) =>
          replyTo ! Answer(s"You say is $message.")
          Behaviors.same

        // 其它业务 case
        // ...

        case StopOrder =>
          Behaviors.stopped
      }
      .receiveSignal {
        case (_, PreRestart) =>
          cleanup()
          Behaviors.same
        case (_, PostStop) =>
          cleanup()
          Behaviors.same
      }

  private def loggingStashedMessages(messages: List[Command]): Unit = {
    // 对未处理的stash消息记录日志
  }

  private def cleanup(): Unit = {
    if (null != ds) {
      ds.close()
    }
  }
}

建议

  1. 当逻辑比较复杂时,建议使用类的方式来实现actor行为。
  2. 函数嵌套的方式若嵌套超过2层或代码行数较大(40行),也建议使用类的方式,通过类成员函数实现行为的切换,而actor状态可以通过函数参数进行传递。

actor状态是作为成员函数的参数进行传递,还是定义成类属性?其实都可以,无所谓好坏,更多的是代码风格问题。

状态作为函数参数传递,优势是可以始终使用不可变数据,这样状态(数据)是线程安全的,可以避免状态的不小修改,且更函数式。但这样的缺点是每次消息处理后你都需要构造一个新的状态集(通常actor内会有多个状态变量或参数)传递给下一个行为,当状态比较多(而大)时这会很繁琐,也会污染正常的业务代码。

状态作为类属性,不可避免的会使用到可变变量或可变数据,这样它们就不是线程安全的了。但是,若你坚持始终在actor消息处理中访问/修改这些状态,那就可以以线程安全的方式处理它们。类属性的方式,不需要每次消息处理完后都构造一个新的状态集,所以某种程度上它不会污染正常的业务代码。

在此文档中发现错误?该页面的源代码可以在 这里 找到。欢迎随时编辑并提交Pull Request。