一般的,我们提及线程池,最先想到的自然是java中的Executors,它作为一个线程池的工具类,提供给我们几种常见的线程池。稍微深入的童鞋应该也知道,它的底层实现,是使用了java中的ThreadPoolExecutor,而且一般在面试的时候,面试官问到的也会是这个ThreadPoolExecutor。
我的这篇文章说的是在SpringBoot中使用线程池,自然也和它有关(底层代码实现也是它)。因此先来讲讲ThreadPoolExecutor的基本特点吧!
首先我们看看它的继承关系:
这里是它的参数最多的一个构造器:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
几个参数的含义分别是
corePoolSize
:要保留在池中的线程数,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut
;maximumPoolSize
:池中允许的最大线程数;keepAliveTime
:当线程数大于核心数时,这是多余空闲线程在终止前等待新任务的最长时间;unit
:keepAliveTime
参数的时间单位;workQueue
:用于在执行任务之前保存任务的队列。 这个队列将只保存execute
方法提交的Runnable
任务;threadFactory
:执行程序创建新线程时使用的工厂;handler
:执行被阻塞时使用的处理程序,因为达到了线程边界和队列容量;其中,ThreadFactory
的默认实现是:Executors.defaultThreadFactory()
。
RejectedExecutionHandler
的默认实现是AbortPolicy
,内部使用的是抛出异常的方式。当然,在企业中,为了不丢失任务,CallerRunsPolicy
用的也是很多的。CallerRunsPolicy
的功能是:被拒绝任务的处理程序,它直接在execute
方法的调用线程中运行被拒绝的任务,除非执行程序已关闭,在这种情况下任务将被丢弃。
处理程序在拒绝时抛出运行时RejectedExecutionException
。
调用execute自身的线程运行任务。 这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度。
无法执行的任务被简单地丢弃。
如果执行器没有关闭,工作队列头部的任务会被丢弃,然后重试执行(可能会再次失败,导致重复执行)。
当一个新的任务提交给线程池时,线程池的处理步骤:
首先判断核心线程数是否已满,如果没满,则调用一个线程处理Task任务,如果已满,则执行步骤(2)。
这时会判断阻塞队列是否已满,如果阻塞队列没满,就将Task任务加入到阻塞队列中等待执行,如果阻塞队列已满,则执行步骤(3)。
判断是否大于最大线程数,如果小于最大线程数,则创建线程执行Task任务,如果大于最大线程数,则执行步骤(4)。
这时会使用淘汰策略来处理无法执行的Task任务。
那现在进入正题,ThreadPoolTaskExecutor 这个类是Spring-Context支持的一个,专门用于Spring环境的线程池。其底层是在ThreadPoolExecutor的基础上包装一层,使得与Spring的整合更加方便。
这是根据Idea生成的一个继承关系:
内部的成员变量有:
可以看到,确实依赖的是ThreadPoolExecutor。
其中有一个初始化用的方法:
public void initialize() { if (logger.isDebugEnabled()) { logger.debug("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : "")); } if (!this.threadNamePrefixSet && this.beanName != null) { setThreadNamePrefix(this.beanName + "-"); } this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler); }
可以看到这里调用的是initializeExecutor(this.threadFactory, this.rejectedExecutionHandler)
,那么我们再来看看这个方法做了什么?其实是初始化了一个ThreadPoolExecutor
!
protected ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); ThreadPoolExecutor executor; if (this.taskDecorator != null) { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { @Override public void execute(Runnable command) { Runnable decorated = taskDecorator.decorate(command); if (decorated != command) { decoratedTaskMap.put(decorated, command); } super.execute(decorated); } }; } else { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); } if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; }
所以,它俩的关系,你明白了吗?就是这么暧昧!
首先确定你的java版本是1.8及其以上!
然后创建一个最简单的SpringBoot项目:
依赖只需要:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.20</version> </dependency> </dependencies>
启动类并未使用,直接用测试类测试即可达到效果!
thread-pool.config.corePoolSize = 10 thread-pool.config.maxPoolSize = 100 thread-pool.config.queueCapacity = 200 thread-pool.config.threadNamePrefix = MyThread- # CallerRunsPolicy thread-pool.config.rejectedExecutionHandler=
package org.feng.config; import lombok.extern.slf4j.Slf4j; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; /** * SpringBoot 装配线程池 * @author FengJinSong */ @Slf4j @EnableAsync @Configuration public class ThreadPoolConfig implements AsyncConfigurer { private static final String EXECUTOR_NAME = "asyncExecutor"; @Value("${thread-pool.config.corePoolSize:10}") private Integer corePoolSize; @Value("${thread-pool.config.maxPoolSize:100}") private Integer maxPoolSize; @Value("${thread-pool.config.queueCapacity:200}") private Integer queueCapacity; @Value("${thread-pool.config.threadNamePrefix:AsyncThread-}") private String threadNamePrefix; @Value("${thread-pool.config.rejectedExecutionHandler:CallerRunsPolicy}") private String rejectedExecutionHandler; @Bean(name = EXECUTOR_NAME) @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); // 核心线程数 threadPoolTaskExecutor.setCorePoolSize(corePoolSize); // 最大线程数 threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize); // 阻塞队列容量 threadPoolTaskExecutor.setQueueCapacity(queueCapacity); // 待任务在关机时完成--表明等待所有线程执行完 threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); // 线程名称前缀 threadPoolTaskExecutor.setThreadNamePrefix(threadNamePrefix); // 设置拒绝策略 threadPoolTaskExecutor.setRejectedExecutionHandler(getRejectedExecutionHandler(rejectedExecutionHandler)); threadPoolTaskExecutor.initialize(); return threadPoolTaskExecutor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (throwable, method, obj) -> { log.error("[ThreadPool Exception]:Message [{}], Method [{}]", throwable.getMessage(), method.getName()); for (Object param : obj) { log.error("Parameter value [{}] ", param); } }; } /** * 根据传入的参数获取拒绝策略 * @param rejectedName 拒绝策略名,比如 CallerRunsPolicy * @return RejectedExecutionHandler 实例对象,没有匹配的策略时,默认取 CallerRunsPolicy 实例 */ public RejectedExecutionHandler getRejectedExecutionHandler(String rejectedName){ Map<String, RejectedExecutionHandler> rejectedExecutionHandlerMap = new HashMap<>(16); rejectedExecutionHandlerMap.put("CallerRunsPolicy", new ThreadPoolExecutor.CallerRunsPolicy()); rejectedExecutionHandlerMap.put("AbortPolicy", new ThreadPoolExecutor.AbortPolicy()); rejectedExecutionHandlerMap.put("DiscardPolicy", new ThreadPoolExecutor.DiscardPolicy()); rejectedExecutionHandlerMap.put("DiscardOldestPolicy", new ThreadPoolExecutor.DiscardOldestPolicy()); return rejectedExecutionHandlerMap.getOrDefault(rejectedName, new ThreadPoolExecutor.CallerRunsPolicy()); } }
package org.feng; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import javax.annotation.Resource; import java.util.concurrent.CompletableFuture; @SpringBootTest class SpringbootDemoApplicationTests { @Resource ThreadPoolTaskExecutor asyncExecutor; @Test void contextLoads() { // 控制台输出:MyThread-1-666 CompletableFuture.runAsync(() -> { System.out.println(String.join("-", Thread.currentThread().getName(), "666")); }, asyncExecutor); } }
关于这些东西,都不能仅限制在能看懂代码,要尝试思考,灵活运用,及其核心的思想是如何的。
最后,各位,如果本文对你多少有点 学到了 的样子,烦请高抬贵手点个赞吧,求求了。