Ruby Fiber指南(五): 实现Actor,兼谈Erlang的process调度

    xiaoxiao2024-05-17  114

    Ruby Fiber指南(一)基础     Ruby Fiber指南(二)参数传递     Ruby Fiber指南(三)过滤器     Ruby Fiber指南(四)迭代器     Ruby Actor指南(五)实现Actor     写这个指南的时候,计划是第五章写一个Fiber的应用例子,但是一时没有想到比较好的例子,模仿《Programming in Lua》中的多任务下载的例子也不合适,因为Ruby中的异步HttpClient跟lua还是很不一样的,体现不了Fiber的优点。因此,这第五节一直拖着没写。     恰巧最近在小组中做了一次Erlang的分享,有人问到Erlang调度器的实现问题,这块我没注意过,那时候就根据我对coroutine实现actor的想法做了下解释,后来思考了下那个解释是错误的,Erlang的调度器是抢占式的,而通过coroutine实现的actor调度却是非抢占的,两者还是截然不同的。我在《 Actor、Coroutine和Continuation的概念澄清》中提到coroutine可以实现actor风格,actor跟coroutine并没有必然的联系,这篇文章的目的就在于证明这一点,使用Ruby Fiber实现一个简单的actor风格的库,整个代码不到100行。后面还会谈到这个实现的缺点,以及我对Erlang调度器实现的理解。     首先是monkey patch,给Thread和Fiber类加上两个方法,分别用于获取当前线程的调度器和Fiber对应的actor: class  Thread    # 得到当前线程的调度器    def   __scheduler__     @internal_scheduler ||= FiberActor::Scheduler.new   end end class  Fiber    # 得到当前Fiber的actor    def   __actor__     @internal_actor   end end      这里实现的actor仍然是Thread内的,一个Thread只跑一个调度器,每个actor关联一个Fiber。      让我们来想想调度器该怎么实现,调度器顾名思义就是协调actor的运行,每次挑选适当的actor并执行,可以想象调度器内部应该维护一个等待调度的actor队列,Scheduler每次从队列里取出一个actor并执行,执行完之后取下一个actor执行,不断循环持续这个过程;在没有actor可以调度的时候,调度器应该让出执行权。因此调度器本身也是一个Fiber,它内部有个queue,用于维护等待调度的actor: module FiberActor    class  Scheduler      def  initialize       @queue = []       @running = false     end      def  run        return   if  @running       @running = true        while  true          # 取出队列中的actor并执行          while  actor = @queue.shift           begin             actor.fiber.resume           rescue  =>  ex             puts  " actor resume error,#{ex} "           end         end          # 没有任务,让出执行权         Fiber. yield       end     end      def  reschedule        if  @running          # 已经启动,只是被挂起,那么再次执行         @fiber.resume        else          # 将当前actor加入队列         self  <<  Actor.current       end     end      def  running?       @running     end      def   << (actor)        # 将actor加入等待队列       @queue  <<  actor unless @queue.last  ==  actor        # 启动调度器       unless @running          @queue  <<  Actor.current          @fiber = Fiber.new { run }          @fiber.resume       end     end   end end     run方法是核心的调度方法,注释说明了主要的工作流程。因为调度器可能让出执行权,因此提供了reschedule方法重新resume启动调度器。<<方法用于将等待被调度的actor加入等待队列,如果调度器没有启动,那么就启动调度Fiber。     有了调度器,Actor的实现也很简单,Actor跟Fiber是一对一的关系,Actor内部维护一个mailbox,用来存储接收到的消息。最重要的是receive原语的实现,我们这里很简单,不搞模式匹配,只是接收消息。receive的工作流程大概是这样,判断mailbox中有没有消息,有消息的话,取出消息并调用block处理,没有消息的话就yield让出执行权。 module FiberActor      class  Actor     attr_accessor :fiber      # 定义类方法      class   <<  self        def  scheduler         Thread.current. __scheduler__       end        def  current         Fiber.current. __actor__       end        # 启动一个actor        def  spawn( * args, & block)         fiber = Fiber.new do            block.call(args)         end         actor = new(fiber)         fiber.instance_variable_set :@internal_actor,actor         scheduler  <<  actor         actor       end        def  receive( & block)         current.receive( & block)       end     end      def  initialize(fiber)        @mailbox = []        @fiber = fiber     end      # 给actor发送消息      def   <<  (msg)       @mailbox  <<  msg        # 加入调度队列       Actor.scheduler  <<  self     end      def  receive( & block)        # 没有消息的时候,让出执行权       Fiber. yield   while  @mailbox.empty?       msg = @mailbox.shift       block.call(msg)     end      def  alive?       @fiber.alive?     end   end end     Actor.spawn用于启动一个actor,内部其实是创建了一个fiber并包装成actor给用户,每个actor一被创建就加入调度器的等待队列。<<方法用于向actor传递消息,传递消息后,该actor也将加入等待队列,等待被调度。     我们的简化版actor库已经写完了,可以尝试写几个例子,最简单的hello world: include FiberActor Actor.spawn { puts  " hello world! " }      输出: hello world!     没有问题,那么试试传递消息: actor = Actor.spawn{    Actor.receive{  | msg |   puts  " receive #{msg} " } } actor  <<  :test_message     输出: receive test_message          也成了,那么试试两个actor互相传递消息,乒乓一下下: pong = Actor.spawn do       Actor.receive do  | ping |          # 收到ping,返回pong         ping  <<  :pong       end     end ping = Actor.spawn do        # ping一下,将ping作为消息传递       pong  <<  Actor.current       Actor.receive do  | msg |          # 接收到pong         puts  " ping #{msg} "       end     end # resume调度器 Actor.scheduler.reschedule      输出: ping pong           都没有问题,这个超级简单actor基本完成了。可以看到,利用coroutine来实现actor是完全可行的,事实上我这里描述的实现基本上是 revactor这个库的实现原理。 revactor是一个ruby的actor库,它的实现就是基于Fiber,并且支持消息的模式匹配和thread之间的actor调度,有兴趣地可以去玩下。更进一步,其实采用轻量级协程来模拟actor风格早就不是新鲜主意,比如在 cn-erlounge的第四次会议上就有两个topic是关于这个,一个是51.com利用基于ucontext的实现的类erlang进程模型,一个是许世伟的CERL。可以想见,他们的基本原理跟本文所描述不会有太大差别,那么面对的问题也是一样。      采用coroutine实现actor的主要缺点如下: 1、因为是非抢占式,这就要求actor不能有阻塞操作,任何阻塞操作都需要异步化。IO可以使用异步IO,没有os原生支持的就需要利用线程池,基本上是一个重复造轮子的过程。 2、异常的隔离,某个actor的异常不能影响到调度器的运转,简单的try...catch是不够的。 3、多核的利用,调度器只能跑在一个线程上,无法充分利用多核优势。 4、效率因素,在actor数量剧增的情况下,简单的FIFO的调度策略效率是个瓶颈,尽管coroutine的切换已经非常高效。     当然,上面提到的这些问题并非无法解决,例如可以使用多线程多个调度器,类似erlang smp那样来解决单个调度器的问题。但是如调度效率这样的问题是很难解决的。相反,erlang的actor实现就不是通过coroutine,而是自己实现一套类似os的调度程序。     首先明确一点,Erlang的process的调度是抢占式的,而非couroutine的协作式的。其次,Erlang早期版本是只有一个调度器,运行在一个线程上,随着erts的发展,现在erlang的调度器已经支持smp,每个cpu关联一个调度器,并且可以明确指定哪个调度器绑定到哪个cpu上。第三,Erlang的调度也是采用优先队列+时间片轮询的方式,每个调度器关联一个 ErtsRunQueue, ErtsRunQueue内部又分为三个 ErtsRunPrioQueue队列,分别对应high,max和normal,low的优先级,其中normal和low共用一个队列;在Erlang中时间片是以reduction为单位,你可以将reduction理解成一次函数调用,每个被调度的process能执行的reduction次数是有限的。调度器每次都是从max队列开始寻找等待调度的process并执行,当前调度的队列如果为空或者执行的reductions超过限制,那么就降低优先级,调度下一个队列。    从上面的描述可以看出,Erlang优秀的地方不仅在于actor风格的轻量级process,另一个强悍的地方就是它的类os的调度器,再加上OTP库的完美支持,这不是一般方案能山寨的。                文章转自庄周梦蝶  ,原文发布时间2010-04-13
    最新回复(0)