

近期遇到一个大数据量入库的业务场景,需要先执行若干数据库修改操作,随后进行大批量数据插入。为了提高系统响应性能,采用多线程技术对数据进行拆分并行处理。关键要求是:当任意一个线程执行失败时,所有操作必须全部回滚,保证数据一致性。
java
/**
* 平均拆分List工具方法
* @param source 原始数据列表
* @param n 拆分份数
* @param <T> 数据类型
* @return 拆分后的列表集合
*/public static <T> List<List<T>> averageAssign(List<T> source, int n){
List<List<T>> result = new ArrayList<List<T>>();
int remaider = source.size() % n;
int number = source.size() / n;
int offset = 0; // 偏移量
for(int i = 0; i < n; i++){
List<T> value = null;
if(remaider > 0){
value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
remaider--;
offset++;
} else {
value = source.subList(i * number + offset, (i + 1) * number + offset);
}
result.add(value);
}
return result;}
java
/**
* 线程池配置类
*/public class ExecutorConfig {
private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
private volatile static ExecutorService executorService;
public static ExecutorService getThreadPool() {
if (executorService == null){
synchronized (ExecutorConfig.class){
if (executorService == null){
executorService = newThreadPool();
}
}
}
return executorService;
}
private static ExecutorService newThreadPool(){
int queueSize = 500;
int corePool = Math.min(5, maxPoolSize);
return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(queueSize), new ThreadPoolExecutor.AbortPolicy());
}
private ExecutorConfig(){}}
java
/**
* SQL会话管理组件
*/@Componentpublic class SqlContext {
@Resource
private SqlSessionTemplate sqlSessionTemplate;
public SqlSession getSqlSession(){
SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
return sqlSessionFactory.openSession();
}}
java
/**
* 多线程事务测试方法 - 事务失效版本
* @param employeeDOList 员工数据列表
*/@Override@Transactionalpublic void saveThread(List<EmployeeDO> employeeDOList) {
try {
// 先执行删除操作 - 子线程异常时此操作不会回滚
this.getBaseMapper().delete(null);
// 获取线程池实例
ExecutorService service = ExecutorConfig.getThreadPool();
// 数据拆分(5份)
List<List<EmployeeDO>> lists = averageAssign(employeeDOList, 5);
// 线程数组
Thread[] threadArray = new Thread[lists.size()];
// 使用CountDownLatch监控子线程执行
CountDownLatch countDownLatch = new CountDownLatch(lists.size());
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
for (int i = 0; i < lists.size(); i++){
if (i == lists.size() - 1){
atomicBoolean.set(false); // 最后一个线程设置异常标志
}
List<EmployeeDO> list = lists.get(i);
threadArray[i] = new Thread(() -> {
try {
// 最后一个线程抛出异常
if (!atomicBoolean.get()){
throw new ServiceException("001", "出现异常");
}
// 批量插入数据
this.saveBatch(list);
} finally {
countDownLatch.countDown();
}
});
}
for (int i = 0; i < lists.size(); i++){
service.execute(threadArray[i]);
}
// 等待所有子线程执行完成
countDownLatch.await();
System.out.println("添加完毕");
} catch (Exception e){
log.info("error", e);
throw new ServiceException("002", "出现异常");
}}
数据库初始状态:存在1条记录

java
@RunWith(SpringRunner.class)@SpringBootTest(classes = {ThreadTest01.class, MainApplication.class})public class ThreadTest01 {
@Resource
private EmployeeBO employeeBO;
/**
* 多线程事务测试方法
* @throws InterruptedException
*/
@Test
public void MoreThreadTest2() throws InterruptedException {
int size = 10;
List<EmployeeDO> employeeDOList = new ArrayList<>(size);
for (int i = 0; i < size; i++){
EmployeeDO employeeDO = new EmployeeDO();
employeeDO.setEmployeeName("lol" + i);
employeeDO.setAge(18);
employeeDO.setGender(1);
employeeDO.setIdNumber(i + "XX");
employeeDO.setCreatTime(Calendar.getInstance().getTime());
employeeDOList.add(employeeDO);
}
try {
employeeBO.saveThread(employeeDOList);
System.out.println("添加成功");
} catch (Exception e){
e.printStackTrace();
}
}}

问题确认: 子线程组执行过程中,当某个线程执行失败时,其他线程也会抛出异常。然而主线程中执行的删除操作并未回滚,证明@Transactional注解在多线程环境下失效。
java
@Resourceprivate SqlContext sqlContext;
/**
* 多线程事务测试方法 - 手动控制事务版本
* @param employeeDOList 员工数据列表
*/@Overridepublic void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
// 获取数据库连接和会话(包含独立事务)
SqlSession sqlSession = sqlContext.getSqlSession();
Connection connection = sqlSession.getConnection();
try {
// 设置为手动提交模式
connection.setAutoCommit(false);
// 获取Mapper实例
EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
// 先执行删除操作
employeeMapper.delete(null);
// 获取线程池执行器
ExecutorService service = ExecutorConfig.getThreadPool();
List<Callable<Integer>> callableList = new ArrayList<>();
// 数据拆分
List<List<EmployeeDO>> lists = averageAssign(employeeDOList, 5);
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
for (int i = 0; i < lists.size(); i++){
if (i == lists.size() - 1){
atomicBoolean.set(false); // 最后一个线程设置异常标志
}
List<EmployeeDO> list = lists.get(i);
// 使用Callable支持返回结果
Callable<Integer> callable = () -> {
// 最后一个线程抛出异常
if (!atomicBoolean.get()){
throw new ServiceException("001", "出现异常");
}
return employeeMapper.saveBatch(list);
};
callableList.add(callable);
}
// 执行所有子线程任务
List<Future<Integer>> futures = service.invokeAll(callableList);
// 检查所有任务执行结果
for (Future<Integer> future : futures) {
// 任一任务执行失败则整体回滚
if (future.get() <= 0){
connection.rollback();
return;
}
}
// 所有任务成功则提交事务
connection.commit();
System.out.println("添加完毕");
} catch (Exception e){
// 异常情况回滚事务
connection.rollback();
log.info("error", e);
throw new ServiceException("002", "出现异常");
} finally {
connection.close();
}}
xml
<insert id="saveBatch" parameterType="List">
INSERT INTO employee
(employee_id, age, employee_name, birth_date, gender, id_number, creat_time, update_time, status)
values
<foreach collection="list" item="item" index="index" separator=",">
(
#{item.employeeId},
#{item.age},
#{item.employeeName},
#{item.birthDate},
#{item.gender},
#{item.idNumber},
#{item.creatTime},
#{item.updateTime},
#{item.status}
)
</foreach></insert>
数据库初始状态:存在1条记录

测试结果: 成功抛出异常

事务验证: 删除操作已回滚,数据库中原始数据仍然存在,证明事务控制成功


java
@Resourceprivate SqlContext sqlContext;
/**
* 多线程事务测试方法 - 正常执行版本
* @param employeeDOList 员工数据列表
*/@Overridepublic void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
// 获取数据库连接和会话
SqlSession sqlSession = sqlContext.getSqlSession();
Connection connection = sqlSession.getConnection();
try {
// 设置为手动提交模式
connection.setAutoCommit(false)
// 获取Mapper实例
EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class)
// 先执行删除操作
employeeMapper.delete(null);
// 获取线程池执行器
ExecutorService service = ExecutorConfig.getThreadPool();
List<Callable<Integer>> callableList = new ArrayList<>()
// 数据拆分
List<List<EmployeeDO>> lists = averageAssign(employeeDOList, 5)
for (int i = 0; i < lists.size(); i++){
List<EmployeeDO> list = lists.get(i);
Callable<Integer> callable = () -> employeeMapper.saveBatch(list);
callableList.add(callable);
}
// 执行所有子线程任务
List<Future<Integer>> futures = service.invokeAll(callableList)
// 检查执行结果
for (Future<Integer> future : futures) {
if (future.get() <= 0){
connection.rollback();
return;
}
}
// 提交事务
connection.commit();
System.out.println("添加完毕");
} catch (Exception e){
// 异常回滚
connection.rollback();
log.info("error", e);
throw new ServiceException("002", "出现异常");
}}

数据库状态: 原始数据已删除,新数据成功插入,验证测试完全成功。


一家致力于优质服务的软件公司
8年互联网行业经验1000+合作客户2000+上线项目60+服务地区

关注微信公众号
