Java线程池简要分析

    xiaoxiao2022-06-25  217

    Java中可使用java.util.concurrent包中ThreadPoolExecutor作为线程池。JDK本身提供4种特定的线程池模板供我们使用,当然,我们可以按需创建自定义的ThreadPoolExecutor,但是大多数情况下,这四种已经可以满足需求。下面分别解释。

    基础概念

    BlockingQueue:线程池底层都包含一个BlockingQueue,这关系到每个线程池的特性,所以先来分析下,特性有三: 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(什么意思?如果当前运行的线程小于corePoolSize,则任务根本不会存放,添加到queue中,而是直接抄家伙(thread)开始运行)如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

    源码如下

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    int c = ctl.get();

    if (workerCountOf(c) < corePoolSize) {

        if (addWorker(command, true))

            return;

        c = ctl.get();

    }

    if (isRunning(c) && workQueue.offer(command)) {

        int recheck = ctl.get();

        if (! isRunning(recheck) && remove(command))

            reject(command);

        else if (workerCountOf(recheck) == 0)

            addWorker(null, false);

    }

    else if (!addWorker(command, false))

        reject(command);

    keepAliveTime:当线程数大于corePoolSize时,此为终止前多余的空闲线程等待新任务的最长时间。RejectedExecutionHandler:当超过队列处理能力后的策略 CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionExceptionDiscardPolicy:不能执行的任务将被删除DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)

    FixedThreadPool

    这个线程池是一个固定消费线程数的线程池,在初始化的时候,就可以定义其并发处理数。而若该线程池的消费线程均在工作中的时候,新的线程会被塞到一个LinkedBlockingQueue中进行排队

    构造函数

    FixedThreadPool

    1

    2

    3

    4

    5

    6

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {

        return new ThreadPoolExecutor(nThreads, nThreads,

                                      0L, TimeUnit.MILLISECONDS,

                                      new LinkedBlockingQueue<Runnable>(),

                                      threadFactory);

    }

    LinkedBlockingQueue是线程安全的基于链表的队列,可以保证快速出队入队,和正常的GC(垃圾回收),下面贴一下dequeue的源码

    LinkedBlockingQueue-dequeue

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    /**

     * Removes a node from head of queue.

     * @return the node

     */ 

    private E dequeue() { 

        // assert takeLock.isHeldByCurrentThread(); 

        Node<E> h = head; 

        Node<E> first = h.next; 

        h.next = h; // help GC 

        head = first; 

        E x = first.item; 

        first.item = null; 

        return x; 

    }

    可以看到,代码中特意针对GC进行了处理,所以会被迅速垃圾回收

    LinkedBlockingQueue是继承BlockingQueue的,对于线程池这个场景,有ArrayBlockingQueue和LinkedBlockingQueue两种适合使用,如果我们要自定义线程池,可以依据它们的特点进行选用: LinkedBlockingQueue:基于链表,无capacity(即队列长度),put和get分别有一把锁ArrayBlockingQueue:基于数组,有capacity,put和get共用一把锁分析:LinkedBlockingQueue因为没有capacity,那么当并发很高或者处理很慢的时候,队列会无限扩大,最终导致队列占用内存过大引发的OOM(内存溢出)。但是因为其put和get是两把锁,故存取的性能会相当高。而ArrayBlockingQueue基于数组,在初始化的时候就分配了固定内存,后面不可以再增加队列的长度,可以保证不会因队列无限长而导致的OOM,但是也意味着会限制线程池的处理能力,因此需要设定队列满所需要的策略(参考基础概念中的介绍),当业务量大的时候,可以考虑扩点

    CachedThreadPool

    这个线程池特性是没有固定大小,从0到Integer.MAX_VALUE都可以,每当一个线程进入线程池,会判断是否有空闲的worker待使用,若有,则使用这个worker消费线程,若没有,则创建一个worker进行消费。底层队列是一个特殊的BlockingQueue:SynchronousQueue,它没有缓冲,每个put必须等待一个take动作,双方在这个Queue中实现握手,官方解释:此策略可以避免在处理可能具有内部依赖性的请求集时出现锁该线程池适用于大量的处理快速的小线程,因为它本质上是没有对并发线程数进行控制的,那么当请求积压的时候,线程数量会急速上升,有可能会达到OS系统的线程上限限制抛出OutOfMemoryError,或者导致OOM。相比于直接去new一个Thread,这个线程池可以保留空闲线程60秒,尽可能的避免了过多的回收创建线程的损耗,这是它的优势。

     

    ScheduledThreadPool

    其设计思想是,每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。需要注意的是,只有当任务的执行时间到来时,ScheduedExecutor 才会真正启动一个线程,其余时间 ScheduledExecutor 都是在轮询任务的状态。

    SingleThread

    SingleThreadExecutor就像线程数为1的FixedThreadPool。如果向SingleThreadExecutor提交多个任务,这些任务将排队。从输出结果可以看到,任务按照提交顺序被执行。SingleThreadExecutor可以确保任何线程中都只有唯一的任务在运行。(多个线程使用同一文件系统时,可以用SingleThreadExecutor来保持同步)

    总结

    按我们组目前的应用场景,只需要在FixedThreadPool和CachedThreadPool二者之间做出选择。相比于后者而言,FixedThreadPool不用反复的创建销毁工作线程(虽然CachedThreadPool做了一定的控制),同时因为已限定并发数量,所以FixedThreadPool并不会使用太多系统资源。在大量并发且处理速度较慢的时候,FixedThreadPool会将这些线程入队,而CachedThreadPool会创建工作线程并处理每个线程的实际操作,因为二者都没有控制,所以相比较而言无疑前者优势更大。需要注意的是,FixedThreadPool底层的LinkedBlockingQueue没有长度限制,虽然在使用时是比较方便,但在较极端的情况下队列过大也会出现OOM的情况,这种情况只能通过代码逻辑降低处理时效、或限制并发、或物理扩点以变相提高负载能力。若考虑使用一些策略的情况下(如基础概念中所述),可以适当降低处理效率,自定义ThreadPoolExecutor,底层BlockingQueue选用ArrayBlockingQueue。

    最新回复(0)