⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 azdebug.blog.csdn.net/article/details/103697108 「阿啄debugIT」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

前言

  • 开发目的: 提高百万级数据插入效率。
  • 采取方案: 利用ThreadPoolTaskExecutor多线程批量插入。
  • 采用技术: springboot2.1.1+mybatisPlus3.0.6+swagger2.5.0+Lombok1.18.4+postgresql+ThreadPoolTaskExecutor等。

具体实现细节

application-dev.properties添加线程池配置信息

# 异步线程配置
# 配置核心线程数
async.executor.thread.core_pool_size = 30
# 配置最大线程数
async.executor.thread.max_pool_size = 30
# 配置队列大小
async.executor.thread.queue_capacity = 99988
# 配置线程池中的线程的名称前缀
async.executor.thread.name.prefix = async-importDB-

spring容器注入线程池bean对象

@Configuration

@EnableAsync

@Slf4j

public class ExecutorConfig {
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;

@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.warn("start asyncServiceExecutor");
//在这里修改
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(corePoolSize);
//配置最大线程数
executor.setMaxPoolSize(maxPoolSize);
//配置队列大小
executor.setQueueCapacity(queueCapacity);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
}

创建异步线程 业务类

@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {
@Override
@Async("asyncServiceExecutor")
public void executeAsync(List<LogOutputResult> logOutputResults, LogOutputResultMapper logOutputResultMapper, CountDownLatch countDownLatch) {
try{
log.warn("start executeAsync");
//异步线程要做的事情
logOutputResultMapper.addLogOutputResultBatch(logOutputResults);
log.warn("end executeAsync");
}finally {
countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放
}
}
}

创建多线程批量插入具体业务方法

@Override
public int testMultiThread() {
List<LogOutputResult> logOutputResults = getTestData();
//测试每100条数据插入开一个线程
List<List<LogOutputResult>> lists = ConvertHandler.splitList(logOutputResults, 100);
CountDownLatch countDownLatch = new CountDownLatch(lists.size());
for (List<LogOutputResult> listSub:lists) {
asyncService.executeAsync(listSub, logOutputResultMapper,countDownLatch);
}
try {
countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
// 这样就可以在下面拿到所有线程执行完的集合结果
} catch (Exception e) {
log.error("阻塞异常:"+e.getMessage());
}
return logOutputResults.size();
}

模拟2000003 条数据进行测试

多线程 测试 2000003 耗时如下:耗时1.67分钟

本次开启30个线程,截图如下:

单线程测试2000003 耗时如下:耗时5.75分钟

检查多线程入库的数据,检查是否存在重复入库的问题:

根据id分组,查看是否有id重复的数据,通过sql语句检查,没有发现重复入库的问题

检查数据完整性:通过sql语句查询,多线程录入数据完整

测试结果

不同线程数测试:

总结

通过以上测试案列,同样是导入2000003 条数据,多线程耗时1.67分钟,单线程耗时5.75分钟。通过对不同线程数的测试,发现不是线程数越多越好,具体多少合适,网上有一个不成文的算法:

CPU核心数量*2 +2 个线程。

附:测试电脑配置

文章目录
  1. 1. 前言
  2. 2. 具体实现细节
    1. 2.0.1. application-dev.properties添加线程池配置信息
    2. 2.0.2. spring容器注入线程池bean对象
    3. 2.0.3. 创建异步线程 业务类
    4. 2.0.4. 创建多线程批量插入具体业务方法
    5. 2.0.5. 模拟2000003 条数据进行测试
    6. 2.0.6. 检查多线程入库的数据,检查是否存在重复入库的问题:
  • 3. 测试结果
  • 4. 总结