Ruby Fiber指南(一)基础
Ruby Fiber指南(二)参数传递
Ruby Fiber指南(三)过滤器
Ruby Fiber指南(四)迭代器
Ruby Actor指南(五)实现Actor
写这个指南的时候,计划是第五章写一个Fiber的应用例子,但是一时没有想到比较好的例子,模仿《Programming in Lua》中的多任务下载的例子也不合适,因为Ruby中的异步HttpClient跟lua还是很不一样的,体现不了Fiber的优点。因此,这第五节一直拖着没写。
恰巧最近在小组中做了一次Erlang的分享,有人问到Erlang调度器的实现问题,这块我没注意过,那时候就根据我对coroutine实现actor的想法做了下解释,后来思考了下那个解释是错误的,Erlang的调度器是抢占式的,而通过coroutine实现的actor调度却是非抢占的,两者还是截然不同的。我在《
Actor、Coroutine和Continuation的概念澄清》中提到coroutine可以实现actor风格,actor跟coroutine并没有必然的联系,这篇文章的目的就在于证明这一点,使用Ruby Fiber实现一个简单的actor风格的库,整个代码不到100行。后面还会谈到这个实现的缺点,以及我对Erlang调度器实现的理解。
首先是monkey patch,给Thread和Fiber类加上两个方法,分别用于获取当前线程的调度器和Fiber对应的actor:
class
Thread
#
得到当前线程的调度器
def
__scheduler__
@internal_scheduler
||=
FiberActor::Scheduler.new end end
class
Fiber
#
得到当前Fiber的actor
def
__actor__
@internal_actor end end
这里实现的actor仍然是Thread内的,一个Thread只跑一个调度器,每个actor关联一个Fiber。
让我们来想想调度器该怎么实现,调度器顾名思义就是协调actor的运行,每次挑选适当的actor并执行,可以想象调度器内部应该维护一个等待调度的actor队列,Scheduler每次从队列里取出一个actor并执行,执行完之后取下一个actor执行,不断循环持续这个过程;在没有actor可以调度的时候,调度器应该让出执行权。因此调度器本身也是一个Fiber,它内部有个queue,用于维护等待调度的actor:
module FiberActor
class
Scheduler
def
initialize @queue
=
[] @running
=
false end
def
run
return
if
@running @running
=
true
while
true
#
取出队列中的actor并执行
while
actor
=
@queue.shift begin actor.fiber.resume rescue
=>
ex puts
"
actor resume error,#{ex}
"
end end
#
没有任务,让出执行权
Fiber.
yield
end end
def
reschedule
if
@running
#
已经启动,只是被挂起,那么再次执行
@fiber.resume
else
#
将当前actor加入队列
self
<<
Actor.current end end
def
running? @running end
def
<<
(actor)
#
将actor加入等待队列
@queue
<<
actor unless @queue.last
==
actor
#
启动调度器
unless @running @queue
<<
Actor.current @fiber
=
Fiber.new { run } @fiber.resume end end end end
run方法是核心的调度方法,注释说明了主要的工作流程。因为调度器可能让出执行权,因此提供了reschedule方法重新resume启动调度器。<<方法用于将等待被调度的actor加入等待队列,如果调度器没有启动,那么就启动调度Fiber。
有了调度器,Actor的实现也很简单,Actor跟Fiber是一对一的关系,Actor内部维护一个mailbox,用来存储接收到的消息。最重要的是receive原语的实现,我们这里很简单,不搞模式匹配,只是接收消息。receive的工作流程大概是这样,判断mailbox中有没有消息,有消息的话,取出消息并调用block处理,没有消息的话就yield让出执行权。
module FiberActor
class
Actor attr_accessor :fiber
#
定义类方法
class
<<
self
def
scheduler Thread.current.
__scheduler__
end
def
current Fiber.current.
__actor__
end
#
启动一个actor
def
spawn(
*
args,
&
block) fiber
=
Fiber.new do block.call(args) end actor
=
new(fiber) fiber.instance_variable_set :@internal_actor,actor scheduler
<<
actor actor end
def
receive(
&
block) current.receive(
&
block) end end
def
initialize(fiber) @mailbox
=
[] @fiber
=
fiber end
#
给actor发送消息
def
<<
(msg) @mailbox
<<
msg
#
加入调度队列
Actor.scheduler
<<
self end
def
receive(
&
block)
#
没有消息的时候,让出执行权
Fiber.
yield
while
@mailbox.empty? msg
=
@mailbox.shift block.call(msg) end
def
alive? @fiber.alive? end end end
Actor.spawn用于启动一个actor,内部其实是创建了一个fiber并包装成actor给用户,每个actor一被创建就加入调度器的等待队列。<<方法用于向actor传递消息,传递消息后,该actor也将加入等待队列,等待被调度。
我们的简化版actor库已经写完了,可以尝试写几个例子,最简单的hello world:
include FiberActor Actor.spawn { puts
"
hello world!
"
}
输出:
hello world!
没有问题,那么试试传递消息:
actor
=
Actor.spawn{ Actor.receive{
|
msg
|
puts
"
receive #{msg}
"
} } actor
<<
:test_message
输出:
receive test_message
也成了,那么试试两个actor互相传递消息,乒乓一下下:
pong
=
Actor.spawn do Actor.receive do
|
ping
|
#
收到ping,返回pong
ping
<<
:pong end end ping
=
Actor.spawn do
#
ping一下,将ping作为消息传递
pong
<<
Actor.current Actor.receive do
|
msg
|
#
接收到pong
puts
"
ping #{msg}
"
end end
#
resume调度器
Actor.scheduler.reschedule
输出:
ping pong
都没有问题,这个超级简单actor基本完成了。可以看到,利用coroutine来实现actor是完全可行的,事实上我这里描述的实现基本上是
revactor这个库的实现原理。
revactor是一个ruby的actor库,它的实现就是基于Fiber,并且支持消息的模式匹配和thread之间的actor调度,有兴趣地可以去玩下。更进一步,其实采用轻量级协程来模拟actor风格早就不是新鲜主意,比如在
cn-erlounge的第四次会议上就有两个topic是关于这个,一个是51.com利用基于ucontext的实现的类erlang进程模型,一个是许世伟的CERL。可以想见,他们的基本原理跟本文所描述不会有太大差别,那么面对的问题也是一样。
采用coroutine实现actor的主要缺点如下:
1、因为是非抢占式,这就要求actor不能有阻塞操作,任何阻塞操作都需要异步化。IO可以使用异步IO,没有os原生支持的就需要利用线程池,基本上是一个重复造轮子的过程。
2、异常的隔离,某个actor的异常不能影响到调度器的运转,简单的try...catch是不够的。
3、多核的利用,调度器只能跑在一个线程上,无法充分利用多核优势。
4、效率因素,在actor数量剧增的情况下,简单的FIFO的调度策略效率是个瓶颈,尽管coroutine的切换已经非常高效。
当然,上面提到的这些问题并非无法解决,例如可以使用多线程多个调度器,类似erlang smp那样来解决单个调度器的问题。但是如调度效率这样的问题是很难解决的。相反,erlang的actor实现就不是通过coroutine,而是自己实现一套类似os的调度程序。
首先明确一点,Erlang的process的调度是抢占式的,而非couroutine的协作式的。其次,Erlang早期版本是只有一个调度器,运行在一个线程上,随着erts的发展,现在erlang的调度器已经支持smp,每个cpu关联一个调度器,并且可以明确指定哪个调度器绑定到哪个cpu上。第三,Erlang的调度也是采用优先队列+时间片轮询的方式,每个调度器关联一个
ErtsRunQueue,
ErtsRunQueue内部又分为三个
ErtsRunPrioQueue队列,分别对应high,max和normal,low的优先级,其中normal和low共用一个队列;在Erlang中时间片是以reduction为单位,你可以将reduction理解成一次函数调用,每个被调度的process能执行的reduction次数是有限的。调度器每次都是从max队列开始寻找等待调度的process并执行,当前调度的队列如果为空或者执行的reductions超过限制,那么就降低优先级,调度下一个队列。
从上面的描述可以看出,Erlang优秀的地方不仅在于actor风格的轻量级process,另一个强悍的地方就是它的类os的调度器,再加上OTP库的完美支持,这不是一般方案能山寨的。
文章转自庄周梦蝶 ,原文发布时间2010-04-13