CompletionService

    xiaoxiao2026-01-07  8

    如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务(Completion service)。如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务(Completion service)。

    CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。

    它是一个更高级的ExecutorService,它本身自带一个线程安全的线性表,无需用户额外创建。它提供了2种方法从线性表中取出结果,poll()是非阻塞的,若目前无结果,返回一个null,线程继续运行不阻塞。take()是阻塞的,若当前无结果,则线程阻塞,直到产生一个结果,被取出返回,线程才继续运行。

    此类将安排那些完成时提交的任务,把它们结果放置在可使用 take 访问的队列上, 外部可以通过take(),poll(),poll(long timeout,TimeUnit unit)来取得。该类非常轻便,适合于在执行几组任务时临时使用。

    主要构造函数public ExecutorCompletionService(Executor executor)    使用为执行基本任务而提供的执行程序创建一个 ExecutorCompletionService,并将 LinkedBlockingQueue 作为完成队列。    参数:        executor - 要使用的执行程序     抛出:        NullPointerException - 如果执行程序为 nullpublic ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)    使用为执行基本任务而提供的执行程序创建一个 ExecutorCompletionService,并将所提供的队列作为其完成队列。    参数:        executor - 要使用的执行程序        completionQueue - 用作完成队列的队列,通常是专供此服务使用的队列     抛出:        NullPointerException - 如果执行程序或 completionQueue 为 null

    主要成员函数 public Future<V>  submit(Callable<V> task)     从接口 CompletionService 复制的描述     提交要执行的值返回任务,并返回表示挂起的任务结果的 Future。在完成时,可能会提取或轮询此任务。     指定者:         接口 CompletionService<V> 中的 submit     参数:         task - 要提交的任务      返回:         一个表示挂起的任务完成的 Future public Future<V>  submit(Runnable task,V result)     从接口 CompletionService 复制的描述     提交要执行的 Runnable 任务,并返回一个表示任务完成的 Future,可以提取或轮询此任务。     指定者:         接口 CompletionService<V> 中的 submit     参数:         task - 要提交的任务         result - 要返回的已成功完成任务的结果      返回:         一个表示挂起的任务完成的 Future,其 get() 方法将返回完成时给出的结果值 public Future<V>  take()                throws InterruptedException     从接口 CompletionService 复制的描述     获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。     指定者:         接口 CompletionService<V> 中的 take     返回:         表示下一个已完成任务的 Future      抛出:         InterruptedException - 如果在等待时被中断 public Future<V>  poll()     从接口 CompletionService 复制的描述     获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null。     指定者:         接口 CompletionService<V> 中的 poll     返回:         表示下一个已完成任务的 Future;如果不存在这样的任务,则返回 null public Future<V>  poll(long timeout,                       TimeUnit unit)                throws InterruptedException     从接口 CompletionService 复制的描述     获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则将等待指定的时间(如果有必要)。     指定者:         接口 CompletionService<V> 中的 poll     参数:         timeout - 放弃之前需要等待的时间长度,以 unit 为时间单位         unit - 确定如何解释 timeout 参数的 TimeUnit      返回:         表示下一个已完成任务的 Future;如果等待了指定时间仍然不存在这样的任务,则返回 null      抛出:         InterruptedException - 如果在等待时被中断 代码: import java.util.Random;   import java.util.concurrent.BlockingQueue;   import java.util.concurrent.Callable;   import java.util.concurrent.CompletionService;   import java.util.concurrent.ExecutionException;   import java.util.concurrent.ExecutorCompletionService;   import java.util.concurrent.ExecutorService;   import java.util.concurrent.Executors;   import java.util.concurrent.Future;   import java.util.concurrent.LinkedBlockingQueue;      public class Test17 {       public static void main(String[] args) throws Exception {           Test17 t = new Test17();           t.count1();           t.count2();       }   //使用阻塞容器保存每次Executor处理的结果,在后面进行统一处理       public void count1() throws Exception{           ExecutorService exec = Executors.newCachedThreadPool();           BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();           for(int i=0; i<10; i++){               Future<Integer> future =exec.submit(getTask());               queue.add(future);           }           int sum = 0;           int queueSize = queue.size();           for(int i=0; i<queueSize; i++){               sum += queue.take().get();           }           System.out.println("总数为:"+sum);           exec.shutdown();       }   //使用CompletionService(完成服务)保持Executor处理的结果       public void count2() throws InterruptedException, ExecutionException{           ExecutorService exec = Executors.newCachedThreadPool();           CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);           for(int i=0; i<10; i++){               execcomp.submit(getTask());           }           int sum = 0;           for(int i=0; i<10; i++){   //检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。               Future<Integer> future = execcomp.take();               sum += future.get();           }           System.out.println("总数为:"+sum);           exec.shutdown();       }       //得到一个任务       public Callable<Integer> getTask(){           final Random rand = new Random();           Callable<Integer> task = new Callable<Integer>(){               @Override               public Integer call() throws Exception {                   int i = rand.nextInt(10);                   int j = rand.nextInt(10);                   int sum = i*j;                   System.out.print(sum+"\t");                   return sum;               }           };           return task;       }   结果:

    490  6 48 28 312 0 0 6 总数为:152 56  8  5425  35 10 12  854 20  总数为:282

    相关资源:python入门教程(PDF版)
    最新回复(0)