在java中创建线程的方式肯定大家都知道,但是这样创建有什么问题呢?当然是有问题的要知道线程在创建时是极其耗费资源的,就比如你要乘坐公交汽车肯定是你也可以用别人也可以用的,不会是你用完或者别人用完就把车子送到废品回收站了,当然是回到公交车站,这个公交车站就好比线程池,公交车就好比一个个线程。不知道大家对这么描述是否理解,不理解也没关系下面有更详细的介绍。
打开idea我们找到ThreadPoolExecutor看到这个类如下
* <p>Thread pools address two different problems: they usually * provide improved performance when executing large numbers of * asynchronous tasks, due to reduced per-task invocation overhead, * and they provide a means of bounding and managing the resources, * including threads, consumed when executing a collection of tasks. * Each {@code ThreadPoolExecutor} also maintains some basic * statistics, such as the number of completed tasks. * 线程池处理两个不同的问题:它们通常在执行大量异步任务,由于减少了每个任务的调用开销,它们提供了一种界定和管理资源的方法,包括执行任务集合时使用的线程。每个线程池执行器也维护一些基本的统计信息,如已完成任务的数量。看到官方的描述是这样的 那么大家也对他有进一步的认识了吧。
以下是创建线程池的4个构造方法通过代码中我们知道上面的三个方法都是通过调用第四个方法执行的。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择: ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue;
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
threadFactory:线程工厂,主要用来创建线程;handler:表示当拒绝处理任务时的策略,有以下四种取值: ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务我们看到ThreadPoolExecutor类是继承AbstractExecutorService那么我们就来看看AbstractExecutorService这个类
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { }; protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { }; public Future<?> submit(Runnable task) {}; public <T> Future<T> submit(Runnable task, T result) { }; public <T> Future<T> submit(Callable<T> task) { }; private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { }; }AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。 那么我们接着追看看ExecutorService里面写了什么
public interface ExecutorService extends Executor { void shutdown(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }巴拉巴拉又是一大堆方法是不是感觉很晕呢没关系看他最终继承Executor
public interface Executor { void execute(Runnable command); }到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。
什么?你还不明白?没关系看图
Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;然后ThreadPoolExecutor继承了类AbstractExecutorService。在ThreadPoolExecutor类中有几个非常重要的方法:
execute()
submit()
shutdown()
shutdownNow()
execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。
shutdown()和shutdownNow()是用来关闭线程池的。
还有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。
在上一节我们从宏观上介绍了ThreadPoolExecutor,下面我们来深入解析一下线程池的具体实现原理,将从下面几个方面讲解:
3.1线程池的状态
3.2任务的执行
3.3线程池中的线程初始化
3.4任务缓存队列以及任务排队策略
3.5任务拒绝策略
3.6线程池的关闭
3.7线程池容量的动态调整
以下是官方注释
* RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed RUNNING:接受新任务并处理排队的任务 SHUTDOWN:不接受新任务,但处理排队的任务 STOP:不接受新任务,不处理排队的任务,中断正在进行的任务 TIDYING:所有任务都已终止,WorkerCount为零,线程转换为状态整理将运行终止的()hook方法 TERMINATED:已终止()已完成当创建线程池后,初始时,线程池处于RUNNING状态;
如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
对应具体值但是注意他的状态是这样保证线程可见性的
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 初始值:状态RUNNINT,工作线程数量:0
每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。 corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小。
举个栗子:假设一个工厂有十台机器,每个机器每次都只能做一件事,那么现在如果没有任务那么机器都是闲置的,如果来了任务那么就任务分配给空闲的机器,如果机器都在工作那么就将多余的任务排队等待(缓存队列)如果新增任务的速度远远大于机器生产每个任务的速度,那么老板就会想补救措施,购置几台新机器假设先购置十台(线程池最大能容忍的线程数);如果新购置了机器还不能满足现有要求那么就不在接收新的任务了(拒绝策略)。如果后面任务变少了老板就会考虑把一部分新购置的机器卖掉毕竟闲置是要浪费钱的(线程池corePoolSize范围外)也就新购置的10台。
这个例子中的corePoolSize就是10,而maximumPoolSize就是20(10+10)。也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。不过为了方便理解,在本文后面还是将corePoolSize翻译成核心池大小。
在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }看了下是不是没有注释感觉像喝了二斤二锅头一样迷糊了呢?没关系下面会一步一步分析代码的语义。
第一步判断当前线程是否为空,为空则抛出空指针
第二步判断当前线程数是否小于核心线程数corePoolSize,小于当线程总数小于corePoolSize时会将任务通过addWorker()直接调度。
第三步会在workQueue.offer()处进入等待队列。如果进入等待队列失败(如有界队列达到上限,或者使用了SynchronousQueue)则会执行下一个的addWorker()将任务直接提交给线程池。
第四步如果当前线程数已经达到了maximumPoolSize则提交失败执行拒绝策略reject()。
大概我总结了下流程图是这样的
默认情况下线程池在创建时是不会创建线程的,如果我非要创建呢?当然是有办法的,jdk就为我们提供了以下方法
初始化一个核心线程
public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); }初始化所有核心线程
public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; }那么大家看到了这俩个方法都是调用了同一个方法addWorker ,该方法是用来创建,运行,清理Workers的。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }看到这个是不是感觉又晕了呢?没关系下面我们就来一步一步分析代码中的语义。
第一步:获取当前线程池的状态,如果是STOP,TIDYING,TERMINATED状态的话,则会返回false,如果现在状态是SHUTDOWN,但是firstTask不为空或者workQueue为空的话,那么直接返回false。
第二步:通过自旋的方式,判断要添加的Worker是否是corePool,如果是的话,那么则判断当前的workerCount是否大于corePoolsize,否则则判断是否大于maximumPoolSize,如果满足的话,说明workerCount超出了线程池大小,直接返回false。如果小于的话,那么判断是否成功将WorkerCount通过CAS操作增加1,如果增加成功的话。则进行到第3步,否则则判断当前线程池的状态,如果现在获取到的状态与进入自旋的状态不一致的话,那么则通过continue retry重新进行状态的判断。
第三步:如果满足了的话,那么则创建一个新的Worker对象,然后获取线程池的重入锁后,判断当前线程池的状态,如果当前线程池状态为STOP,TIDYING,TERMINATED的话,那么调用isAlive判断是否已启动存活线程,如果为已启动线程则抛出异常IllegalThreadStateException并执行addWorkerFailed方法
addWorkerFailed方法
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock;//获取可重入锁 mainLock.lock();//开启锁 try { if (w != null) workers.remove(w);//清空当前任务 decrementWorkerCount();//将workerCount减一 tryTerminate();//停止线程池 } finally { mainLock.unlock();//释放锁 } }第四步:如果状态满足的话,那么则在workers中将新创建的worker添加,并且重新计算largestPoolSize,然后启动Worker中的线程开始执行任务。
前面我们讲到当线程池超过corePoolSize时就会放入到缓存队列中那么下面我们就来说说任务的缓存队列。
1、ArrayBlockingQueue:基于数组的先进先出,创建时必须指定大小,超出直接corePoolSize个任务,则加入到该队列中,只能加该queue设置的大小,其余的任务则创建线程,直到(corePoolSize+新建线程)> maximumPoolSize。
2、LinkedBlockingQueue:基于链表的先进先出,无界队列。超出直接corePoolSize个任务,则加入到该队列中,直到资源耗尽。
3、synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
以下是官网提供的注释给大家解释下
* <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the * handler throws a runtime {@link RejectedExecutionException} upon * rejection. </li> * * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread * that invokes {@code execute} itself runs the task. This provides a * simple feedback control mechanism that will slow down the rate that * new tasks are submitted. </li> * * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that * cannot be executed is simply dropped. </li> * * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the * executor is not shut down, the task at the head of the work queue * is dropped, and then execution is retried (which can fail again, * causing this to be repeated.) </li>1、ThreadPoolExecutor.AbortPolicy:丢弃该任务并直接抛出异常RejectedExecutionException。
2、ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务,并提供一种简单的反馈机制,可以有效防止新任务的提交。
3、ThreadPoolExecutor.DiscardPolicy:丢弃当前任务,但不会抛出异常。
4、ThreadPoolExecutor.DiscardOldestPolicy:如果任务器没有关闭,那么丢弃在队列在最前面的任务,然后重试执行(可能会再次失败,重复上述过程)。
线程池有开启当然也有关闭了,官网提供了两种关闭的方法
1、shutdown:提供一种有序的关机,会等待当前缓存队列任务全部执行完成才会关闭,但不会再接收新的任务(相对较优雅)。
2、shutdownNow:会立即关闭线程池,会打断正在执行的任务并且会清空缓存队列中的任务,返回的是尚未执行的任务。
如果我想动态调整线程池的大小呢?官网提供了对应的方法。
1、setCorePoolSize:顾名思义就是来设置corePoolSize大小的但是值得注意的是如果设置值小于当前值那么多余的线程将变的无所事事,如果当前设置值大于新的值,那么新的线程会去执行缓存队列中的任务。
2、setMaximumPoolSize:设置当前线程池的maximumPoolSize,设置的值如果小于旧值时超出的线程将在下一次空闲时终止。
以上均参考自JDK1.8