Akka笔记之消息传递

    xiaoxiao2024-05-25  126

    在Akka笔记第一篇的介绍中,我们大致介绍了下Akka工具箱中的Actor。在第二篇当中,我们来看一下Actor消息传递的功能。这里还是延用之前使用的那个学生-老师的例子。

    在Actor消息的第一部分中,我们会创建一个老师的Actor,但学生Actor则先不创建,而是使用一个叫做StudentSimulatorApp的主程序。

    仔细回顾下学生-老师模型

    我们现在只考虑StudentSimulatorApp发送给TeacherActor的消息。这里我所说的StudentSimulatorApp指的是一个正常的主程序。

    从图中可以看到: (如果有陌生的术语,没关系,后面我们会详细解释的)

    1. 学生创建了一个叫ActorSystem的东西。 2. 他通过ActorSystem来创建了一个叫ActorRef的对象。QuoteRequest消息就是发送给ActorRef的(它是TeacherActor的一个代理) 3. ActorRef将消息发送给Dispatcher 4. Dispatcher将消息投递到目标Actor的邮箱中。 5. 随后Dispatcher将Mailbox扔给一个线程去执行(这点下节会重点讲到) 6. MailBox将消息出队并最终将其委托给真实的Teacher Actor的接收方法去处理。

    正如我所说的,看不懂也别担心。现在我们来一步步地详细地分析下。全部讲完后你可以再回过头来看下这五个步骤。

    STUDENTSIMULATORAPP程序

    我们用这个STUDENTSIMULATORAPP来启动JVM并初始化ActorSystem。

    从图中可以看到,StudentSimulatorApp

    1. 创建了一个ActorSystem 2. 通过ActorSystem创建了一个Teacher Actor的代理(ActorRef) 3. 将QuoteRequest消息发送给代理

    我们现在只关注这三点。

    1. 创建ActorSystem

    ActorSystem是进入到Actor的世界的一扇大门。通过它你可以创建或中止Actor。甚至还可以把整个Actor环境给关闭掉。

    另一方面来说,Actor是一个分层的结构,ActorSystem之于Actor有点类似于java.lang.Object或者scala.Any的角色——也就是说,它是所有Actor的根对象。当你通过ActorSystem的actorOf方法创建了一个Actor时,你其实创建的是ActorSystem下面的一个Actor。

    初始化ActorSystem的代码是这样的:

    val system=ActorSystem("UniversityMessageSystem")

    UniversityMessageSystem只是你给ActorSystem起的一个可爱的名字而已。

    2. 创建一个TeacherActor的代理?

    我们来看下下面这段代码:

    val teacherActorRef:ActorRef=actorSystem.actorOf(Props[TeacherActor])

    actorOf是ActorSystem中创建Actor的方法。但是正如你所看到的,它并不会返回我们所需要的TeacherActor。它返回的是一个ActorRef。

    这个ActorRef扮演了真实的Actor的一个代理的角色。客户端并不会直接和Actor通信。这也正是Actor模型中避免直接访问TeacherActor中任何的自定义/私有方法或者变量的一种方式。

    再重复一遍,消息只会发送给ActorRef,最终才会到达真正的Actor。你是绝对无法直接和Actor进行通信的。如果你真的找到了什么拙劣的方式来直接通信,大家会恨你入骨的。

    将消息发送给代理

    还是只有一行代码。你只需告诉说把QuoteRequest消息发送到ActorRef就好了。Actor中的这个告诉的方式就是一个!号。(ActorRef中确实也有一个tell方法,不过它只是把这个调用委托给了!号)

    //send a message to the Teacher Actor teacherActorRef!QuoteRequest

    这就可以了!

    如果你认为我在骗你的话,看一下下面StudentSimulatorApp的完整代码:

    STUDENTSIMULATORAPP.SCALA
    package me.rerun.akkanotes.messaging.actormsg1 import akka.actor.ActorSystem import akka.actor.Props import akka.actor.actorRef2Scala import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._ object StudentSimulatorApp extends App{ //Initialize the ActorSystem val actorSystem=ActorSystem("UniversityMessageSystem") //construct the Teacher Actor Ref val teacherActorRef=actorSystem.actorOf(Props[TeacherActor]) //send a message to the Teacher Actor teacherActorRef!QuoteRequest //Let's wait for a couple of seconds before we shut down the system Thread.sleep (2000) //Shut down the ActorSystem. actorSystem.shutdown() }

    好吧,我承认我撒了点小谎。你还得关掉ActorSystem,不然JVM会一直运行下去的。我还让主线程睡眠了一小会儿,以便给点时间让TeacherActor去完成它的任务。我知道这听起来很愚蠢。别担心。后面我们会通过些优雅的测试用例来替换掉这种取巧的方式。

    消息

    我们刚发送了一个QuoteMessage给ActorRef,但是,还压根儿没看着过这个消息类呢!

    说曹操,曹操到:

    (实践中推荐你把消息封装成一个好点的对象,这样维护起来容易些)

    TeacherProtocol
    package me.rerun.akkanotes.messaging.protocols object TeacherProtocol{ case class QuoteRequest() case class QuoteResponse(quoteString:String) }

    正如你所想的那样,QuoteRequest就是发送给TeacherActor的那个消息。Actor会回复一个QuoteResponse。

    分发器及邮箱

    ActorRef把消息处理功能委托给了Dispatcher。实际上,当我们创建ActorSystem和ActorRef的时候,就已经创建了一个Dispatcher和MailBox了。我们来看下它们是干什么的。

    邮箱

    每个Actor都有一个MailBox(后面会介绍一种特殊的情况)。在我们这个比喻当中,每个老师也有一个邮箱。老师得去检查邮箱并处理消息。在Actor的世界中,则是另一种形式——邮箱一有机会就会要求Actor去完成自己的任务。

    同样的,邮箱里也有一个队列来以FIFO的方式来存储并处理消息——它和实际的邮箱还有点不同,真实的邮箱新的信总是在最上面的。

    现在讲到分发器了

    Dispatcher会完成一些很酷的事。从它的角度来看,它只是从ActorRef中取出一条消息然后将它传给了MailBox。但是,在这后面发生了一件不可意义的事情:

    Dispatcher会封装一个ExecutorService(ForkJoinPoll或者ThreadPoolExecutor)。它把MailBox扔到ExecutorService中去运行。

    看下Dispatcher里面的一段代码:

    protected[akka] override def registerForExecution(mbox: Mailbox, ...): Boolean = { ... try { executorService execute mbox ... }

    什么,你是说要执行一下邮箱?

    是的。我们看到MailBox中包含了队列里面的消息。由于Executor得去执行MailBox,所以它得是一个Thread类型。是的没错。MailBox的声明及构造器就是这样的。

    下面是MailBox的签名信息。

    private[akka] abstract class Mailbox(val messageQueue: MessageQueue) extends SystemMessageQueue with Runnable

    TeacherActor

    当MailBox的run方法运行的时候,它会从队列中取出一条消息,然后将它传给Actor去处理。

    当你把消息传给ActorRef的时候,最终调用的实际是目标Actor里面的一个receive方法。

    TeacherActor只是一个很简单的类,它有一个名言的列表,而receive方法很明显就是用来处理消息的。

    来看下代码:

    TeacherActor.scala

    package me.rerun.akkanotes.messaging.actormsg1 import scala.util.Random import akka.actor.Actor import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._ /* * Your Teacher Actor class. * * The class could use refinement by way of * using ActorLogging which uses the EventBus of the Actor framework * instead of the plain old System out * */ class TeacherActor extends Actor { val quotes = List( "Moderation is for cowards", "Anything worth doing is worth overdoing", "The trouble is you think you have time", "You never gonna know if you never even try") def receive = { case QuoteRequest => { import util.Random //Get a random Quote from the list and construct a response val quoteResponse=QuoteResponse(quotes(Random.nextInt(quotes.size))) println (quoteResponse) } } }

    TeacherActor的receive方法的模式匹配只会匹配一种消息——QuoteRequest (事实上,模式匹配中最好匹配下默认的情况,不过这个就说来话长了)

    receive方法做的就是

    1. 匹配QuoteRequest的模式 2. 从名言列表中随机选取一条 3. 构造出一个QuoteResponse 4. 将QuoteResponse打印到控制台上

    代码

    整个项目的代码可以从Github中下载到。

    本文最早发布于我的个人博客: Java译站

    转载自 并发编程网 - ifeve.com 相关资源:敏捷开发V1.0.pptx
    最新回复(0)