在实际的编程中,我们通常借助Executor框架实现异步任务。 通常用工具类Executors工具类来帮助我们来创建线程池。
参数释义:
corePoolSize:线程池的核心线程池大小maximumPoolSize:线程池最大大小keepAliveTime:设置超时时间unit:时间单位workQueue:保存等待执行的任务threadFactory:线程工厂,可以省略,省略的话,那就使用默认的newFixedThreadPool( )方法代码如下:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } // newFixedThreadPool( )方法的重载,添加threadFactory参数 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }基于源码分析可知:
线程池的核心线程池大小等于其最大线程池大小工作线程永不超时达到最大线程池的最大数量,线程池的规模将不再变化补充:如果某个线程由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程;当我们不再需要该线程池时主动将其关闭。
newCachedThreadPool( )方法代码如下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } // newCachedThreadPool( )方法的重载,添加threadFactory参数 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory){ return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }基于源码分析可知:
线程池的核心线程池大小设置为0线程池的最大大小设置为Integer.MAX_VALUE工作线程超时为60s补充:这种方法创建出来的线程池可以被无限扩展,并且当需求降低时会自动收缩。
newSingleThreadExecutor( )方法代码如下:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } // newSingleThreadExecutor( )方法的重载,添加threadFactory参数 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }基于源码分析可知:
是一个单线程的Executor工作线程超时为0补充:它创建单个工作线程来执行任务,如果这个任务异常结束,会创建一个线程来替代。newSingleThreadExecutor( )能确保依照任务在队列中的顺序来串行执行。
newScheduledThreadPool( )方法代码如下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize){ return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); }基于源码分析可知:
创建了固定长度的线程池工作线程超时为0使用示例:
import java.text.DateFormat; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class TestNewScheduledThreadPool { public static void main(String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(2); // 在任务执行时间小于间隔时间的情况下,程序以起始时间为准则,每隔指定时间执行一次,不受任务执行时间影响。 scheduleAtFixedRate(service, 1000); // 当执行任务时间大于间隔时间,此方法不会重新开启一个新的任务进行执行,而是等待原有任务执行完成,马上开启下一个任务进行执行。此时,执行间隔时间已经被打乱 scheduleAtFixedRate(service, 6000); // 此方法无论任务执行时间长短,都是当第一个任务执行完成之后,延迟指定时间再开始执行第二个任务 scheduleWithFixedDelay(service, 1000); scheduleWithFixedDelay(service, 6000); } private static void scheduleAtFixedRate(ScheduledExecutorService service, final int sleepTime) { service.scheduleAtFixedRate(new Runnable() { @Override public void run() { long start = new Date().getTime(); System.out.println("scheduleAtFixedRate 开始执行时间:" + DateFormat.getTimeInstance().format(new Date())); try { Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } long end = new Date().getTime(); System.out.println("scheduleAtFixedRate 执行花费时间=" + (end - start) / 1000 + "m"); System.out.println("scheduleAtFixedRate 执行完成时间:" + DateFormat.getTimeInstance().format(new Date())); System.out.println("======================================"); } }, 1000, 5000, TimeUnit.MILLISECONDS); } private static void scheduleWithFixedDelay(ScheduledExecutorService service, final int sleepTime) { service.scheduleWithFixedDelay(new Runnable() { @Override public void run() { long start = new Date().getTime(); System.out.println("scheduleWithFixedDelay 开始执行时间:" + DateFormat.getTimeInstance().format(new Date())); try { Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } long end = new Date().getTime(); System.out.println("scheduleWithFixedDelay执行花费时间=" + (end - start) / 1000 + "m"); System.out.println("scheduleWithFixedDelay执行完成时间:" + DateFormat.getTimeInstance().format(new Date())); System.out.println("======================================"); } }, 1000, 5000, TimeUnit.MILLISECONDS); } }代码来源link。
补充: Timer存在一些缺陷,因此应该考虑使用ScheduledThreadPoolExecutor来替代它。
缺陷如下:
Timer在执行所有定时任务时只会创建一个线程。如果TimerTask抛出了一个未检查的异常时,将终止定时线程。Timer也不会恢复线程的执行,而是会错误地认为整个Timer都被取消了。ExecutorService的生命周期有三种状态:运行、关闭和已终止。 ExecutorService在初始创建时处于运行状态。 shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括那些还未开始执行的任务。 shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。将会返回尚未启动的任务清单。 所有任务完成后,ExecutorService将转入终止状态。可以调用awaitTermination( )来等待ExecutorService到达终止状态,或者通过调用isTerminated来轮询ExecutorService是否已经终止。
可以看到call( )方法返回一个值或者抛出异常。如果我们起了一个线程,需要得到返回值,但是这个返回值什么时候返回并不知道,因为是异步的,而且是得到返回值还是捕获了异常也不清楚。这些都需要进行进一步的封装,也就是Future接口。个人认为,Future接口的存在是为了封装对任务执行情况的获取。 Future的get( )是阻塞的。
当向Executor提交多个任务并且希望获得它们在完成之后的结果,如果用FutureTask,可以循环获取task,并调用get方法去获取task执行结果,但是如果task还未完成,获取结果的线程将阻塞直到task完成,由于不知道哪个task优先执行完毕,使用这种方式效率不会很高。在jdk5时候提出接口CompletionService,它整合了Executor和BlockingQueue的功能,可以更加方便在多个任务执行时获取到任务执行结果。
public interface CompletionService<V> { // 提交Callable类型的task Future<V> submit(Callable<V> task); // 提交Runnable类型的task Future<V> submit(Runnable task, V result); // 获取并移除已完成状态的task,如果目前不存在这样的task,则等待 Future<V> take() throws InterruptedException; // 获取并移除已完成状态的task,如果目前不存在这样的task,返回null; Future<V> poll(); // 获取并移除已完成状态的task,如果在指定等待时间内不存在这样的task,返回null Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }示例:
public class CompletionServiceTest { private static ExecutorService executor = Executors.newFixedThreadPool(100); public void run() { CompletionService<Long> completionService = new ExecutorCompletionService<>(executor); final int groupNum = 100000000 / 100; for (int i = 1; i <= 100; i++) { int start = (i - 1) * groupNum + 1, end = i * groupNum; completionService.submit(new Callable<Long>() { @Override public Long call() throws Exception { Long sum = 0L; for (int j = start; j <= end; j++) { sum += j; } return sum; } }); } long result = 0L; try { for (int i = 0; i < 100; i++) { // completionService.take()得到是一个Future result += completionService.take().get(); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("the result is " + result); } public static void main(String[] args) { CompletionServiceTest completionServiceTest = new CompletionServiceTest(); completionServiceTest.run(); } }参考链接:Link。
四种饱和策略:
CallerRunsPolicy:调用者运行策略AbortPolicy:终止策略是默认的饱和策略DiscardPolicy:抛弃策略DiscardOldestPolicy:抛弃最旧的策略抛弃最旧的策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。不要将此策略和优先级队列放在一起使用,因为将导致抛弃优先级最高的任务。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }抛弃策略会悄悄地抛弃该任务。
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }调用者运行策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }