SpringBoot使用线程池之ThreadPoolTaskExecutor和ThreadPoolExecutor

首页 / 新闻资讯 / 正文

一般的,我们提及线程池,最先想到的自然是java中的Executors,它作为一个线程池的工具类,提供给我们几种常见的线程池。稍微深入的童鞋应该也知道,它的底层实现,是使用了java中的ThreadPoolExecutor,而且一般在面试的时候,面试官问到的也会是这个ThreadPoolExecutor。

我的这篇文章说的是在SpringBoot中使用线程池,自然也和它有关(底层代码实现也是它)。因此先来讲讲ThreadPoolExecutor的基本特点吧!

首先我们看看它的继承关系:
SpringBoot使用线程池之ThreadPoolTaskExecutor和ThreadPoolExecutor

构造器

这里是它的参数最多的一个构造器:

public ThreadPoolExecutor(int corePoolSize,                               int maximumPoolSize,                               long keepAliveTime,                               TimeUnit unit,                               BlockingQueue<Runnable> workQueue,                               ThreadFactory threadFactory,                               RejectedExecutionHandler handler) 

几个参数的含义分别是

  • corePoolSize:要保留在池中的线程数,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut
  • maximumPoolSize:池中允许的最大线程数;
  • keepAliveTime:当线程数大于核心数时,这是多余空闲线程在终止前等待新任务的最长时间;
  • unitkeepAliveTime参数的时间单位;
  • workQueue:用于在执行任务之前保存任务的队列。 这个队列将只保存execute方法提交的Runnable任务;
  • threadFactory:执行程序创建新线程时使用的工厂;
  • handler:执行被阻塞时使用的处理程序,因为达到了线程边界和队列容量;

其中,ThreadFactory 的默认实现是:Executors.defaultThreadFactory()

RejectedExecutionHandler的默认实现是AbortPolicy,内部使用的是抛出异常的方式。当然,在企业中,为了不丢失任务,CallerRunsPolicy用的也是很多的。
CallerRunsPolicy的功能是:被拒绝任务的处理程序,它直接在execute方法的调用线程中运行被拒绝的任务,除非执行程序已关闭,在这种情况下任务将被丢弃。

四种拒绝策略

ThreadPoolExecutor.AbortPolicy

处理程序在拒绝时抛出运行时RejectedExecutionException

ThreadPoolExecutor.CallerRunsPolicy

调用execute自身的线程运行任务。 这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度。

ThreadPoolExecutor.DiscardPolicy

无法执行的任务被简单地丢弃。

ThreadPoolExecutor.DiscardOldestPolicy

如果执行器没有关闭,工作队列头部的任务会被丢弃,然后重试执行(可能会再次失败,导致重复执行)。

工作流程

当一个新的任务提交给线程池时,线程池的处理步骤

  1. 首先判断核心线程数是否已满,如果没满,则调用一个线程处理Task任务,如果已满,则执行步骤(2)。

  2. 这时会判断阻塞队列是否已满,如果阻塞队列没满,就将Task任务加入到阻塞队列中等待执行,如果阻塞队列已满,则执行步骤(3)。

  3. 判断是否大于最大线程数,如果小于最大线程数,则创建线程执行Task任务,如果大于最大线程数,则执行步骤(4)。

  4. 这时会使用淘汰策略来处理无法执行的Task任务。

那现在进入正题,ThreadPoolTaskExecutor 这个类是Spring-Context支持的一个,专门用于Spring环境的线程池。其底层是在ThreadPoolExecutor的基础上包装一层,使得与Spring的整合更加方便。

继承关系

这是根据Idea生成的一个继承关系:
SpringBoot使用线程池之ThreadPoolTaskExecutor和ThreadPoolExecutor

成员变/常量

内部的成员变量有:
SpringBoot使用线程池之ThreadPoolTaskExecutor和ThreadPoolExecutor
可以看到,确实依赖的是ThreadPoolExecutor。

初始化方法

其中有一个初始化用的方法:

public void initialize() { 	if (logger.isDebugEnabled()) { 		logger.debug("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : "")); 	} 	if (!this.threadNamePrefixSet && this.beanName != null) { 		setThreadNamePrefix(this.beanName + "-"); 	} 	this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler); }  

可以看到这里调用的是initializeExecutor(this.threadFactory, this.rejectedExecutionHandler),那么我们再来看看这个方法做了什么?其实是初始化了一个ThreadPoolExecutor

protected ExecutorService initializeExecutor( 			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {  		BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);  		ThreadPoolExecutor executor; 		if (this.taskDecorator != null) { 			executor = new ThreadPoolExecutor( 					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, 					queue, threadFactory, rejectedExecutionHandler) { 				@Override 				public void execute(Runnable command) { 					Runnable decorated = taskDecorator.decorate(command); 					if (decorated != command) { 						decoratedTaskMap.put(decorated, command); 					} 					super.execute(decorated); 				} 			}; 		} 		else { 			executor = new ThreadPoolExecutor( 					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, 					queue, threadFactory, rejectedExecutionHandler);  		}  		if (this.allowCoreThreadTimeOut) { 			executor.allowCoreThreadTimeOut(true); 		}  		this.threadPoolExecutor = executor; 		return executor; 	}  

所以,它俩的关系,你明白了吗?就是这么暧昧!

首先确定你的java版本是1.8及其以上!

创建SpringBoot项目

然后创建一个最简单的SpringBoot项目:
依赖只需要:

<dependencies>         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-web</artifactId>         </dependency>          <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-test</artifactId>             <scope>test</scope>         </dependency>          <dependency>             <groupId>org.projectlombok</groupId>             <artifactId>lombok</artifactId>             <version>1.18.20</version>         </dependency>     </dependencies>  

文件列表

启动类并未使用,直接用测试类测试即可达到效果!
SpringBoot使用线程池之ThreadPoolTaskExecutor和ThreadPoolExecutor

配置属性文件

 thread-pool.config.corePoolSize = 10 thread-pool.config.maxPoolSize = 100 thread-pool.config.queueCapacity = 200 thread-pool.config.threadNamePrefix = MyThread- #  CallerRunsPolicy thread-pool.config.rejectedExecutionHandler=  

配置类

package org.feng.config;  import lombok.extern.slf4j.Slf4j; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;  import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor;  /**  * SpringBoot 装配线程池  * @author FengJinSong  */ @Slf4j @EnableAsync @Configuration public class ThreadPoolConfig implements AsyncConfigurer {      private static final String EXECUTOR_NAME = "asyncExecutor";      @Value("${thread-pool.config.corePoolSize:10}")     private Integer corePoolSize;     @Value("${thread-pool.config.maxPoolSize:100}")     private Integer maxPoolSize;     @Value("${thread-pool.config.queueCapacity:200}")     private Integer queueCapacity;     @Value("${thread-pool.config.threadNamePrefix:AsyncThread-}")     private String threadNamePrefix;     @Value("${thread-pool.config.rejectedExecutionHandler:CallerRunsPolicy}")     private String rejectedExecutionHandler;      @Bean(name = EXECUTOR_NAME)     @Override     public Executor getAsyncExecutor() {         ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();         // 核心线程数         threadPoolTaskExecutor.setCorePoolSize(corePoolSize);         // 最大线程数         threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);         // 阻塞队列容量         threadPoolTaskExecutor.setQueueCapacity(queueCapacity);         // 待任务在关机时完成--表明等待所有线程执行完         threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);         // 线程名称前缀         threadPoolTaskExecutor.setThreadNamePrefix(threadNamePrefix);         // 设置拒绝策略         threadPoolTaskExecutor.setRejectedExecutionHandler(getRejectedExecutionHandler(rejectedExecutionHandler));          threadPoolTaskExecutor.initialize();         return threadPoolTaskExecutor;     }       @Override     public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {         return (throwable, method, obj) -> {             log.error("[ThreadPool Exception]:Message [{}], Method [{}]", throwable.getMessage(), method.getName());             for (Object param : obj) {                 log.error("Parameter value [{}] ", param);             }         };     }      /**      * 根据传入的参数获取拒绝策略      * @param rejectedName 拒绝策略名,比如 CallerRunsPolicy      * @return RejectedExecutionHandler 实例对象,没有匹配的策略时,默认取 CallerRunsPolicy 实例      */     public RejectedExecutionHandler getRejectedExecutionHandler(String rejectedName){         Map<String, RejectedExecutionHandler> rejectedExecutionHandlerMap = new HashMap<>(16);         rejectedExecutionHandlerMap.put("CallerRunsPolicy", new ThreadPoolExecutor.CallerRunsPolicy());         rejectedExecutionHandlerMap.put("AbortPolicy", new ThreadPoolExecutor.AbortPolicy());         rejectedExecutionHandlerMap.put("DiscardPolicy", new ThreadPoolExecutor.DiscardPolicy());         rejectedExecutionHandlerMap.put("DiscardOldestPolicy", new ThreadPoolExecutor.DiscardOldestPolicy());         return rejectedExecutionHandlerMap.getOrDefault(rejectedName, new ThreadPoolExecutor.CallerRunsPolicy());     } }   

测试

package org.feng;  import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;  import javax.annotation.Resource; import java.util.concurrent.CompletableFuture;  @SpringBootTest class SpringbootDemoApplicationTests {      @Resource     ThreadPoolTaskExecutor asyncExecutor;      @Test     void contextLoads() {         // 控制台输出:MyThread-1-666         CompletableFuture.runAsync(() -> {             System.out.println(String.join("-", Thread.currentThread().getName(), "666"));         }, asyncExecutor);     } } 

关于这些东西,都不能仅限制在能看懂代码,要尝试思考,灵活运用,及其核心的思想是如何的。
最后,各位,如果本文对你多少有点 学到了 的样子,烦请高抬贵手点个赞吧,求求了。