我们创建事务的目的是为了协调针对多个托管引用的变更。事务将会保证这些变更是原子的,也就是说,所有的托管引用要么全部被提交要么全部被丢弃,所 以在事务之外我们将不会看到有任何局部变更(partial changes)出现。此外,我们也可以用创建事务的方式来解决对单个ref先读后写所引发的相关问题。
Akka是用Scala开发出来的,所以如果我们工作中用的是Scala的话,就可以直接幸福地享用Akka简洁明了的API了。对于那些日常工作 中不能使用Scala开发的程序员,Akka同样也提供了一组方便的API,以帮助他们通过Java语言来使用该类库的功能。本节我们将会看到如何利用 Akka在Java和Scala中创建事务。
首先我们需要选一个适合用事务来解决的例子。我们在第5章中重构的EnergySource类使用了显式的加锁和解锁操作(其最终版本详见5.7节),下面让就我们将这些显式的加锁/解锁操作换用Akka的事务API来实现。
在Java中创建事务
为了将代码逻辑封装到一个事务中,我们需要创建一个Atomic类的实例并将代码放到该类的atomically()函数里。随后,我们可以通过调用Atomic实例的execute()函数来执行事务代码。类似于下面这样:
1 return new Atomic<Object>() { 2 public Object atomically() { 3 //code to run in a transaction... 4 return resultObject; 5 } 6}.execute();调用execute()函数的线程将负责执行atomically()函数里的代码。然而如果调用者本身并没有处在一个事务中的话,那么这个调用将会被封装在一个新的事务中。
下面让我们用Akka事务来重新实现EnergySource。首先,让我们将不可变状态封装到可变的Akka托管引用中去。
1 public class EnergySource { 2 private final long MAXLEVEL = 100; 3 final Ref<Long> level = new Ref<Long>(MAXLEVEL); 4 final Ref<Long> usageCount = new Ref<Long>(0L); 5 final Ref<Boolean> keepRunning = new Ref<Boolean>(true); 6 private static final ScheduledExecutorService replenishTimer = 7 Executors.newScheduledThreadPool(10);在这段变量定义的代码中,level和usageCount都被声明为Akka Ref,并且各自持有一个不可变的Long类型的值。于是在Java中我们就不能更改这些Long类型的值了,但我们仍然可以通过更改托管引用(即实体)使其安全地指向新值。
在EnergySource的上一个版本中,ScheduledExecutorService会周期性地(每秒钟一次)调用 replenish()函数直至整个任务结束,这就要求stopEnergySource()必须是同步的。而在这个版本中,我们不用再周期性地调用 replenish()函数,而只会在对象实例初始化的时候执行一下调度操作。在每次调用replenish()函数时,我们都会根据 keepRunning的值来决定该函数是否应该在1秒之后再次被调度执行。这一变化消除了stopEnergySource()函数和调度器/计时器 (timer)之间的耦合。相反地,stopEnergySource()函数现在只依赖于keepRunning这个标志,而该标志可以很容易地通过 STM事务来行管理。
在这一版的代码中,由于可以依赖事务来保证安全性,所以我们没必要再对stopEnergySource()函数进行同步了。同时,由于swap()函数本身就是以事务方式执行的,所以我们也无需显式地为其创建事务。
01private EnergySource() {} 02private void init() { 03 replenishTimer.schedule(new Runnable() { 04 public void run() { 05 replenish(); 06 if (keepRunning.get()) replenishTimer.schedule( 07 this, 1, TimeUnit.SECONDS); 08 } 09 }, 1, TimeUnit.SECONDS); 10} 11public static EnergySource create() { 12 final EnergySource energySource = new EnergySource(); 13 energySource.init(); 14 return energySource; 15} 16public void stopEnergySource() { keepRunning.swap(false); }如下所示,返回当前电量和使用次数的方法将会用到托管引用,但也只是需要调用一下get()函数而已。
1 public long getUnitsAvailable() { return level.get(); } 2 public long getUsageCount() { return usageCount.get(); }在getUnitsAvailable()函数和getUsageCount()函数中,由于其中的get()函数都是以事务方式运行的,所以无需显式地将它们封装在事务里。
由于我们会在useEnergy()函数中同时修改电量和使用次数,所以useEnergy()函数需要使用一个显式的事务来完成这些操作。在这 里,我们需要保证对所有被读取的值的变更都能保持一致性,即确保对这两个字段的变更是原子的。为了实现这一目标,我们将使用Atomic接口,并用 atomically()函数将我们的逻辑代码封装到一个事务中。
01 public boolean useEnergy(final long units) { 02 return new Atomic<Boolean>() { 03 public Boolean atomically() { 04 long currentLevel = level.get(); 05 if(units > 0 && currentLevel >= units) { 06 level.swap(currentLevel - units); 07 usageCount.swap(usageCount.get() + 1); 08 return true; 09 } else { 10 return false; 11 } 12 } 13 }.execute(); 14}useEnergy()函数的功能是从当前电量中减掉所消耗的电量(即unit——译者注)。为了实现这一目标,我们需要保证所涉及到的get和 set操作都在同一个事务中完成,所以我们把所有相关操作都用atomically()函数封装了起来。最后,我们会调用execute()函数来启动事 务并顺序执行的所有操作。
除了上述方法之外,我们还需要关注一下负责给电源充电的replenish()函数。由于这个方法也需要使用事务,所以其实现代码同样需要用Atomic进行封装。
01 private void replenish() { 02 new Atomic() { 03 public Object atomically() { 04 long currentLevel = level.get(); 05 if (currentLevel < MAXLEVEL) level.swap(currentLevel + 1); 06 return null; 07 } 08 }.execute(); 09 } 10}下面是针对EnergySource类的测试代码。其主要功能是,用多个线程并发地使用电池,每使用一次消耗一格电,每个线程最多会消耗7格电量。
01 public class UseEnergySource { 02 private static final EnergySource energySource = EnergySource.create(); 03 public static void main(final String[] args) 04 throws InterruptedException, ExecutionException { 05 System.out.println("Energy level at start: " + 06 energySource.getUnitsAvailable()); 07 List<Callable<Object>> tasks = new ArrayList<Callable<Object>>(); 08 for(int i = 0; i < 10; i++) { 09 tasks.add(new Callable<Object>() { 10 public Object call() { 11 for(int j = 0; j < 7; j++) energySource.useEnergy(1); 12 return null; 13 } 14 }); 15 } 16 final ExecutorService service = Executors.newFixedThreadPool(10); 17 service.invokeAll(tasks); 18 System.out.println("Energy level at end: " + 19 energySource.getUnitsAvailable()); 20 System.out.println("Usage: " + energySource.getUsageCount()); 21 energySource.stopEnergySource(); 22 service.shutdown(); 23 } 24}上述代码需要把Akka相关的Jar添加到Java的classpath中才能编译通过,所以首先我们需要创建一个标识jar位置的环境变量:
export AKKA_JARS="$AKKA_HOME/lib/scala-library.jar:\ $AKKA_HOME/lib/akka/akka-stm-1.1.3.jar:\ $AKKA_HOME/lib/akka/akka-actor-1.1.3.jar:\ $AKKA_HOME/lib/akka/multiverse-alpha-0.6.2.jar:\ $AKKA_HOME/config:\ ."Classpath的定义取决于你使用的操作系统以及Akka在你的操作系统中被安装的位置。我们可以用javac编译器来编译代码,并用java命令来负责执行,具体细节如下所示:
javac -classpath $AKKA_JARS -d . EnergySource.java UseEnergySource.java java -classpath $AKKA_JARS com.agiledeveloper.pcj.UseEnergySource万事俱备,下面让我们来编译并执行这段代码。通过代码的实现逻辑我们知道,电源初始有100格电量,而我们创建的10个线程将会消耗掉其中的70格 电量,所以最后电源应该净剩30格电量。但由于电池电量会每秒回复一格,所以每次运行结果可能会稍有不同,比如最后净剩电量可能是31格而不是30格。
Energy level at start: 100 Energy level at end: 30 Usage: 70默认情况下,Akka会将额外的日志消息打印到标准输出上。停掉这个默认的输出也很容易,我们只需要在$AKKA_HOME/config目录下创 建一个名为logback.xml的文件,并在里面添加这项配置即可。由于这个文件位于classpath中,所以logger会自动找到这个文件、读取 其中的配置并停掉消息输出。除此之外,我们还可以在这个配置文件中设置很多其他有用的配置项。详情请见http://logback.qos.ch/manual/configuration.html。
正如我们在本例中所看到的那样,Akka是在后台默默地对事务进行管理的,所以请你多花些时间研究一下上述示例代码,并对事务和线程的运作过程多做一些尝试以便加深对这块知识的理解。
在Scala中创建事务
我们之前已经看到了如何在Java中创建事务(并且我假设你已经阅读过那一部分,所以这里我们就不再赘述了),下面我们将会在Scala中用更少的 代码来完成同样的功能。我们之所以能兼顾简洁与功能,部分得益于Scala自身简洁的特点,但更多还是由于Akka API使用了闭包/函数值(closures/function values)的缘故。
相比Java的繁冗,我们在Scala中可以通过很简洁的方法来创建事务。我们所需要做的只是调用一下Stm的auomic()函数就行了,如下所示:
1atomic { 2 //code to run in a transaction.... 3 /* return */ resultObject 4}其中,我们传给atomic()的闭包/函数值仅在当前线程所运行的那个事务内可见。
下面就是使用了Akka事务的Scala版本的EnergySource实现代码:
01 class EnergySource private() { 02 private val MAXLEVEL = 100L 03 val level = Ref(MAXLEVEL) 04 val usageCount = Ref(0L) 05 val keepRunning = Ref(true) 06 private def init() = { 07 EnergySource.replenishTimer.schedule(new Runnable() { 08 def run() = { 09 replenish 10 if (keepRunning.get) EnergySource.replenishTimer.schedule( 11 this, 1, TimeUnit.SECONDS) 12 } 13 }, 1, TimeUnit.SECONDS) 14 } 15 def stopEnergySource() = keepRunning.swap(false) 16 def getUnitsAvailable() = level.get 17 def getUsageCount() = usageCount.get 18 def useEnergy(units : Long) = { 19 atomic { 20 val currentLevel = level.get 21 if(units > 0 && currentLevel >= units) { 22 level.swap(currentLevel - units) 23 usageCount.swap(usageCount.get + 1) 24 true 25 } else false 26 } 27 } 28 private def replenish() = 29 atomic { if(level.get < MAXLEVEL) level.swap(level.get + 1) } 30} 31 object EnergySource { 32 val replenishTimer = Executors.newScheduledThreadPool(10) 33 def create() = { 34 val energySource = new EnergySource 35 energySource.init 36 energySource 37 } 38}作为一个完全的面向对象语言,Scala认为静态方法是不适合放在类的定义中的,所以工厂方法create()就被移到其伴生对象里面去了。余下的 代码和Java版本非常相近,只是较之更为简洁。同时,由于使用了优雅的atomic()函数,我们就可以抛开Atomic类和execute()函数调 用了。
Scala版本的EnergySource的测试用例如下所示。在并发和线程控制的实现方面,我们既可以像Java版本那样采用JDK的ExecutorService来管理线程,也可以使用Scala的角色(actor)[1] 来为每个并发任务分配执行线程。这里我们将采用第二种方式。当任务完成之后,每个任务都会给调用者返回一个响应,而调用者则需要等待所有任务结束之后才能继续执行。
01 object UseEnergySource { 02 val energySource = EnergySource.create() 03 def main(args : Array[String]) { 04 println("Energy level at start: " + energySource.getUnitsAvailable()) 05 val caller = self 06 for(i <- 1 to 10) actor { 07 for(j <- 1 to 7) energySource.useEnergy(1) 08 caller ! true 09 } 10 for(i <- 1 to 10) { receiveWithin(1000) { case message => } } 11 println("Energy level at end: " + energySource.getUnitsAvailable()) 12 println("Usage: " + energySource.getUsageCount()) 13 energySource.stopEnergySource() 14 } 15}我们可以采用如下命令来引入Akka相关的Jar并编译运行上述代码,其中环境变量AKKA_JARS与我们在Java示例中的定义相同:
scalac -classpath $AKKA_JARS *.scala java -classpath $AKKA_JARS com.agiledeveloper.pcj.UseEnergySourceScala版本代码的输出结果与我们在Java版本中所看到的没什么两样,并同样依赖于电量恢复的节奏,即可能最终剩余电量是31而不是30。
Energy level at start: 100 Energy level at end: 30 Usage: 70[1]这里提到Scala的角色(actor)仅仅是为了说明有这种方法可供使用。后面我们还将会学习如何使用功能更为强大的Akka actor。
文章转自 并发编程网-ifeve.com
相关资源:敏捷开发V1.0.pptx