Clojure 的并发(一) Ref和STM
Clojure 的并发(二)Write Skew分析
Clojure 的并发(三)Atom、缓存和性能
Clojure 的并发(四)Agent深入分析和Actor
Clojure 的并发(五)binding和let
Clojure的并发(六)Agent可以改进的地方
Clojure的并发(七)pmap、pvalues和pcalls
Clojure的并发(八)future、promise和线程
四、 Agent和Actor
除了用于协调同步的Ref,独立同步的Ref,还有一类非常常见的需求:你可能希望状态的更新是异步,你通常不关心更新的结果,这时候你可以考虑下使用Agent。
1、创建agent:
user
=>
(def counter (agent
0
))
#
'user/counter
user
=>
counter
#
<Agent@9444d1: 0>
通过agent函数你就可以创建一个agent,指向一个不可变的初始状态。
2、取agent的值,这跟Ref和Atom没啥两样,都是通过deref或者@宏:
user
=>
@counter
0
user
=>
(deref counter)
0
3、更新agent,通过send或者send-off函数给agent发送任务去更新agent:
user
=>
(
send
counter inc)
#
<Agent@9444d1: 0>
send返回agent对象,内部的值仍然是0,而非inc递增之后的1,这是因为send是异步发送,更新是在另一个线程执行,两个线程(REPL主线程和更新任务的线程)的执行顺序没有同步,显示什么取决于两者谁更快。更新肯定是发生了,查看counter的值:
user
=>
@counter
1
果然更新到了1了。send的方法签名:
(
send
a f
&
args)
其中f是更新的函数,它的定义如下:
(f state
-
of
-
agent
&
args)
也就是它会在第一个参数接收当前agent的状态,而args是send附带的参数。
还有个方法,send-off,它的作用于send类似:
user
=>
(
send
-
off counter inc)
#
<Agent@9444d1: 1>
user
=>
@counter
2
send和send-off的区别在于,send是将任务交给一个
固定大小的线程池执行
final public static ExecutorService pooledExecutor
=
Executors
.newFixedThreadPool
(
2
+
Runtime
.
getRuntime()
.
availableProcessors());
默认线程池大小是
CPU核数加上2。因此
send执行的任务最好不要有阻塞的操作。而send-off则使用没有大小限制(取决于内存)的线程池:
final public static ExecutorService soloExecutor
=
Executors
.newCachedThreadPool
();
因此,
send-off比较适合任务有阻塞的操作,如IO读写之类。请注意,
所有的agent是共用这些线程池,这从这些线程池的定义看出来,都是静态变量。
4、异步转同步,刚才提到send和send-off都是异步将任务提交给线程池去处理,如果你希望同步等待结果返回,那么可以使用await函数:
(
do
(
send
counter inc) (await counter) (println
@counter
))
send一个任务之后,调用await等待agent所有派发的更新任务结束,然后打印agent的值。await是阻塞当前线程,直到至今为止所有任务派发执行完毕才返回。await没有超时,会一直等待直到条件满足,await-for则可以接受等待的超时时间,如果超过指定时间没有返回,则返回nil,否则返回结果。
(
do
(
send
counter inc) (await
-
for
100
counter) (println
@counter
))
await-for接受的单位是毫秒。
5、错误处理
agent也可以跟Ref和Atom一样设置validator,用于约束验证。由于agent的更新是异步的,你不知道更新的时候agent是否发生异常,只有等到你去取值或者更新的时候才能发现:
user
=>
(def counter (agent
0
:validator number
?
)) #
'
user/counter
user
=>
(send counter (fn[_]
"
foo
"
)) #
<
clojure.lang.Agent@4de8ce62:
0
>
强制要求counter的值是数值类型,第二个表达式我们给counter发送了一个更新任务,想将状态更新为字符串"foo",由于是异步更新,返回的结果可能没有显示异常,当你取值的时候,问题出现了:
user
=>
@counter java.lang.Exception: Agent has errors (NO_SOURCE_FILE:
0
)
告诉你agent处于不正常的状态,如果你想获取详细信息,可以通过agent-errors函数:
user
=>
(.printStackTrace (agent
-
errors counter)) java.lang.IllegalArgumentException: No matching field found: printStackTrace
for
class
clojure.lang.PersistentList (NO_SOURCE_FILE:
0
)
你可以恢复agent到前一个正常的状态,通过clear-agent-errors函数:
user
=>
(clear
-
agent
-
errors counter) nil user
=>
@counter
0
6、加入事务
agent跟atom不一样,agent可以加入事务,在事务里调用send发送一个任务,
当事务成功的时候该任务将只会被发送一次,最多最少都一次。利用这个特性,我们可以实现在事务操作的时候写文件,达到ACID中的D——持久性的目的:
(def backup
-
agent (agent
"
output/messages-backup.clj
"
)) (def messages (ref [])) (use
'
[clojure.contrib.duck-streams :only (spit)])
(defn add
-
message
-
with
-
backup [msg] (dosync (let [snapshot (commute messages conj msg)] (send
-
off backup
-
agent (fn [filename] (spit filename snapshot) filename)) snapshot)))
定义了一个backup-agent用于保存消息,
add
-
message
-
with
-
backup函数首先将状态保存到messages,这是个普通的Ref,然后调用send-off给backup-agent一个任务:
(fn [filename] (spit filename snapshot) filename)
这个任务是一个匿名函数,它利用spit打开文件,写入当前的快照,并且关闭文件,文件名来自backup-agent的状态值。注意到,我们是用send-off,send-off利用cache线程池,哪怕阻塞也没关系。
利用事务加上一个backup-agent可以实现类似数据库的ACID,但是还是不同的,主要区别在于
backup-agent的更新是异步,并不保证一定写入文件,因此持久性也没办法得到保证。
7、关闭线程池:
前面提到agent的更新都是交给线程池去处理,在系统关闭的时候你需要关闭这两个线程吃,通过shutdown-agents方法,你再添加任务将被拒绝:
user
=>
(shutdown
-
agents) nil user
=>
(send counter inc) java.util.concurrent.RejectedExecutionException (NO_SOURCE_FILE:
0
) user
=>
(def counter (agent
0
)) #
'
user/counter
user
=>
(send counter inc) java.util.concurrent.RejectedExecutionException (NO_SOURCE_FILE:
0
)
哪怕我重新创建了counter,提交任务仍然被拒绝,进一步证明这些
线程池是全局共享的。
8、原理浅析
前文其实已经将agent的实现原理大体都说了,agent本身只是个普通的java对象,它的内部维持一个状态和一个队列:
volatile
Object state; AtomicReference
<
IPersistentStack
>
q
=
new
AtomicReference(PersistentQueue.EMPTY);
任务提交的时候,是封装成Action对象,添加到此队列
public
Object dispatch(IFn fn, ISeq args,
boolean
solo) {
if
(errors
!=
null
) {
throw
new
RuntimeException(
"
Agent has errors
"
, (Exception) RT.first(errors)); }
//
封装成action对象
Action action
=
new
Action(
this
, fn, args, solo); dispatchAction(action);
return
this
; }
static
void
dispatchAction(Action action) { LockingTransaction trans
=
LockingTransaction.getRunning();
//
有事务,加入事务
if
(trans
!=
null
) trans.enqueue(action);
else
if
(nested.get()
!=
null
) { nested.set(nested.get().cons(action)); }
else
{
//
入队
action.agent.enqueue(action); } }
send和send-off都是调用Agent的dispatch方法,只是两者的参数不一样,dispatch的第二个参数 solo决定了是使用哪个线程池处理action:
(defn send [#
^
clojure.lang.Agent a f
&
args] (. a (dispatch f args
false
))) (defn send
-
off [#
^
clojure.lang.Agent a f
&
args] (. a (dispatch f args
true
)))
send-off将solo设置为true,当为true的时候使用cache线程池:
final
public
static
ExecutorService soloExecutor
=
Executors.newCachedThreadPool();
final
static
ThreadLocal
<
IPersistentVector
>
nested
=
new
ThreadLocal
<
IPersistentVector
>
();
void
execute() {
if
(solo) soloExecutor.execute(
this
);
else
pooledExecutor.execute(
this
); }
执行的时候调用更新函数并设置新的状态:
try
{ Object oldval
=
action.agent.state; Object newval
=
action.fn.applyTo(RT.cons(action.agent.state, action.args)); action.agent.setState(newval); action.agent.notifyWatches(oldval, newval); }
catch
(Throwable e) {
//
todo report/callback
action.agent.errors
=
RT.cons(e, action.agent.errors); hadError
=
true
; }
9、跟actor的比较:
Agent跟Actor有一个显著的不同,agent的action来自于别人发送的任务附带的更新函数,而actor的action则是自身逻辑的一部分。因此,如果想用agent实现actor模型还是相当困难的,下面是我的一个尝试:
(ns actor) (defn receive [
&
args] (apply hash
-
map args)) (defn self []
*
agent
*
) (defn spawn [recv
-
map] (agent recv
-
map)) (defn
!
[actor msg] (send actor #(apply (get
%
1
%
2
) (vector
%
2
)) msg)) ;;启动一个actor (def actor (spawn (receive :hello #(println
"
receive
"
%
)))) ;;发送消息 hello (
!
actor :hello)
利用spawn启动一个actor,其实本质上是一个agent,而发送通过感叹号!,给agent发送一个更新任务,它从recv-map中查找消息对应的处理函数并将消息作为参数来执行。难点在于消息匹配,匹配这种简单类型的消息没有问题,但是如果匹配用到变量,暂时没有想到好的思路实现,例如实现两个actor的ping/pong。
文章转自庄周梦蝶 ,原文发布时间2010-07-19
相关资源:Java虚拟机并发编程-中文完整版(高清、带书签)