异步处理器Service
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.stereotype.Service; import java.util.concurrent.*;
@Slf4j @Service public class AsyncHandlerService {
public final static int THREAD_MAX_NUM = 60;
private final ExecutorService executorService = new ThreadPoolExecutor( THREAD_MAX_NUM / 2, THREAD_MAX_NUM, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10240), new BasicThreadFactory.Builder().namingPattern("AsyncHandlerService-thread-%d").daemon(true).build() );
public void submit(Runnable runnable) { Callable<Integer> callable = () -> { runnable.run(); return 1; }; Future<?> future = executorService.submit(callable); executorService.submit(() -> { try { future.get(); } catch (Throwable throwable) { log.error("执行异步任务出错{}", ExceptionUtils.getRootCauseMessage(throwable), throwable); } }); } }
|
ThreadPoolExecutor
核心参数详解: https://juejin.cn/post/6987576686472593415
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| @Configuration @Slf4j public class ThreadPoolConfig {
@Bean(value = "executorService") public ThreadPoolExecutor threadPoolExecutor() { int cpus = Runtime.getRuntime().availableProcessors(); return new ThreadPoolExecutor( cpus * 5, cpus * 10, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new CustomThreadFactory(), new CommAbortPolicy() ); }
static class CommAbortPolicy implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { String message = executor.toString(); log.error("线程池已满,无法继续处理任务:{}", message); } }
static class CustomThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { int hashCode = r.hashCode(); log.info("线程 " + hashCode + " 创建"); return new Thread(r, "threadPool-" + hashCode); } } }
|
使用方式:
1 2 3 4 5 6 7 8
| @Autowired private ThreadPoolExecutor executorService;
/**
 * Usage example: hands a task to the injected {@code executorService} through
 * {@link CompletableFuture#runAsync(Runnable, java.util.concurrent.Executor)}.
 */
public void xxxAsyncHandle() {
    // Placeholder task; the real work goes inside the lambda body.
    Runnable task = () -> {
    };
    CompletableFuture.runAsync(task, executorService);
}
|
ThreadPoolTaskExecutor
参考资料: Java ThreadPoolTaskExecutor 配置类代码多种场景示例
ThreadPoolTaskExecutor 本身只有无参构造方法;从其 initializeExecutor 方法的实现可以看出,它在底层依然依赖 ThreadPoolExecutor 本身,也就是说该工具更关注于扩展的内容,执行任务依然交由 ThreadPoolExecutor 去处理。
① 将参数配置到配置文件中
application.yml
1 2 3 4 5 6 7 8 9 10 11 12 13
thread:
  pool:
    core-pool-size: 50
    max-pool-size: 200
    queue-capacity: 1000
    keep-alive-seconds: 300
    # Must be the simple name of one of the policies nested in
    # java.util.concurrent.ThreadPoolExecutor: AbortPolicy, CallerRunsPolicy,
    # DiscardPolicy or DiscardOldestPolicy. Any other value cannot be resolved
    # and ends up as CallerRunsPolicy anyway, so that value is stated explicitly.
    rejected-execution-handler: CallerRunsPolicy
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| import cn.hutool.log.Log; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor;
/**
 * Thread-pool settings bound from the {@code thread.pool.*} configuration keys, plus
 * the {@link ThreadPoolTaskExecutor} bean built from them.
 */
@Data
@Component
@ConfigurationProperties(prefix = "thread.pool")
public class ThreadPoolConfig {

    private static final Log LOG = Log.get(ThreadPoolConfig.class);

    /** Value passed to {@code setCorePoolSize}. */
    private int corePoolSize;

    /** Value passed to {@code setMaxPoolSize}. */
    private int maxPoolSize;

    /** Value passed to {@code setQueueCapacity}. */
    private int queueCapacity;

    /** Value passed to {@code setKeepAliveSeconds}. */
    private int keepAliveSeconds;

    /**
     * Simple name of the rejection policy: {@code AbortPolicy}, {@code CallerRunsPolicy},
     * {@code DiscardPolicy} or {@code DiscardOldestPolicy}.
     */
    private String rejectedExecutionHandler;

    /**
     * Creates the executor from the bound properties.
     *
     * @return the configured executor, registered as {@code threadPoolTaskExecutor}
     */
    @Bean("threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(this.corePoolSize);
        executor.setMaxPoolSize(this.maxPoolSize);
        executor.setQueueCapacity(this.queueCapacity);
        executor.setKeepAliveSeconds(this.keepAliveSeconds);
        executor.setThreadFactory(r -> {
            int hashCode = r.hashCode();
            LOG.info("线程 {} 创建。", hashCode);
            return new Thread(r, "threadPool-" + hashCode);
        });
        executor.setRejectedExecutionHandler(
                resolveRejectedExecutionHandler(this.rejectedExecutionHandler));
        return executor;
    }

    /**
     * Maps a policy name to the matching {@link ThreadPoolExecutor} rejection policy.
     *
     * <p>An explicit mapping replaces reflective class loading: it cannot instantiate
     * an unexpected class, avoids the deprecated {@code Class.newInstance()}, and
     * reports a bad value through the logger instead of a printed stack trace.
     *
     * @param policyName configured policy name, may be {@code null}
     * @return the matching policy, or {@code CallerRunsPolicy} when the name is missing
     *         or unknown
     */
    private static RejectedExecutionHandler resolveRejectedExecutionHandler(String policyName) {
        if (policyName != null) {
            switch (policyName.trim()) {
                case "AbortPolicy":
                    return new ThreadPoolExecutor.AbortPolicy();
                case "CallerRunsPolicy":
                    return new ThreadPoolExecutor.CallerRunsPolicy();
                case "DiscardPolicy":
                    return new ThreadPoolExecutor.DiscardPolicy();
                case "DiscardOldestPolicy":
                    return new ThreadPoolExecutor.DiscardOldestPolicy();
                default:
                    break;
            }
        }
        LOG.warn("未知的拒绝策略 [{}],回退为 CallerRunsPolicy", policyName);
        return new ThreadPoolExecutor.CallerRunsPolicy();
    }
}
|
② 将参数写在代码中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Configuration public class ThreadPoolConfig {
private static final int CPU_NUM = Runtime.getRuntime().availableProcessors(); private static final int KEEP_ALIVE_TIME = 10; private static final int QUEUE_CAPACITY = 100; private static final int AWAIT_TERMINATION = 60; private static final String THREAD_NAME_PREFIX = "XXX-THREAD-";
@Bean("threadPoolTaskExecutor") public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(CPU_NUM); taskExecutor.setMaxPoolSize(CPU_NUM * 2 + 1); taskExecutor.setKeepAliveSeconds(KEEP_ALIVE_TIME); taskExecutor.setQueueCapacity(QUEUE_CAPACITY); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAllowCoreThreadTimeOut(true); taskExecutor.setAwaitTerminationSeconds(AWAIT_TERMINATION); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.setThreadNamePrefix(THREAD_NAME_PREFIX); taskExecutor.initialize(); return taskExecutor; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor; ... Future<Result<XxxDTO>> xxxDTOFuture = threadPoolTaskExecutor.submit(() -> { return 方法调用; }); Result<XxxDTO> xxxDTOResult = xxxDTOFuture.get();
// Usage sketch: the @Async value "threadPoolTaskExecutor" is the same name under which
// the executor bean is registered in the configuration examples.
// NOTE(review): presumably Spring then runs this method on that executor — confirm that
// async support is enabled in the application.
// XXX is a placeholder return type and the method body is empty here.
@Async("threadPoolTaskExecutor") public XXX method() { }
|
测试调用: