Akka并发编程——第三节:Actor模型(二)

    xiaoxiao2026-04-06  9

    本节主要内容:

    Actor API解析

    1. Actor API解析

    Actor中的主要成员变量和方法定义如下:

    package akka.actor trait Actor extends scala.AnyRef { type Receive = akka.actor.Actor.Receive //context变量暴露当前Actor的上下文信息及当前消息 implicit val context : akka.actor.ActorContext = { /* compiled code */ } //self作为当前ActorRef的引用 implicit final val self : akka.actor.ActorRef = { /* compiled code */ } //当前Actor接收到最后一条消息对应的消息发送者(Actor) final def sender() : akka.actor.ActorRef = { /* compiled code */ } //receive方法,抽象方法,定义Actor的行为逻辑 def receive : akka.actor.Actor.Receive //内部使用API protected[akka] def aroundReceive(receive : akka.actor.Actor.Receive, msg : scala.Any) : scala.Unit = { /* compiled code */ } protected[akka] def aroundPreStart() : scala.Unit = { /* compiled code */ } protected[akka] def aroundPostStop() : scala.Unit = { /* compiled code */ } protected[akka] def aroundPreRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ } protected[akka] def aroundPostRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ } //监督策略,用于Actor容错处理 def supervisorStrategy : akka.actor.SupervisorStrategy = { /* compiled code */ } //Hook方法,用于Actor生命周期监控 @scala.throws[T](classOf[scala.Exception]) def preStart() : scala.Unit = { /* compiled code */ } @scala.throws[T](classOf[scala.Exception]) def postStop() : scala.Unit = { /* compiled code */ } @scala.throws[T](classOf[scala.Exception]) def preRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ } @scala.throws[T](classOf[scala.Exception]) def postRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ } //发送给Actor的消息,Actor没有定义相应的处理逻辑时,会调用此方法 def unhandled(message : scala.Any) : scala.Unit = { /* compiled code */ } } object Actor extends scala.AnyRef { type Receive = scala.PartialFunction[scala.Any, scala.Unit] //空的行为逻辑 @scala.SerialVersionUID(1) object emptyBehavior extends scala.AnyRef with akka.actor.Actor.Receive { def isDefinedAt(x : scala.Any) : scala.Boolean = { /* compiled code */ } def apply(x : scala.Any) : scala.Nothing = { /* compiled code */ } } //Sender为null @scala.SerialVersionUID(1) final val noSender : akka.actor.ActorRef = { /* compiled code */ } }

    (1) Hook方法,preStart()、postStop()方法的使用

    /* *Actor API: Hook方法 */ object Example_05 extends App{ import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props class FirstActor extends Actor with ActorLogging{ //通过context.actorOf方法创建Actor var child:ActorRef = _ //Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作 override def preStart(): Unit ={ log.info("preStart() in FirstActor") //通过context上下文创建Actor child = context.actorOf(Props[MyActor], name = "myChild") } def receive = { //向MyActor发送消息 case x => child ! x;log.info("received "+x) } //Hook方法,postStop(),Actor停止之后调用 override def postStop(): Unit = { log.info("postStop() in FirstActor") } } class MyActor extends Actor with ActorLogging{ //Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作 override def preStart(): Unit ={ log.info("preStart() in MyActor") } def receive = { case "test" => log.info("received test") case _ => log.info("received unknown message") } //Hook方法,postStop(),Actor停止之后调用 override def postStop(): Unit = { log.info("postStop() in MyActor") } } val system = ActorSystem("MyActorSystem") val systemLog=system.log //创建FirstActor对象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor") systemLog.info("准备向myactor发送消息") //向myactor发送消息 myactor!"test" myactor! 123 Thread.sleep(5000) //关闭ActorSystem,停止程序的运行 system.shutdown() }

    代码运行结果:

    [INFO] [04/02/2016 17:04:49.607] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息 [INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor [INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test [INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received 123 [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] preStart() in MyActor [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] received test [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] received unknown message [INFO] [04/02/2016 17:04:54.616] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myChild] postStop() in MyActor [INFO] [04/02/2016 17:04:54.617] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] postStop() in FirstActor

    在代码

    class FirstActor extends Actor with ActorLogging{ //通过context.actorOf方法创建Actor var child:ActorRef = _ //Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作 override def preStart(): Unit ={ log.info("preStart() in FirstActor") //通过context上下文创建Actor child = context.actorOf(Props[MyActor], name = "myChild") } def receive = { //向MyActor发送消息 case x => child ! x;log.info("received "+x) } //Hook方法,postStop(),Actor停止之后调用,用于完成初始化工作 override def postStop(): Unit = { log.info("postStop() in FirstActor") } }

    中分别对postStop、preStart方法进行了重写,在preStart方法中通过代码

    child = context.actorOf(Props[MyActor], name = "myChild")

    对成员变量child进行初始化,然后在postStop方法中使用

    //通过context上下文停止MyActor的运行 context.stop(child)

    停止MyActor的运行。在使用代码

    //创建FirstActor对象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

    创建FirstActor时,便会调用preStart方法完成MyActor的创建,因此首先会执行FirstActor中的preStart()方法

    dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor

    然后在创建MyActor时执行MyActor中定义的preStart方法

    [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] preStart() in MyActor

    在执行

    //关闭ActorSystem,停止程序的运行 system.shutdown()

    FirstActor作为MyActor的Supervisor,会先停止MyActor,再停止自身,因此先调用MyActor的postStop方法,再调用FirstActor的postStop方法。

    (2) 成员变量self及成员方法sender方法的使用

    整体代码如下:

    /* *Actor API:成员变量self及sender()方法的使用 */ object Example_05 extends App{ import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props class FirstActor extends Actor with ActorLogging{ //通过context.actorOf方法创建Actor var child:ActorRef = _ override def preStart(): Unit ={ log.info("preStart() in FirstActor") //通过context上下文创建Actor child = context.actorOf(Props[MyActor], name = "myActor") } def receive = { //向MyActor发送消息 case x => child ! x;log.info("received "+x) } } class MyActor extends Actor with ActorLogging{ self!"message from self reference" def receive = { case "test" => log.info("received test");sender()!"message from MyActor" case "message from self reference"=>log.info("message from self refrence") case _ => log.info("received unknown message"); } } val system = ActorSystem("MyActorSystem") val systemLog=system.log //创建FirstActor对象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor") systemLog.info("准备向myactor发送消息") //向myactor发送消息 myactor!"test" myactor! 123 Thread.sleep(5000) //关闭ActorSystem,停止程序的运行 system.shutdown() }

    运行结果:

    [INFO] [04/02/2016 18:40:37.805] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息 [INFO] [04/02/2016 18:40:37.805] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received 123 [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received test [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] message from self refrence [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received message from MyActor [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

    代码:

    class MyActor extends Actor with ActorLogging{ self!"message from self reference" def receive = { case "test" => log.info("received test");sender()!"message from MyActor" case "message from self reference"=>log.info("message from self refrence") case _ => log.info("received unknown message"); } }

    中使用

    self!"message from self reference"

    向自身发送了一条消息,receive方法通过

    case "message from self reference"=>log.info("message from self refrence")

    对这条消息进行处理。receive方法在处理

    def receive = { case "test" => log.info("received test");sender()!"message from MyActor"

    “test”消息时,会调用

    sender()!"message from MyActor"

    向sender(本例中为FirstActor)发送”message from MyActor”消息,FirstActor使用

    def receive = { //MyActor发送消息 case x => child ! x;log.info("received "+x) }

    处理消息时又向MyActor回送该消息,因此最终的输出有两个unknown message,分别对应123和”message from MyActor”

    [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

    (3) unhandled方法的使用

    unhandled方法用于处理没有被receive方法处理的消息,下面的代码给出的是当不重写unhandled方法时的代码

    /* *Actor API:unhandled方法 */ object Example_06 extends App{ import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props class FirstActor extends Actor with ActorLogging{ def receive = { //向MyActor发送消息 case "test" => log.info("received test") } } val system = ActorSystem("MyActorSystem") val systemLog=system.log //创建FirstActor对象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor") systemLog.info("准备向myactor发送消息") //向myactor发送消息 myactor!"test" myactor! 123 Thread.sleep(5000) //关闭ActorSystem,停止程序的运行 system.shutdown() }

    代码输出:

    [INFO] [04/02/2016 19:14:11.992] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息 [INFO] [04/02/2016 19:14:11.992] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test

    不难看出,对于

    myactor! 123

    发送的这条消息没有被处理,没有任何的处理逻辑。在实际开发过程中,可能会对不能被处理的消息增加一些应对逻辑,此时可以重写unhandled方法,代码如下:

    /* *Actor API:unhandled方法的使用 */ object Example_06 extends App{ import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props class FirstActor extends Actor with ActorLogging{ def receive = { //向MyActor发送消息 case "test" => log.info("received test") } //重写unhandled方法 override def unhandled(message: Any): Unit = { log.info("unhandled message is {}",message) } } val system = ActorSystem("MyActorSystem") val systemLog=system.log //创建FirstActor对象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor") systemLog.info("准备向myactor发送消息") //向myactor发送消息 myactor!"test" myactor! 123 Thread.sleep(5000) //关闭ActorSystem,停止程序的运行 system.shutdown() }

    代码输出结果:

    [INFO] [04/02/2016 19:17:18.458] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息 [INFO] [04/02/2016 19:17:18.458] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test [INFO] [04/02/2016 19:17:18.458] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] unhandled message is 123

    其它如preRestart等方法的使用将在Akka容错部分进行讲解。



    Scala学习(公众微信号:ScalaLearning)每天为大家带来一点Scala语言、Spark、Kafka、Flink、AKKA等大数据技术干货及相关技术资讯。技术永无止境,勇攀高峰,一往直前! 觉得文章不错?扫描关注

    相关资源:python入门教程(PDF版)
    最新回复(0)