【本文转载于 线程之从线程返回信息
习惯了传统单线程过程式模型的程序员在转向多线程环境时,最难掌握的一点就是如何从线程返回信息。我们再拿前一blog中的例子为例,不再简单地显示SHA-256摘要,摘要线程需要把摘要返回给执行主线程。大多数人的第一个反应就是把结果存储在一个字段中,再提供一个获取方法:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package o1; import java.io.FileInputStream; import java.io.IOException; import java.security.DigestInputStream; import java.security.MessageDigest; public class ReturnDigest extends Thread { private String filename; private byte [] digest; public ReturnDigest(String filename){ this .filename = filename; } @Override public void run() { try { FileInputStream in = new FileInputStream(filename); MessageDigest sha = MessageDigest.getInstance( "SHA-256" ); DigestInputStream din = new DigestInputStream(in,sha); while (din.read() != - 1 ); //读取整个文件 din.close(); digest = sha.digest(); } catch (IOException e1) { e1.printStackTrace(); } catch (Exception e2){ e2.printStackTrace(); } } public byte [] getDigest() { return digest; } } ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package o1; import javax.xml.bind.DatatypeConverter; public class ReturnDigestUserInterface { public static void main(String[] args) { String filename = "/home/fuhd/work/workspace/scala/t1/src/t1/MyTest.scala" ; ReturnDigest dr = new ReturnDigest(filename); dr.start(); //现在显示结果 StringBuilder result = new StringBuilder(filename); result.append( ": " ); byte [] digest = dr.getDigest(); result.append(DatatypeConverter.printHexBinary(digest)); System.out.println(result); } }ReturnDigest类把计算结果存储在私有字段digest中,可以通过getDigest()来访问。ReturnDigestUserInterface中的main()方法启动一个新的ReturnDigest线程,然后试图使用getDigest()获取结果。不过,当你运行这个程序时,结果却不像你期望的那样:
? 1 2 3 4 Exception in thread "main" java.lang.NullPointerException at javax.xml.bind.DatatypeConverterImpl.printHexBinary(DatatypeConverterImpl.java: 475 ) at javax.xml.bind.DatatypeConverter.printHexBinary(DatatypeConverter.java: 626 ) at o1.ReturnDigestUserInterface.main(ReturnDigestUserInterface.java: 14 )问题在于,主程序会在线程有机会初始化摘要之前就获取并使用摘要。dr.start()启动的计算可能在main()方法调用dr.getDigest()之前结束,也可能还没有结束。如果没有结束,dr.getDigest()则会返回null,第一次尝试访问digest是会抛出一个NullPointerException异常。
轮询
大多数新手采用的解决方案是,让获取方法返回一个标志值(或者可能抛出一个异常),直到设置了结果字段为止。然后主线程定期询问获取方法,查看是否返回了标志之外的值。这个例子中,表示要重复地测试digest是否为空,只有不为空才使用。示例:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package o1; import javax.xml.bind.DatatypeConverter; public class ReturnDigestUserInterface { public static void main(String[] args) { String filename = "/home/fuhd/work/workspace/scala/t1/src/t1/MyTest.scala" ; ReturnDigest dr = new ReturnDigest(filename); dr.start(); while ( true ){ //现在显示结果 byte [] digest = dr.getDigest(); if (digest != null ){ StringBuilder result = new StringBuilder(filename); result.append( ": " ); result.append(DatatypeConverter.printHexBinary(digest)); System.out.println(result); break ; } } } }这个解决方案是可行的。它会给出正确的答案。不过,它做了大量不需要做的工作。更糟糕的是,这个解决方案不能保证一定能工作。在有些虚拟机上,主线程会占用所有可用的时间,而没有给具体的工作线程留出任何时间。主线程太忙于检查工作的完成情况,以至于没有时间来具体完成任务!显然这不是一个好方法。
回调
事实上,还有一种更简单有效的方法来解决这个问题。这个方法的技巧在于,不是在主程序中重复地询问每个ReturnDigest线程是否结束,而是让线程告诉主线程它何时结束。这是通过调用主类(即启动这个线程的类)中的一个方法来做到的。这被称为回调(callback),因为线程在完成时反过来调用其创建者。这样一来,主程序就可以在等待线程结束期间休息,而不会占用运行线程的时间。当线程run()方法接近结束时,要做的最后一件事情就是基于结果调用主程序中的一个已知方法:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package o1; import java.io.FileInputStream; import java.io.IOException; import java.security.DigestInputStream; import java.security.MessageDigest; public class CallbackDigest implements Runnable { private String filename; public CallbackDigest(String filename){ this .filename = filename; } @Override public void run() { try { FileInputStream in = new FileInputStream(filename); MessageDigest sha = MessageDigest.getInstance( "SHA-256" ); DigestInputStream din = new DigestInputStream(in,sha); while (din.read() != - 1 ); //读取整个文件 din.close(); byte [] digest = sha.digest(); CallbackDigestUserInterface.receiveDigest(digest,filename); } catch (IOException e1) { e1.printStackTrace(); } catch (Exception e2){ e2.printStackTrace(); } } } ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package o1; import javax.xml.bind.DatatypeConverter; public class CallbackDigestUserInterface { public static void main(String[] args) { String filename = "/home/fuhd/work/workspace/scala/t1/src/t1/MyTest.scala" ; CallbackDigest dr = new CallbackDigest(filename); Thread thread = new Thread(dr); thread.start(); } public static void receiveDigest( byte [] digest,String filename){ StringBuilder result = new StringBuilder(filename); result.append( ": " ); result.append(DatatypeConverter.printHexBinary(digest)); System.out.println(result); } }示例中使用静态方法完成回调,这样CallbackDigest只需要知道CallackDigestUserInterface中要调用的方法名。不过,回调实例方法也不会太难(而且回调实例方法更为常见)。这种情况下,进行回调的类必须有其回调对象的一个引用。通常情况下,这个引用通过线程构造函数来提供。当run()方法接近结束时,要做的最后一件事情就是调用回调对象的实例方法来传递结果。如例:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package o1; import java.io.FileInputStream; import java.io.IOException; import java.security.DigestInputStream; import java.security.MessageDigest; public class InstanceCallbackDigest implements Runnable { private String filename; private InstanceCallbackDigestUserInterface callback; public InstanceCallbackDigest(String filename,InstanceCallbackDigestUserInterface callback){ this .filename = filename; this .callback = callback; } @Override public void run() { try { FileInputStream in = new FileInputStream(filename); MessageDigest sha = MessageDigest.getInstance( "SHA-256" ); DigestInputStream din = new DigestInputStream(in,sha); while (din.read() != - 1 ); //读取整个文件 din.close(); byte [] digest = sha.digest(); callback.receiveDigest(digest, filename); } catch (IOException e1) { e1.printStackTrace(); } catch (Exception e2){ e2.printStackTrace(); } } } ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package o1; import javax.xml.bind.DatatypeConverter; public class InstanceCallbackDigestUserInterface { public static void main(String[] args) { String filename = "/home/fuhd/work/workspace/scala/t1/src/t1/MyTest.scala" ; InstanceCallbackDigestUserInterface main = new InstanceCallbackDigestUserInterface(); InstanceCallbackDigest dr = new InstanceCallbackDigest(filename,main); Thread thread = new Thread(dr); thread.start(); } public void receiveDigest( byte [] digest,String filename){ StringBuilder result = new StringBuilder(filename); result.append( ": " ); result.append(DatatypeConverter.printHexBinary(digest)); System.out.println(result); } }相比于轮询机制,回调机制的第一个优点是不会浪费那么多CPU周期。但更重要的优点是回调更灵活,可以处理涉及更多线程,对象和类的更复杂的情况。例如,如果有多个对象对线程的计算结果感兴趣,那么线程可以保存一个要回调的对象列表。特定的对象可以通过调用Thread或Runnable类的一个方法把自己添加到这个列表中来完成注册,表示自己对计算结果很感觉兴趣。如果有多个类的实例对结果感兴趣,可以定义一个新的interface(接口),所有这些类都要实现这个新接口。这个interface(接口)将声明回调方法。如果你对此有种似曾相识的感觉,没错,这就是Swing,AWT中处理事件的机制。这种机制有一个更一般的名字:观察者(Observer)设计模式。
Future,Callable和Executor
java5引入了多线程编程的一个新方法,通过隐藏细节可以更容易地处理回调。不再是直接创建一个线程,你要创建一个ExecutorService,它会根据需要为你创建线程。可以向ExecutorService提交Callable任务,对于每个Callable任务,会分别得到一个Future。之后可以向Future请求得到任务的结果。如果结果已经准备就绪,就会立即得到这个结果。如果还没有准备好,轮询线程会阻塞,直到结果准备就绪。这种做法的好处是,你可以创建很多不同的线程,然后按你需要的顺序得到你需要的答案。
例如,假设你要找出一个很大的数字数组中的最大值。如果采用最原始的方法实现,需析时间为O(n),其中n是数组中的元素个数。不过,如果可以将这个工作分解到多个线程,每个线程分别在一个单独的内核上运行,这样就会快得多。如例:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package o1; import java.util.concurrent.Callable; public class FindMaxTask implements Callable<Integer> { private int [] data; private int start; private int end; public FindMaxTask( int [] data, int start, int end){ this .data = data; this .start = start; this .end = end; } @Override public Integer call() throws Exception { int max = Integer.MIN_VALUE; for ( int i = start;i < end; i++){ if (data[i] > max) max = data[i]; } return max; } }Callable接口定义了一个call()方法,它可以返回任意的类型。尽管可以直接调用call()方法,但这并不是它的本来目的。实际上,你要把Callable对象提交给一个Executor,它会为每个Callable对象创建一个线程(Executor还可以使用其他策略,例如,它可以使用一个线程按顺序调用这些callable,不过对于这个问题来说,每个callable分别对应一个线程是一个很好的策略)。示例:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package o1; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class MultithreadedMaxFinder { public int max( int [] data,ExecutorService service) throws InterruptedException,ExecutionException{ if (data.length == 1 ){ return data[ 0 ]; } else if (data.length == 0 ){ throw new IllegalArgumentException(); } //将任务分解为两部分 FindMaxTask task1 = new FindMaxTask(data, 0 ,data.length/ 2 ); FindMaxTask task2 = new FindMaxTask(data,data.length/ 2 ,data.length); //创建2个线程 Future<Integer> f1 = service.submit(task1); Future<Integer> f2 = service.submit( task2); return Math.max(f1.get(), f2.get()); } public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool( 2 ); try { MultithreadedMaxFinder m = new MultithreadedMaxFinder(); int [] numArr = { 345 , 213 , 45 , 675 , 127 , 478 , 456 }; System.out.println(m.max(numArr,service)); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (Exception e){ e.printStackTrace(); } finally { service.shutdown(); } } }这里会同时搜索两个子数组,所以对于合适的硬件和规模很大的输入。这个程序运行的速度几乎可以达到原来的两倍。不仅如此,与先找出数组前一半的最大值再找出数组后一半的最大值的做法相比,这个代码几乎同样简单和直接,而不用担心线程或异步性。不过,这里有一个重要的区别。调用f1.get()时,这个方法会阻塞,等待第一个FindMaxTask完成。只有当第一个FindMaxTask完成时,才会调用f2.get()。也有可能第二个线程已经结束,在这种情况下,结果值会直接返回,但是如果第二个线程还没有结束,同样的,也会等待这个线程完成。一旦两个线程都结束,将比较它们的结果,并返回最大值。
Future是一种非常方便的做法,可以启动多个线程来处理一个问题的不同部分,然后等待它们全部都结束之后再继续。
