V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
wayn111
V2EX  ›  推广

多数据源事务处理-涉及分布式事务

  •  
  •   wayn111 · 2022-12-16 13:57:32 +08:00 · 738 次点击
    这是一个创建于 750 天前的主题,其中的信息可能已经有所发展或是发生改变。

    开启掘金成长之旅!这是我参与「掘金日新计划 · 12 月更文挑战」的第 5 天,点击查看活动详情

    在作者之前的 十二条后端开发经验分享,纯干货 文章中介绍的 优雅得 Springboot + mybatis 配置多数据源方式 里有很多小伙伴在评论区留言询问多个数据源同时在一个方法中使用时,事务是否会正常有效,这里作者 理论 + 实践 给大家解答一波,老规矩,附作者 github 地址:

    一. 数据源跨库但是不跨 MySql 实例

    这个形式就是数据源在同一个 MySQL 下,但是 jdbc-url 上的数据库配置不同,涉及多个数据库时,如果方法中发生异常,只有开启事务的数据源会发生回滚,其他数据源不会回滚。看到这里可能有点迷惑,什么是 只有开启事务的数据源会发生回滚,其他数据源不会回滚?

    下面给出代码验证:

    主数据源配置

    @Slf4j
    @EnableTransactionManagement
    @EnableAspectJAutoProxy
    @Configuration
    @MapperScan(basePackages = "ltd.newbee.mall.core.dao", sqlSessionFactoryRef = "masterSqlSessionFactory")
    public class Db1DataSourceConfig {
    
        @Primary
        @Bean
        @ConfigurationProperties("spring.datasource.druid.master")
        public DataSource masterDataSource(DruidProperties druidProperties) {
            DruidDataSource build = DruidDataSourceBuilder.create().build();
            return druidProperties.dataSource(build);
        }
    
        /**
         * @param datasource 数据源
         * @return SqlSessionFactory
         * @Primary 默认 SqlSessionFactory
         */
        @Primary
        @Bean(name = "masterSqlSessionFactory")
        public SqlSessionFactory masterSqlSessionFactory(@Qualifier("masterDataSource") DataSource datasource,
                                                         Interceptor interceptor) throws Exception {
            MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
            bean.setDataSource(datasource);
            // mybatis 扫描 xml 所在位置
            bean.setMapperLocations(new PathMatchingResourcePatternResolver()
                    .getResources("classpath*:mapper/*.xml"));
            bean.setTypeAliasesPackage("ltd.**.core.entity");
            bean.setPlugins(interceptor);
            GlobalConfig globalConfig = new GlobalConfig();
            GlobalConfig.DbConfig dbConfig = new GlobalConfig.DbConfig();
            dbConfig.setLogicDeleteField("isDeleted");
            dbConfig.setLogicDeleteValue("1");
            dbConfig.setLogicNotDeleteValue("0");
            globalConfig.setDbConfig(dbConfig);
            bean.setGlobalConfig(globalConfig);
            log.info("masterDataSource 配置成功");
            return bean.getObject();
        }
    
        @Primary
        @Bean(name = "masterTransactionManager")
        public DataSourceTransactionManager masterTransactionManager(@Qualifier("masterDataSource") DataSource dataSource) {
            return new DataSourceTransactionManager(dataSource);
        }
    
    }
    

    从数据源配置

    @Slf4j
    @ConditionalOnProperty(value = "transactional.mode", havingValue = "seata")
    @EnableTransactionManagement
    @EnableAspectJAutoProxy
    @Configuration
    @MapperScan(basePackages = "ltd.newbee.mall.slave.dao", sqlSessionFactoryRef = "slaveSqlSessionFactory")
    public class Db2DataSourceConfig {
    
        @Bean
        @ConfigurationProperties("spring.datasource.druid.slave")
        public DataSource slaveDataSource(DruidProperties druidProperties) {
            DruidDataSource build = DruidDataSourceBuilder.create().build();
            return druidProperties.dataSource(build);
        }
    
    
        /**
         * @param datasource 数据源
         * @return SqlSessionFactory
         * @Primary 默认 SqlSessionFactory
         */
        @Bean(name = "slaveSqlSessionFactory")
        public SqlSessionFactory slaveSqlSessionFactory(@Qualifier("slaveDataSource") DataSource datasource,
                                                        Interceptor interceptor) throws Exception {
            MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
            bean.setDataSource(datasource);
            // mybatis 扫描 xml 所在位置
            bean.setMapperLocations(new PathMatchingResourcePatternResolver()
                    .getResources("classpath*:slavemapper/*.xml"));
            bean.setTypeAliasesPackage("ltd.**.slave.entity");
            bean.setPlugins(interceptor);
            GlobalConfig globalConfig = new GlobalConfig();
            GlobalConfig.DbConfig dbConfig = new GlobalConfig.DbConfig();
            dbConfig.setLogicDeleteField("isDeleted");
            dbConfig.setLogicDeleteValue("1");
            dbConfig.setLogicNotDeleteValue("0");
            globalConfig.setDbConfig(dbConfig);
            bean.setGlobalConfig(globalConfig);
            log.info("slaveDataSource 配置成功");
            return bean.getObject();
        }
        
        @Bean(name = "slaveTransactionManager")
        public DataSourceTransactionManager slaveTransactionManager(@Qualifier("slaveDataSource") DataSource dataSource) {
            return new DataSourceTransactionManager(dataSource);
        }
    
    }
    

    划重点-上述代码在每个数据源中都配置了 DataSourceTransactionManager(事务管理器),并且在主配置中添加 @Primary 注解,表示默认事务管理器优先使用主数据源的事务管理器。 下面给出测试代码:

    /**
     *  Springboot 测试类
     */
    @Slf4j
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class MultiDataSourceTest {
        @Autowired
        private MultiDataService multiDataService;
        @Test
        public void testRollback() {
            multiDataService.testRollback();
        }
    }
    /**
     *  MultiDataService 实现类
     */
    @Slf4j
    @Service
    public class MultiDataServiceImpl implements MultiDataService {
        @Autowired
        private TbTable1Service tbTable1Service;
        @Autowired
        private TbTable2Service tbTable2Service;
        @Autowired
        private PlatformTransactionManager transactionManager;
        @Override
        public void testRollback() {
            DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
            TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);
            try {
                TbTable1 tbTable1 = new TbTable1();
                tbTable1.setName("test1");
                // 插入 table1 表
                boolean save1 = tbTable1Service.save(tbTable1);
                TbTable2 tbTable2 = new TbTable2();
                tbTable2.setName("test2");
                // 插入 table2 表
                boolean save2 = tbTable2Service.save(tbTable2);
                int i = 1 / 0;
                transactionManager.commit(transaction);
                Assert.isTrue(save1 && save2);
            } catch (Exception e) {
                log.info(e.getMessage(), e);
                transactionManager.rollback(transaction);
            }
        }
    }
    

    执行结果:table1 表回滚成功,table2 表回滚失败。由此结果,对于 只有开启事务的数据源会发生回滚,其他数据源不会回滚? 我们的解释就是 Spring 中默认使用的事务管理器是使用主数据源配置还是从数据源配置由我们通过 @Primary 决定,当我们把 @Primary 切换在从数据源配置上,执行结果:table2 表回滚成功,table1 表回滚失败。那怎么解决这个问题?

    当涉及到跨库或者跨 MySQL 实例,想要保证事务操作,我们这里先给出XA事务解决方案。附 XA 事务的说明:

    XA 是由 X/Open 组织提出的分布式事务规范,XA 规范主要定义了事务协调者( Transaction Manager )和资源管理器( Resource Manager )之间的接口。

    事务协调者( Transaction Manager ),因为 XA 事务是基于两阶段提交协议的,所以需要有一个协调者,来保证所有的事务参与者都完成了准备工作,也就是 2PC 的第一阶段。如果事务协调者收到所有参与者都准备好的消息,就会通知所有的事务都可以提交,也就是 2PC 的第二阶段。

    资源管理器( Resource Manager ),负责控制和管理实际资源,比如数据库。

    (划重点) XA 的 MySQL 实现使 MySQL 服务器能够充当资源管理器,在全局事务中处理 XA 事务。连接到 MySQL 服务器的客户端程序充当事务协调者

    XA 事务的执行流程

    XA 事务是两阶段提交的一种实现方式,根据 2PC 的规范,XA 将一次事务分割成了两个阶段,即 Prepare 和 Commit 阶段。

    Prepare 阶段,TM 向所有 RM 发送 prepare 指令,RM 接受到指令后,执行数据修改和日志记录等操作,然后返回可以提交或者不提交的消息给 TM 。如果事务协调者 TM 收到所有参与者都准备好的消息,会通知所有的事务提交,然后进入第二阶段。

    Commit 阶段,TM 接受到所有 RM 的 prepare 结果,如果有 RM 返回是不可提交或者超时,那么向所有 RM 发送 Rollback 命令;如果所有 RM 都返回可以提交,那么向所有 RM 发送 Commit 命令,完成一次事务操作。

    下面给出两种基于 XA 事务的解决方案:

    • Springboot 项目中可以使用 jta,完成对 XA 协议的支持,缺点就是 jta 需要改造数据源配置
    • Springboot 项目引入 seataseata 支持 XA 协议,且引入 seata-spring-boot-starter 依赖对业务无侵入,缺点需要引入 seata-server 降低了系统可用性

    Springboot 项目中可以启用 jta

    1. 引入 spring-boot-starter-jta-atomikos
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>
    
    1. 修改主从数据源 DataSource 配置,进行包装添加 XA 数据源支持,如下;
    
        @Primary
        @Bean
        @ConfigurationProperties("spring.datasource.druid.master")
        public DataSource dataSource(DruidProperties druidProperties) {
            DruidXADataSource dataSource = druidProperties.dataSource(new DruidXADataSource());
            dataSource.setUrl("jdbc:mysql://localhost:3306/xxx?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8");
            dataSource.setUsername("root");
            dataSource.setPassword("");
            dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
            AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
            atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
            atomikosDataSourceBean.setUniqueResourceName("master-xa");
            atomikosDataSourceBean.setXaDataSource(dataSource);
            return atomikosDataSourceBean;
        }
    
    1. 添加 JtaTransactionManager
    @Bean
    public JtaTransactionManager transactionManager() throws Exception {
        JtaTransactionManager transactionManager = new JtaTransactionManager();
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(true);
        userTransactionManager.setTransactionTimeout(3000);
        transactionManager.setUserTransaction(userTransactionManager);
        transactionManager.setAllowCustomIsolationLevels(true);
        return transactionManager;
    }
    
    1. 完成测试,代码如下:
    /**
     *  Springboot 测试类
     */
    @Slf4j
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class MultiDataSourceTest {
        @Autowired
        private MultiDataService multiDataService;
        @Test
        public void jtaTestRollback() {
            multiDataService.jtaTestRollback();
        }
    }
    /**
     *  MultiDataService 实现类
     */
    @Slf4j
    @Service
    public class MultiDataServiceImpl implements MultiDataService {
        @Autowired
        private TbTable1Service tbTable1Service;
        @Autowired
        private TbTable2Service tbTable2Service;
        @Autowired
        private JtaTransactionManager jtaTransactionManager;
        @Override
        public void jtaTestRollback() {
            DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
            TransactionStatus transaction = jtaTransactionManager.getTransaction(transactionDefinition);
            try {
                TbTable1 tbTable1 = new TbTable1();
                tbTable1.setName("test1");
                boolean save1 = tbTable1Service.save(tbTable1);
                TbTable2 tbTable2 = new TbTable2();
                tbTable2.setName("test2");
                boolean save2 = tbTable2Service.save(tbTable2);
                int i = 1 / 0;
                jtaTransactionManager.commit(transaction);
                Assert.isTrue(save1 && save2);
            } catch (Exception e) {
                log.info(e.getMessage(), e);
                jtaTransactionManager.rollback(transaction);
            }
        }
    }
    

    可以看到我们使用的是 JtaTransactionManager, 执行结果:table1 表回滚成功,table2 表回滚成功。验证 OK

    引入 seata,添加 XA 协议支持

    1. 下载安装启动 seata-server,这里给出官网教程: https://seata.io/zh-cn/docs/ops/deploy-guide-beginner.html
    2. 在 Springboot 中引入 seata 最新依赖
    <dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-spring-boot-starter</artifactId>
        <version>1.5.2</version>
    </dependency>
    
    1. 在 yml 文件中添加 seata 配置
    seata:
      config:
        type: file
      registry:
        type: file
      application-id: newbeemall # Seata 应用编号,默认为 ${spring.application.name}
      tx-service-group: newbeemall-group # Seata 事务组编号,用于 TC 集群名
      # 服务配置项,对应 ServiceProperties 类
      service:
        # 虚拟组和分组的映射
        vgroup-mapping:
          newbeemall-group: default
        # 分组和 Seata 服务的映射
        grouplist:
          default: 127.0.0.1:8091
      data-source-proxy-mode: XA
      enabled: true
    
    1. 完成测试,代码如下:
    /**
     *  Springboot 测试类
     */
    @Slf4j
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class MultiDataSourceTest {
        @Autowired
        private MultiDataService multiDataService;
        @Test
        public void seataTestRollback() {
            multiDataService.seataTestRollback();
        }
    }
    /**
     *  MultiDataService 实现类
     */
    @Slf4j
    @Service
    public class MultiDataServiceImpl implements MultiDataService {
        @Autowired
        private TbTable1Service tbTable1Service;
        @Autowired
        private TbTable2Service tbTable2Service;
        @GlobalTransactional
        @Override
        public void seataTestRollback() {
            log.info("当前 XID: {}", RootContext.getXID());
            TbTable1 tbTable1 = new TbTable1();
            tbTable1.setName("test1");
            boolean save1 = tbTable1Service.save(tbTable1);
            TbTable2 tbTable2 = new TbTable2();
            tbTable2.setName("test2");
            boolean save2 = tbTable2Service.save(tbTable2);
            int i = 1 / 0;
        }
    }
    

    如上代码,使用 seata 时需要启用 @GlobalTransactional 注解,并且在事务中传递 XIDRootContext.getXID()),执行结果:table1 表回滚成功,table2 表回滚成功。验证 OK

    二. 数据源分布在不同 MySql 实例

    当数据源分布在不同 MySql 实例时,这时候其实已经进入分布式事务的范畴,由上可知,XA 事务可以解决分布式环境下事务问题,也就是说上述最后两种解决方案都可以解决分布式事务问题,但是实际使用过程中,我们建议使用 seata,理由是他不仅支持 XA 事务还支持 AT 、Saga 、TCC事务模型。引入 seata 官网介绍

    Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT 、TCC 、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

    总结

    关于多数据源事务的问题,不管跨不跨库其实都属于分布式事务的问题。推荐使用 seata 解决。

    实践代码放在 newbeemall 项目: https://github.com/wayn111/newbee-mall/tree/springboot2.7 分支下 image.png

    欢迎大家点赞、关注、评论,想要跟作者沟通技术问题的话可以加我微信 [ waynaqua ] ,欢迎大家前来交流。

    目前尚无回复
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1060 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 23:32 · PVG 07:32 · LAX 15:32 · JFK 18:32
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.