@Transactional 失效解决方案与实战案例

Spring 多线程事务避坑指南:@Transaction

一、业务场景与问题分析


1.1 业务需求背景

近期遇到一个大数据量入库的业务场景,需要先执行若干数据库修改操作,随后进行大批量数据插入。为了提高系统响应性能,采用多线程技术对数据进行拆分并行处理。关键要求是:当任意一个线程执行失败时,所有操作必须全部回滚,保证数据一致性。

1.2 技术挑战

在Spring框架中,虽然可以通过@Transactional注解实现声明式事务管理,但在多线程环境下该机制会失效。具体表现为:主线程执行的数据库操作在子线程发生异常时无法正常回滚,从而导致数据不一致问题。

1.3 示例说明

以下通过具体示例演示多线程环境下的事务控制问题及解决方案。



二、公共工具类与方法


2.1 数据拆分工具方法

java

/**

* 平均拆分List工具方法

* @param source 原始数据列表

* @param n 拆分份数

* @param <T> 数据类型

* @return 拆分后的列表集合

*/public static <T> List<List<T>> averageAssign(List<T> source, int n){

   List<List<T>> result = new ArrayList<List<T>>();

   int remaider = source.size() % n;

   int number = source.size() / n;

   int offset = 0; // 偏移量

   for(int i = 0; i < n; i++){

       List<T> value = null;

       if(remaider > 0){

           value = source.subList(i * number + offset, (i + 1) * number + offset + 1);

           remaider--;

           offset++;

       } else {

           value = source.subList(i * number + offset, (i + 1) * number + offset);

       }

       result.add(value);

   }

   return result;}

2.2 线程池配置类

java

/**

* 线程池配置类

*/public class ExecutorConfig {

   private static int maxPoolSize = Runtime.getRuntime().availableProcessors();

   private volatile static ExecutorService executorService;

    public static ExecutorService getThreadPool() {

       if (executorService == null){

           synchronized (ExecutorConfig.class){

               if (executorService == null){

                   executorService = newThreadPool();

               }

           }

       }

       return executorService;

   }

   private static ExecutorService newThreadPool(){

       int queueSize = 500;

       int corePool = Math.min(5, maxPoolSize);

       return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,

               new LinkedBlockingQueue<>(queueSize), new ThreadPoolExecutor.AbortPolicy());

   }

   private ExecutorConfig(){}}

2.3 SQL会话管理类

java

/**

* SQL会话管理组件

*/@Componentpublic class SqlContext {

   @Resource

   private SqlSessionTemplate sqlSessionTemplate;

   public SqlSession getSqlSession(){

       SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();

       return sqlSessionFactory.openSession();

   }}



三、事务失效问题示例


3.1 问题代码实现

java

/**

* 多线程事务测试方法 - 事务失效版本

* @param employeeDOList 员工数据列表

*/@Override@Transactionalpublic void saveThread(List<EmployeeDO> employeeDOList) {

   try {

       // 先执行删除操作 - 子线程异常时此操作不会回滚

       this.getBaseMapper().delete(null);

       // 获取线程池实例

       ExecutorService service = ExecutorConfig.getThreadPool();

       // 数据拆分(5份)

       List<List<EmployeeDO>> lists = averageAssign(employeeDOList, 5);

       // 线程数组

       Thread[] threadArray = new Thread[lists.size()];

       // 使用CountDownLatch监控子线程执行

       CountDownLatch countDownLatch = new CountDownLatch(lists.size());

       AtomicBoolean atomicBoolean = new AtomicBoolean(true);

       for (int i = 0; i < lists.size(); i++){

           if (i == lists.size() - 1){

               atomicBoolean.set(false); // 最后一个线程设置异常标志

           }

           List<EmployeeDO> list = lists.get(i);

           threadArray[i] = new Thread(() -> {

               try {

                   // 最后一个线程抛出异常

                   if (!atomicBoolean.get()){

                       throw new ServiceException("001", "出现异常");

                   }

                   // 批量插入数据

                   this.saveBatch(list);

               } finally {

                   countDownLatch.countDown();

               }

           });

       }

       for (int i = 0; i < lists.size(); i++){

           service.execute(threadArray[i]);

       }

       // 等待所有子线程执行完成

       countDownLatch.await();

       System.out.println("添加完毕");

   } catch (Exception e){

       log.info("error", e);

       throw new ServiceException("002", "出现异常");

   }}

3.2 测试环境准备

      数据库初始状态:存在1条记录

     

3.3 测试用例

java

@RunWith(SpringRunner.class)@SpringBootTest(classes = {ThreadTest01.class, MainApplication.class})public class ThreadTest01 {

   @Resource

   private EmployeeBO employeeBO;

   /**

    * 多线程事务测试方法

    * @throws InterruptedException

    */

   @Test

   public void MoreThreadTest2() throws InterruptedException {

       int size = 10;

       List<EmployeeDO> employeeDOList = new ArrayList<>(size);

       for (int i = 0; i < size; i++){

           EmployeeDO employeeDO = new EmployeeDO();

           employeeDO.setEmployeeName("lol" + i);

           employeeDO.setAge(18);

           employeeDO.setGender(1);

           employeeDO.setIdNumber(i + "XX");

           employeeDO.setCreatTime(Calendar.getInstance().getTime());

           employeeDOList.add(employeeDO);

       }

       try {

           employeeBO.saveThread(employeeDOList);

           System.out.println("添加成功");

       } catch (Exception e){

           e.printStackTrace();

       }

   }}

3.4 测试结果分析

       

问题确认: 子线程组执行过程中,当某个线程执行失败时,其他线程也会抛出异常。然而主线程中执行的删除操作并未回滚,证明@Transactional注解在多线程环境下失效。



四、手动事务控制解决方案


4.1 解决方案代码

java

@Resourceprivate SqlContext sqlContext;

/**

* 多线程事务测试方法 - 手动控制事务版本

* @param employeeDOList 员工数据列表

*/@Overridepublic void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {

   // 获取数据库连接和会话(包含独立事务)

   SqlSession sqlSession = sqlContext.getSqlSession();

   Connection connection = sqlSession.getConnection();

   try {

       // 设置为手动提交模式

       connection.setAutoCommit(false);

       // 获取Mapper实例

       EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);

       // 先执行删除操作

       employeeMapper.delete(null);

       // 获取线程池执行器

       ExecutorService service = ExecutorConfig.getThreadPool();

       List<Callable<Integer>> callableList = new ArrayList<>();

       // 数据拆分

       List<List<EmployeeDO>> lists = averageAssign(employeeDOList, 5);

       AtomicBoolean atomicBoolean = new AtomicBoolean(true);

       for (int i = 0; i < lists.size(); i++){

           if (i == lists.size() - 1){

               atomicBoolean.set(false); // 最后一个线程设置异常标志

           }

           List<EmployeeDO> list = lists.get(i);

           // 使用Callable支持返回结果

           Callable<Integer> callable = () -> {

               // 最后一个线程抛出异常

               if (!atomicBoolean.get()){

                   throw new ServiceException("001", "出现异常");

               }

               return employeeMapper.saveBatch(list);

           };

           callableList.add(callable);

       }

       // 执行所有子线程任务

       List<Future<Integer>> futures = service.invokeAll(callableList);

       // 检查所有任务执行结果

       for (Future<Integer> future : futures) {

           // 任一任务执行失败则整体回滚

           if (future.get() <= 0){

               connection.rollback();

               return;

           }

       }

       // 所有任务成功则提交事务

       connection.commit();

       System.out.println("添加完毕");

   } catch (Exception e){

       // 异常情况回滚事务

       connection.rollback();

       log.info("error", e);

       throw new ServiceException("002", "出现异常");

   } finally {

       connection.close();

   }}

4.2 MyBatis批量插入SQL

xml

<insert id="saveBatch" parameterType="List">

   INSERT INTO employee

   (employee_id, age, employee_name, birth_date, gender, id_number, creat_time, update_time, status)

   values

   <foreach collection="list" item="item" index="index" separator=",">

       (

       #{item.employeeId},

       #{item.age},

       #{item.employeeName},

       #{item.birthDate},

       #{item.gender},

       #{item.idNumber},

       #{item.creatTime},

       #{item.updateTime},

       #{item.status}

       )

   </foreach></insert>

4.3 异常情况测试

数据库初始状态:存在1条记录

测试结果: 成功抛出异常

事务验证: 删除操作已回滚,数据库中原始数据仍然存在,证明事务控制成功

五、正常执行场景验证


5.1 正常执行代码

java

@Resourceprivate SqlContext sqlContext;

/**

* 多线程事务测试方法 - 正常执行版本

* @param employeeDOList 员工数据列表

*/@Overridepublic void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {

   // 获取数据库连接和会话

   SqlSession sqlSession = sqlContext.getSqlSession();

   Connection connection = sqlSession.getConnection();

   try {

       // 设置为手动提交模式

       connection.setAutoCommit(false)

      // 获取Mapper实例

       EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class)

       // 先执行删除操作

       employeeMapper.delete(null);

       // 获取线程池执行器

       ExecutorService service = ExecutorConfig.getThreadPool();

       List<Callable<Integer>> callableList = new ArrayList<>()

       // 数据拆分

       List<List<EmployeeDO>> lists = averageAssign(employeeDOList, 5)

      for (int i = 0; i < lists.size(); i++){

           List<EmployeeDO> list = lists.get(i);

           Callable<Integer> callable = () -> employeeMapper.saveBatch(list);

           callableList.add(callable);

       }

      // 执行所有子线程任务

       List<Future<Integer>> futures = service.invokeAll(callableList)

       // 检查执行结果

       for (Future<Integer> future : futures) {

           if (future.get() <= 0){

               connection.rollback();

               return;

           }

       }

      // 提交事务

       connection.commit();

       System.out.println("添加完毕");

   } catch (Exception e){

       // 异常回滚

       connection.rollback();

       log.info("error", e);

       throw new ServiceException("002", "出现异常");

   }}

5.2 正常执行结果

5.3 数据验证

数据库状态: 原始数据已删除,新数据成功插入,验证测试完全成功。

软件开发 就找木风!

一家致力于优质服务的软件公司

8年互联网行业经验1000+合作客户2000+上线项目60+服务地区

关注微信公众号

在线客服

在线客服

微信咨询

微信咨询

电话咨询

电话咨询