源码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
keepAliveTime:当线程数大于corePoolSize时,此为终止前多余的空闲线程等待新任务的最长时间。RejectedExecutionHandler:当超过队列处理能力后的策略 CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionExceptionDiscardPolicy:不能执行的任务将被删除DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)构造函数
FixedThreadPool
1
2
3
4
5
6
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
LinkedBlockingQueue是线程安全的基于链表的队列,可以保证快速出队入队,和正常的GC(垃圾回收),下面贴一下dequeue的源码
LinkedBlockingQueue-dequeue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Removes a node from head of queue.
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
可以看到,代码中特意针对GC进行了处理,所以会被迅速垃圾回收
LinkedBlockingQueue是继承BlockingQueue的,对于线程池这个场景,有ArrayBlockingQueue和LinkedBlockingQueue两种适合使用,如果我们要自定义线程池,可以依据它们的特点进行选用: LinkedBlockingQueue:基于链表,无capacity(即队列长度),put和get分别有一把锁ArrayBlockingQueue:基于数组,有capacity,put和get共用一把锁分析:LinkedBlockingQueue因为没有capacity,那么当并发很高或者处理很慢的时候,队列会无限扩大,最终导致队列占用内存过大引发的OOM(内存溢出)。但是因为其put和get是两把锁,故存取的性能会相当高。而ArrayBlockingQueue基于数组,在初始化的时候就分配了固定内存,后面不可以再增加队列的长度,可以保证不会因队列无限长而导致的OOM,但是也意味着会限制线程池的处理能力,因此需要设定队列满所需要的策略(参考基础概念中的介绍),当业务量大的时候,可以考虑扩点
其设计思想是,每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。需要注意的是,只有当任务的执行时间到来时,ScheduedExecutor 才会真正启动一个线程,其余时间 ScheduledExecutor 都是在轮询任务的状态。