Java并发编程(二十一):Java自定义线程池ThreadPoolExecutor
Java自定义线程池ThreadPoolExecutor1 概述2 Executor框架体系概述3 ThreadPoolExecutor详解3.1 ThreadPoolExecutor使用示例4 线程池的执行流程5 拒绝策略RejectedExecutionHandler6 总结大家好我是欧阳方超公众号同名。1 概述线程池Thread Pool是Java并发编程的基石它通过复用线程来避免频繁创建和销毁线程带来的开销上下文切换等从而提高系统性能、控制资源使用并提供任务队列和拒绝机制来处理高并发场景。Java的线程池主要基于Executor框架实现核心类是ThreadPoolExecutor。2 Executor框架体系概述Executor顶级接口只有一个方法execute(Runnable command)负责执行任务但不提供管理功能。ExecutorService继承Executor扩展了更多功能如submit()支持返回结果的Future、shutdown()、shutdownNow()等生命周期管理方法。AbstractExecutorServiceExecutorService的抽象实现提供了一些模板方法。TreadPoolExecutorExecutorService的核心实现类大多数自定义线程池都基于它。ScheduledThreadPoolExecutor支持定时/周期性任务的线程池继承自ThreadPoolExecutor。Executors工具类提供了工厂方法快速创建常见线程池如newFixedThreadPool、newCachedThreadPool等。注意生产环境中不推荐直接使用Executors的工厂方法创建线程池因为默认参数可能导致OOM例如无界队列或无限线程而是手动通过ThreadPoolExecutor构造函数自定义参数。3 ThreadPoolExecutor详解ThreadPoolExecutor参数最完整的构造方法如下public ThreadPoolExecutor(int corePoolSize, //核心线程数 int maximumPoolSize, //最大线程数 long keepAliveTime, //非核心线程空闲存活时间 TimeUnit unit, //时间单位 BlockingQueueRunnableworkQueue, //任务队列 ThreadFactory threadFactory, //现成工厂可选用于自定义线程名称等 RejectedExecutionHandler handler //拒绝策略可选)参数详细解释corePoolSize核心线程数线程池中一直保持的线程数量即使空闲也不会被销毁除非设置allowCoreThreadTimeOut(true)。新任务到来时如果当前线程数小于corePoolSize会直接创建新线程执行任务。建议根据CPU核心数或业务并发度设置例如CPU密集型corePoolSize≈ CPU核数IO密集型corePoolSize≈CPU核数*2。maximumPoolSize最大线程数线程池允许创建的最大线程数量。当核心线程都在忙、队列也满时如果当前线程数小于maximumPoolSize会创建非核心线程。通常设置为corePoolSize的1.5~2倍。keepAlivetime、unit空闲存活时间当线程数大于corePoolSize时多余的非核心线程空闲超过这个时间后会被回收。单位由TimeUnit指定如TimeUnit.SECONDS。帮助释放多余资源避免线程过多占用内存/CPU。workQueue任务队列BlockingQueue用于缓存等待执行的任务。常用实现ArrayBlockingQueue有界数组队列FIFO性能好但容量固定。LinkedBlockingQueue链表实现默认无界容易导致OOMExecutors.newFixedThreadPool默认使用它。SynchronousQueue零容量队列不存储任务直接交给线程执行Executors.newCachedThreadPool默认使用适合大量短任务。PriorityBlockingQueue优先级队列。threadFactory线程工厂用于创建新线程默认是Executors.defaultThreadFactory()。常用来自定义线程名称如加上业务前缀便于日志排查。handler拒绝策略RejectedExceptionHandler当队列满且线程数达到maximumPoolSize时新任务如何处理。JDK内置了4种策略。3.1 ThreadPoolExecutor使用示例下面通过一个示例演示ThreadPoolExecutor的使用package com.futuretech.userservice.config;importjava.time.LocalDateTime;importjava.time.format.DateTimeFormatter;importjava.util.concurrent.*;importjava.util.concurrent.atomic.AtomicInteger;/** * 生产环境级ThreadPoolExecutor最佳实践演示 * 演示内容 * - 手动创建 ThreadPoolExecutor推荐方式 * -7大核心参数的合理配置 * - 有界队列避免OOM * - 自定义线程工厂给线程起有意义的名字 * - 拒绝策略CallerRunsPolicy * - 任务提交Runnable 和 Callable * - 包含业务隔离、自定义线程工厂、全局异常捕获、优雅关闭线程池 */ public class ThreadPoolExecutorDemo{public static void main(String[]args){System.out.println( 系统启动初始化业务线程池 \n);//1.核心业务线程池用于处理核心订单逻辑CPU密集型居多快速响应 //参数考量核心线程数与CPU核心数挂钩队列较小防止任务积压导致C // 端长时间等待采用AbortPolicy快速失败降级 ThreadPoolExecutor orderPoolnew ThreadPoolExecutor(2,4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue(100), new NamedThreadFactory(Order-Core-Pool,false), //自定义命名与异常捕获 new ThreadPoolExecutor.AbortPolicy()//handler队列满时抛出RejectedExecutionException);//2.非核心业务线程池用于处理短信/邮件发送I/O密集型允许一定延迟隔离核心业务 //参数考量核心数适当调大队列适当调大采用CallerRunsPolicy降级慢点发没关系别丢了就行 ThreadPoolExecutor noticePoolnew ThreadPoolExecutor(4,8, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue(500), new NamedThreadFactory(Sms-Notice-Pool,false), new ThreadPoolExecutor.CallerRunsPolicy()//handler队列满时交由提交任务的线程比如Tomcat去执行);// 模拟业务请求 //场景A模拟正常的核心订单处理使用execute提交Runnablefor(int i1;i3;i){final int orderIdi;orderPool.execute(()-{ log(正在处理订单业务订单号orderId);sleep(1000);//模拟业务耗时 });}//场景B模拟非核心业务异常抛出演示 ThreadFactory 中的 UncaughtExceptionHandler 机制//注意execute提交的任务抛出未捕获异常时会被异常处理器捕捉到 noticePool.execute(()-{ log(正在处理短信发送任务... 准备触发异常);throw new RuntimeException(第三方短信网关连接超时);});//场景C模拟核心业务有返回值的提交演示submit提交Callable及异常处理机制//注意submit提交的任务异常会被包装进Future中UncaughtExceptionHandler抓不到 FutureStringfutureResultorderPool.submit(()-{ log(正在执行带返回值的风控校验任务...);if(true){ throw new IllegalStateException(风控校验未通过发现违规参数);} return 校验通过;});//尝试获取submit任务的结果 try { String resultfutureResult.get();log(风控结果result);} catch(InterruptedException e){ Thread.currentThread().interrupt();} catch(ExecutionException e){ log([业务层捕获] submit任务抛出了异常异常原因被 Future 包装了:e.getCause().getMessage());}//系统关闭优雅停机 sleep(500);//稍微等待一下任务执行 System.out.println(\n 收到停机指令开始优雅关闭线程池 );gracefulShutdown(orderPool,Order-Core-Pool);gracefulShutdown(noticePool,Sms-Notice-Pool);}/** * 生产级自定义线程工厂 */ static class NamedThreadFactory implements ThreadFactory{private final AtomicInteger threadNumbernew AtomicInteger(1);private final String namePrefix;private final boolean daemon;public NamedThreadFactory(String namePrefix, boolean daemon){this.namePrefixnamePrefix -thread-;this.daemondaemon;}Override public Thread newThread(Runnable r){Thread tnew Thread(r, namePrefix threadNumber.getAndIncrement());t.setDaemon(daemon);t.setUncaughtExceptionHandler((thread,e)-{//真实项目中通常对接log.error并触发企微/邮件告警 System.out.printf([严重告警] 线程 [%s] 发生未捕获异常退出,异常类型:%s,异常信息:%s%n,thread.getName(),e.getClass().getSimpleName(),e.getMessage());});returnt;}}/** * 生产级线程池优雅停机标准模板 */ private static void gracefulShutdown(ThreadPoolExecutor pool, String poolName){pool.shutdown();// 拒绝新任务但继续执行队列中的任务 try{log(等待线程池 [ poolName ] 中剩余任务执行完毕...);// 给业务留出一段缓冲时间处理余下任务(例如60秒)if(!pool.awaitTermination(5, TimeUnit.SECONDS)){log(线程池 [ poolName ] 关闭超时强制终止运行中的任务);pool.shutdownNow();// 强制打断正在执行的线程}else{log(线程池[ poolName ] 已优雅关闭完毕。);}}catch(InterruptedException ie){pool.shutdownNow();Thread.currentThread().interrupt();// 恢复中断状态}}// 辅助工具方法带时间戳和线程名的日志输出 private static void log(String msg){StringtimeLocalDateTime.now().format(DateTimeFormatter.ofPattern(HH:mm:ss.SSS));String threadNameThread.currentThread().getName();System.out.printf([%s] [%s] - %s%n, time, threadName, msg);}// 辅助工具方法线程休眠 private static void sleep(long millis){try{Thread.sleep(millis);}catch(InterruptedException e){Thread.currentThread().interrupt();}}}对上面的代码从四个角度进行解读业务物理隔离Bulkhead舱壁模式代码中明确分离了orderPool和noticePool。这样做的目的是如果短信网关崩溃导致noticePool线程被全部耗尽orderPool依然能平稳处理核心订单绝不会导致整个系统雪崩。不同的拒绝策略对应不同业务核心系统AbortPolicy当流量剧增打满订单队列时直接抛出异常。当在网关或控制层捕获后立刻返回前端“系统繁忙”这是一种主动保护与降级。非核心系统CallerRunsPolicy发短信不那么要紧如果池子满了就让调用者比如Tomcat的Worker线程自己去发。这样不仅不丢任务还能变相对上游任务提交者反向施压反压backpressure即让上游任务提交者自己承担一部分任务的执行从而间接降低任务提交速度形成自动限流。execute()与submit()的异常处理差异场景Bexecute抛出异常会导致该线程终止随后触发ThreadFactory中配置的兜底告警机制。场景Csubmit底层是把任务包装成了FutureTask它会吞掉所有异常。即使ThreadFactory中配置了UncaughtExceptioinHandler也是抓不到的必须要在代码逻辑里显式地通过future.get()并catch ExecutionException才能拿到异常信息。规范的优雅关闭线程池Graceful Shutdown上面示例中的gracefulShutdown方法是标准的实现套路先调用shutdown()阻止新任务进场在通过awaitTermination()给老任务流出执行时间。如果超时还没跑完再调用shutdownNow()强制中断。4 线程池的执行流程提交任务execute(Runnable)或submit()时的处理步骤如果当前线程数小于corePoolSize则直接创建新线程执行任务如果当前线程数大于等于corePoolSize且队列未满则任务入队列等待不创建线程如果当前线程数大于等于corePoolSize队列已满但当前线程数小于maximumPoolSize则会创建新的线程执行任务如果当前线程数大于等于maximumPoolSize且队列已满则执行拒绝策略。5 拒绝策略RejectedExecutionHandler当任务无法被接受时触发拒绝策略AbortPolicy默认直接抛出RejectedExecutionExceptioin。适合需要快速感知过载的场景。CallerRunsPolicy用调用者线程提交任务的线程直接执行该任务。实现那“反压”调用者被阻塞无法继续提交适合允许短暂阻塞的场景。DiscardPolicy默默丢弃该任务不抛异常。适合可丢失任务的日志/监控场景。DiscardOldestPolicy丢弃队列中最旧的任务然后重新尝试提交当前任务。适合任务有新鲜度要求的场景如最新数据优先。6 总结根据业务类型CPU/IO密集、QPS调整corePoolSize、maximumPoolSize的值工作队列要设置为有界队列否则会出现OOM通过ThreadFactory设置线程名称不只是为了装饰更便于日志和问题排查。我是欧阳方超把事情做好了自然就有兴趣了如果你喜欢我的文章欢迎点赞、转发、评论加关注。我们下次见。