Java中ExecutorService和CompletionService区别

    xiaoxiao2022-07-13  137

    我们现在在Java中使用多线程通常不会直接用Thread对象了,而是会用到java.util.concurrent包下的ExecutorService类来初始化一个线程池供我们使用。

    之前我一直习惯自己维护一个list保存submit的callable task所返回的Future对象。

    在主线程中遍历这个list并调用Future的get()方法取到Task的返回值。

     

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    public class CompletionServiceTest {

     

        static class Task implements Callable<String>{

            private int i;

             

            public Task(int i){

                this.i = i;

            }

     

            @Override

            public String call() throws Exception {

                Thread.sleep(10000);

                return Thread.currentThread().getName() + "执行完任务:" + i;

            }  

        }

         

        public static void main(String[] args){

            testUseFuture();

        }

         

        private static void testUseFuture(){

            int numThread = 5;

            ExecutorService executor = Executors.newFixedThreadPool(numThread);

            List<Future<String>> futureList = new ArrayList<Future<String>>();

            for(int i = 0;i<numThread;i++ ){

                Future<String> future = executor.submit(new CompletionServiceTest.Task(i));

                futureList.add(future);

            }

                     

            while(numThread > 0){

                for(Future<String> future : futureList){

                    String result = null;

                    try {

                        result = future.get(0, TimeUnit.SECONDS);

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    } catch (ExecutionException e) {

                        e.printStackTrace();

                    } catch (TimeoutException e) {

                        //超时异常直接忽略

                    }

                    if(null != result){

                        futureList.remove(future);

                        numThread--;

                        System.out.println(result);

                        //此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)

                        break;

                    }

                }

            }

        }

    }

      

    但是,我在很多地方会看到一些代码通过CompletionService包装ExecutorService,然后调用其take()方法去取Future对象。

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    public class CompletionServiceTest {

     

        static class Task implements Callable<String>{

            private int i;

             

            public Task(int i){

                this.i = i;

            }

     

            @Override

            public String call() throws Exception {

                Thread.sleep(10000);

                return Thread.currentThread().getName() + "执行完任务:" + i;

            }  

        }

         

        public static void main(String[] args) throws InterruptedException, ExecutionException{

            testExecutorCompletionService();

        }

         

        private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{

            int numThread = 5;

            ExecutorService executor = Executors.newFixedThreadPool(numThread);

            CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);

            for(int i = 0;i<numThread;i++ ){

                completionService.submit(new CompletionServiceTest.Task(i));

            }

    }

             

            for(int i = 0;i<numThread;i++ ){    

                System.out.println(completionService.take().get());

            }

             

        }

      

    以前没研究过这两者之间的区别。今天看了源代码之后就明白了。

     

    这两者最主要的区别在于submit的task不一定是按照加入自己维护的list顺序完成的。

    从list中遍历的每个Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住,如果系统是设计成每个线程完成后就能根据其结果继续做后面的事,这样对于处于list后面的但是先完成的线程就会增加了额外的等待时间。

     

    而CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。

    所以,先完成的必定先被取出。这样就减少了不必要的等待时间

    https://www.cnblogs.com/E-star/p/4882154.html

    最新回复(0)