⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 huaweicloud.csdn.net/63876ff5dacf622b8df8c462.html 「.李太白.」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

一、使用场景

数据库有两张表 t_persont_school 如下:前端传来10000条person数据要插入到t_person,同时要删除t_school表中id为1的数据(为提高效率采用线程池做)

二、思路

1、要保证主线程和子线程使用的同一个sqlSession

2、手动控制提交和回滚

3、将10000条数据均分成10份,每份1000条,创建10个任务,放入线程池执行!

三、代码及注释如下:

1、核心业务代码

@Service
public class PersonServiceImpl extends ServiceImpl<PersonMapper, Person> implements IPersonService {

@Autowired
private SqlSessionTemplate sqlSessionTemplate;

@Autowired
private SchoolMapper schoolMapper;

private ArrayBlockingQueue queue=new ArrayBlockingQueue(8,true);

private ThreadPoolExecutor.CallerRunsPolicy policy=new ThreadPoolExecutor.CallerRunsPolicy();

//1、创建核心线程为10的线程池
private ThreadPoolExecutor executor = new ThreadPoolExecutor(10,15,10, TimeUnit.SECONDS
,queue,policy);


@Override
public int insertPerson(Person person) {
return this.baseMapper.insert(person);
}

@Override
@Transactional
public void inserPersonBatch(List<Person> list) throws SQLException {

//2、根据sqlSessionTemplate获取SqlSession工厂
SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
SqlSession sqlSession = sqlSessionFactory.openSession();
//3、获取Connection来手动控制事务
Connection connection = sqlSession.getConnection();
try{
//4、设置手动提交
connection.setAutoCommit(false);
//5、获取PersonMapper(此处是由于无法通过this.baseMapper调用自定义的saveBatch方法)
PersonMapper mapper = sqlSession.getMapper(PersonMapper.class);
//6、主线程去删除t_school表中id为1的数据
schoolMapper.deleteById("1");
//7、将传入List中的10000个数据按1000一组均分成10组
List<List<Person>> lists = ListUtils.averageAssign(list,1000);
//8、新建任务列表
List<Callable<Integer>> callableList = new ArrayList<>();
//9、根据均分的5组数据分别新建5个Callable任务
for(int i = 0; i < lists.size(); i++){
List<Person> insertList = lists.get(i);
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int n = 0;
try{
n = mapper.saveBatch(insertList);
}catch (Exception e){
//插入失败返回0
return n;
}
//插入成功返回成功提交数
return n;
}
};
callableList.add(callable);
}

//10、任务放入线程池开始执行
List<Future<Integer>> futures = executor.invokeAll(callableList);
//11、对比每个任务的返回值 <= 0 代表执行失败
for(Future<Integer> future : futures){
if(future.get() <= 0){
//12、只要有一组任务失败回滚整个connection
connection.rollback();
return;
}
}
//13、主线程和子线程都执行成功 直接提交
connection.commit();
System.out.println("添加成功!");

}catch (Exception e){
//14、主线程报错回滚
connection.rollback();
log.error(e.toString());
throw new SQLException("出现异常!");
}
return;
}
}

2、PersonMapper中自定义批量插入

<insert id="saveBatch" parameterType="list">
insert into t_person(id,name,age,addr,classes,school_id)
values
<foreach collection="list" index="index" item="item" separator=",">
(
#{item.id},
#{item.name},
#{item.age},
#{item.addr},
#{item.classes},
#{item.schoolId}
)
</foreach>
</insert>

3、均分List工具类

public class ListUtils {

public static <T> List<List<T>> averageAssign(List<T> source, int limit) {
if (null == source || source.isEmpty()) {
return Collections.emptyList();
}
List<List<T>> result = new ArrayList<>();
int listCount = (source.size() - 1) / limit + 1;
int remaider = source.size() % listCount; // (先计算出余数)
int number = source.size() / listCount; // 然后是商
int offset = 0;// 偏移量
for (int i = 0; i < listCount; i++) {
List<T> value;
if (remaider > 0) {
value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
remaider--;
offset++;
} else {
value = source.subList(i * number + offset, (i + 1) * number + offset);
}
result.add(value);
}
return result;
}
}

四、测试验证:

controller层如下:传入10000条数据

@GetMapping("/addBatch")
public void addBatch() {
List<Person> list = new ArrayList<>();
for(int i = 1; i <= 10000; i++){
Person p = new Person();
p.setId(i);
p.setName("张三" + i);
p.setAge(i);
p.setAddr("重庆");
p.setClasses("一班");
p.setSchoolId(i);
list.add(p);
}
try{
this.iPersonService.inserPersonBatch(list);
}catch (Exception e){
e.printStackTrace();
}
}

1、情况1:子线程中有一个执行失败

t_person表主键唯一 10000条Person数据id按1—10000设置

如图t_person表中已经有一条id为201的数据 所以线程池中有一个任务执行会失败!

我们打断点来看:此时已经分配好10个任务

如下图:插入id为201的数据时失败,线程池第一个任务执行失败返回0,其余全部成功返回1000

执行rollback回滚

执行完毕观察数据库:

t_school表数据没有被删,

t_person表数据也没有变化

2、情况2、删除 t_person表中id为201的数据重新插入

此时10个任务全部执行成功:

执行commit

执行完毕观察数据库:

t_school表数据已被删除

t_person表中10000条数据也成功插入:

3、情况3:主线程报错就不演示了

以上测试成功!

文章目录
  1. 1. 一、使用场景
  2. 2. 二、思路
  3. 3. 三、代码及注释如下:
  4. 4. 四、测试验证:
    1. 4.0.1. 1、情况1:子线程中有一个执行失败
    2. 4.0.2. 2、情况2、删除 t_person表中id为201的数据重新插入
    3. 4.0.3. 3、情况3:主线程报错就不演示了