application-dev.properties
添加线程池配置信息
1 2 3 4 5 6 7 8 9
|
# Settings for the "asyncServiceExecutor" thread pool; each key is injected via @Value in ExecutorConfig.
# Passed to setCorePoolSize.
async.executor.thread.core_pool_size = 30
# Passed to setMaxPoolSize.
async.executor.thread.max_pool_size = 30
# Passed to setQueueCapacity.
async.executor.thread.queue_capacity = 99988
# Passed to setThreadNamePrefix.
async.executor.thread.name.prefix = async-importDB-
|
向 Spring 容器注入线程池 Bean 对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Configuration @EnableAsync @Slf4j public class ExecutorConfig { @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { log.warn("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setThreadNamePrefix(namePrefix); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
|
创建异步线程业务类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Service @Slf4j public class AsyncServiceImpl implements AsyncService { @Override @Async("asyncServiceExecutor") public void executeAsync(List<LogOutputResult> logOutputResults, LogOutputResultMapper logOutputResultMapper, CountDownLatch countDownLatch) { try{ log.warn("start executeAsync"); logOutputResultMapper.addLogOutputResultBatch(logOutputResults); log.warn("end executeAsync"); }finally { countDownLatch.countDown(); } } }
|
创建多线程批量插入具体业务方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Override public int testMultiThread() { List<LogOutputResult> logOutputResults = getTestData(); List<List<LogOutputResult>> lists = ConvertHandler.splitList(logOutputResults, 100); CountDownLatch countDownLatch = new CountDownLatch(lists.size()); for (List<LogOutputResult> listSub:lists) { asyncService.executeAsync(listSub, logOutputResultMapper,countDownLatch); } try { countDownLatch.await(); } catch (Exception e) { log.error("阻塞异常:"+e.getMessage()); } return logOutputResults.size(); }
|
模拟多线程测试:2000003 条数据
耗时:1.67 分钟。