Categories
程式開發

分佈式事務太繁瑣?官方推薦Atomikos,5分鐘幫你搞定


互聯網應用架構:專注編程教學,架構,JAVA,Python,微服務,機器學習等領域,歡迎關注,一起學習。

分佈式事務太繁瑣?官方推薦Atomikos,5分鐘幫你搞定 1

前言

最近有個項目,裡面涉及到多個數據源的操作,按照以前的做法採用MQ來做最終一致性,但是又覺得繁瑣些,項目的量能其實也不大很小,想來想去最終採用Atomikos來實現。

XA是啥

在做Atomikos之前,我們先來了解一下什麼是XA。 XA是由X/Open組織提出的分佈式事務的一種協議(或者稱之為分佈式架構)。它主要定義了兩部分的管理器,全局事務管理器及資源管理器。在XA的設計理念中,把不同資源納入到一個事務管理器進行統一管理,例如數據庫資源,消息中間件資源等,從而進行全部資源的事務提交或者取消,目前主流的數據庫,消息中間件都支持XA協議。

分佈式事務太繁瑣?官方推薦Atomikos,5分鐘幫你搞定 2

JTA又是啥

上面講完XA協議,我們來聊聊JTA,JTA叫做Java Transaction API,它是XA協議的JAVA實現。目前在JAVA裡面,關於JTA的定義主要是兩部分

1、事務管理器接口—–javax.transaction.TransactionManager

2、資源管理器接口—–javax.transaction.xa.XAResource

在一般應用採用JTA接口實現事務,需要一個外置的JTA容器來存儲這些事務,像Tomcat。今天我們要講的是Atomikos,它是一個獨立實現了JTA的框架,能夠在我們的應用服務器中運行JTA事務。

接下來我們直接進入到主題,在一個微服務應用中,針對多數據源的時候如何實現分佈式事務。

分佈式事務太繁瑣?官方推薦Atomikos,5分鐘幫你搞定 3

基礎包引入

4.0.0

com.boots
boots
3.0.0.RELEASE

boots-atomikos
boots-atomikos
分布式事务

org.springframework.boot
spring-boot-starter-jta-atomikos

com.boots
module-boots-api
${parent.version}

org.mybatis
mybatis
3.5.4

org.mybatis.spring.boot
mybatis-spring-boot-starter
2.1.2

com.alibaba
druid-spring-boot-starter
1.1.21

mysql
mysql-connector-java

com.baomidou
mybatis-plus-boot-starter
3.3.2

p6spy
p6spy
3.9.0

com.github.ulisesbocchio
jasypt-spring-boot-starter
3.0.2

org.junit.jupiter
junit-jupiter-engine
test

org.junit.platform
junit-platform-launcher
test

配置第一個數據源

/**
* All rights Reserved, Designed By 林溪
* Copyright: Copyright(C) 2016-2020
*/

package com.boots.atomikos.common.config;

import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.boots.atomikos.common.constants.AtomikosConstant;
import com.boots.atomikos.common.data.FirstDbData;
import com.boots.atomikos.common.utils.JasyptUtils;
import com.mysql.cj.jdbc.MysqlXADataSource;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
* 第一数据源配置
* @author:林溪
* @date:2020年11月19日
*/
@Configuration
@MapperScan(basePackages = AtomikosConstant.FIRST_DAO, sqlSessionFactoryRef = AtomikosConstant.FIRST_SESSIONFACTORY)
@Slf4j
public class FirstDataSourceConfig {

@Autowired
private FirstDbData firstDbData;

/**
* first数据源配置
* @author OprCalf
* @return DataSource
*/
@Bean(AtomikosConstant.FIRST_DATASOURCE)
@Primary
public DataSource firstDataSource() {
final MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
mysqlXaDataSource.setUrl(firstDbData.getFirstUrl());
mysqlXaDataSource.setPassword(JasyptUtils.decryptMsg(firstDbData.getJasyptPassword(), firstDbData.getFirstPassword()));
mysqlXaDataSource.setUser(firstDbData.getFirstUsername());
final AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName(AtomikosConstant.FIRST_DATASOURCE);
xaDataSource.setPoolSize(firstDbData.getMaxPoolPreparedStatementPerConnectionSize());
xaDataSource.setMinPoolSize(firstDbData.getMinIdle());
xaDataSource.setMaxPoolSize(firstDbData.getMaxActive());
xaDataSource.setMaxIdleTime(firstDbData.getMinIdle());
xaDataSource.setMaxLifetime(firstDbData.getMinEvictableIdleTimeMillis());
xaDataSource.setConcurrentConnectionValidation(true);
xaDataSource.setTestQuery("select 1 from dual");
log.info("初始化第一数据库成功");
return xaDataSource;
}

/**
* 创建第一个SqlSessionFactory
* @param firstDataSource
* @return
* @throws Exception
*/
@Primary
@Bean(AtomikosConstant.FIRST_SESSIONFACTORY)
@SneakyThrows(Exception.class)
public SqlSessionFactory firstSqlSessionFactory(@Qualifier(AtomikosConstant.FIRST_DATASOURCE) DataSource firstDataSource) {
final MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
bean.setDataSource(firstDataSource);
// 设置mapper位置
bean.setTypeAliasesPackage(AtomikosConstant.FIRST_MODELS);
// 设置mapper.xml文件的路径
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(AtomikosConstant.FIRST_MAPPER));
return bean.getObject();
}

}

配置第二個數據源

/**
* All rights Reserved, Designed By 林溪
* Copyright: Copyright(C) 2016-2020
*/

package com.boots.atomikos.common.config;

import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.boots.atomikos.common.constants.AtomikosConstant;
import com.boots.atomikos.common.data.SecondDbData;
import com.boots.atomikos.common.utils.JasyptUtils;
import com.mysql.cj.jdbc.MysqlXADataSource;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
* 第二数据源配置
* @author:林溪
* @date:2020年11月19日
*/
@Configuration
@MapperScan(basePackages = AtomikosConstant.SECOND_DAO, sqlSessionFactoryRef = AtomikosConstant.SECOND_SESSIONFACTORY)
@Slf4j
public class SecondDataSourceConfig {

@Autowired
private SecondDbData secondDbData;

/**
* second数据源配置
* @author OprCalf
* @return DataSource
*/
@Bean(AtomikosConstant.SECOND_DATASOURCE)
public DataSource secondDataSource() {
// 使用mysql的分布式驱动,支持MySql5.*、MySql8.* 以上版本
final MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
mysqlXaDataSource.setUrl(secondDbData.getSecondUrl());
mysqlXaDataSource.setPassword(JasyptUtils.decryptMsg(secondDbData.getJasyptPassword(), secondDbData.getSecondPassword()));
mysqlXaDataSource.setUser(secondDbData.getSecondUsername());
final AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName(AtomikosConstant.SECOND_DATASOURCE);
xaDataSource.setPoolSize(secondDbData.getMaxPoolPreparedStatementPerConnectionSize());
xaDataSource.setMinPoolSize(secondDbData.getMinIdle());
xaDataSource.setMaxPoolSize(secondDbData.getMaxActive());
xaDataSource.setMaxIdleTime(secondDbData.getMinIdle());
xaDataSource.setMaxLifetime(secondDbData.getMinEvictableIdleTimeMillis());
xaDataSource.setConcurrentConnectionValidation(true);
xaDataSource.setTestQuery("select 1 from dual");
log.info("初始化第二数据库成功");
return xaDataSource;
}

/**
* 创建第一个SqlSessionFactory
* @param secondDataSource
* @return
* @throws Exception
*/
@Bean(AtomikosConstant.SECOND_SESSIONFACTORY)
@SneakyThrows(Exception.class)
public SqlSessionFactory secondSqlSessionFactory(@Qualifier(AtomikosConstant.SECOND_DATASOURCE) DataSource secondDataSource) {
final MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
bean.setDataSource(secondDataSource);
// 设置mapper位置
bean.setTypeAliasesPackage(AtomikosConstant.SECOND_MODELS);
// 设置mapper.xml文件的路径
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(AtomikosConstant.SECOND_MAPPER));
return bean.getObject();
}

}

配置數據源管理器

/**
* All rights Reserved, Designed By 林溪
* Copyright: Copyright(C) 2016-2020
*/

package com.boots.atomikos.common.config;

import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;

import lombok.SneakyThrows;

/**
* Atomikos事务管理器
* @author:林溪
* @date:2020年11月17日
*/
@Configuration
@EnableTransactionManagement
public class AtomikosConfig {

/**
* 初始化JTA事务管理器
* @author 林溪
* @return UserTransaction
*/
@Bean(name = "userTransaction")
@SneakyThrows(Exception.class)
public UserTransaction userTransaction() {
final UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(20000);
return userTransactionImp;
}

/**
* 初始化Atomikos事务管理器
* @author 林溪
* @return TransactionManager
*/
@Bean(name = "atomikosTransactionManager")
@SneakyThrows(Exception.class)
public TransactionManager atomikosTransactionManager() {
final UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
return userTransactionManager;
}

/**
* 加载事务管理
* @author 林溪
* @param atomikosTransactionManager
* @param userTransaction
* @return PlatformTransactionManager
*/
@Bean(name = "transactionManager")
@SneakyThrows(Throwable.class)
public PlatformTransactionManager transactionManager(@Qualifier("atomikosTransactionManager") TransactionManager atomikosTransactionManager, @Qualifier("userTransaction") UserTransaction userTransaction) {
return new JtaTransactionManager(userTransaction(), atomikosTransactionManager());
}

}

配置常量

/**
* All rights Reserved, Designed By 林溪
* Copyright: Copyright(C) 2016-2020
*/

package com.boots.atomikos.common.constants;

/**
* 分布式事务常量
* @author:林溪
* @date:2020年11月16日
*/

public class AtomikosConstant {

/*****************第一数据库配置****************************/

// 数据源配置
public final static String FIRST_DATASOURCE = "firstDataSource";

// 会话工厂配置
public final static String FIRST_SESSIONFACTORY = "firstSessionFactory";

// 映射接口配置
public final static String FIRST_DAO= "com.boots.atomikos.business.afuser.dao";

// 数据对象路径
public final static String FIRST_MODELS = "com.boots.atomikos.business.afuser.model";

// 映射目录配置
public final static String FIRST_MAPPER = "classpath:mappers/AfUserMapper.xml";

/*****************第二数据库配置****************************/

// 数据源配置
public final static String SECOND_DATASOURCE = "secondDataSource";

// 会话工厂配置
public final static String SECOND_SESSIONFACTORY = "secondSessionFactory";

// 映射接口配置
public final static String SECOND_DAO= "com.boots.atomikos.business.afcustomer.dao";

// 数据对象路径
public final static String SECOND_MODELS = "com.boots.atomikos.business.afcustomer.model";

// 映射目录配置
public final static String SECOND_MAPPER = "classpath:mappers/AfCustomerMapper.xml";

}

配置信息

######配置基本信息######
##配置应用名称
spring.application.name: boots-atomikos
##配置时间格式,为了避免精度丢失,全部换成字符串
spring.jackson.timeZone: GMT+8
spring.jackson.dateFormat: yyyy-MM-dd HH:mm:ss
spring.jackson.generator.writeNumbersAsStrings: true
#####配置数据源#######
first.datasource.url: jdbc:mysql://127.0.0.1:3306/atomikos_first?autoReconnect=true&useSSL=false&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&allowMultiQueries=true
first.datasource.username: root
first.datasource.password: yiOtQ2YkCWwOvRNmLI4eaPG/fx/q3AIB20JFFz87T96+udBorAm0tNxI2YKfFdeA
#####配置数据源#######
second.datasource.url: jdbc:mysql://127.0.0.1:3306/atomikos_second?autoReconnect=true&useSSL=false&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&allowMultiQueries=true
second.datasource.username: root
second.datasource.password: yiOtQ2YkCWwOvRNmLI4eaPG/fx/q3AIB20JFFz87T96+udBorAm0tNxI2YKfFdeA

運行測試

我們定義了一個接口並實現該接口,定義了一個test方法,根據不同情況手動拋出異常,在運行後可以直接看到數據並沒有被插入到數據中

/**
* All rights Reserved, Designed By 林溪开源
* Copyright: Copyright(C) 2016-2020
* Company 林溪开源 Ltd.
*/

package com.boots.atomikos.business.afcustomer.service.impl;

import javax.transaction.Transactional;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.boots.atomikos.business.afcustomer.dao.IAfCustomerDao;
import com.boots.atomikos.business.afcustomer.model.AfCustomer;
import com.boots.atomikos.business.afcustomer.service.IAfCustomerService;
import com.boots.atomikos.business.afuser.dao.IAfUserDao;
import com.boots.atomikos.business.afuser.model.AfUser;
import com.module.boots.exception.CommonRuntimeException;

/**
* 客户表逻辑服务实现层
* @author:林溪
* @date: 2020年11月17日
*/
@Service
public class AfCustomerServiceImpl extends ServiceImpl implements IAfCustomerService {

@Autowired
private IAfCustomerDao afCustomerDao;

@Autowired
private IAfUserDao afUserDao;

@Override
@Transactional(rollbackOn = CommonRuntimeException.class)
public void test() {
final AfCustomer afCustomer = AfCustomer.builder().customerName("客户1").build();
final AfUser afUser = AfUser.builder().userName("用户1").build();
final int i = afCustomerDao.insert(afCustomer);
if (i > 0) {
throw new CommonRuntimeException("新增失败");
}
afUserDao.insert(afUser);
}

}

總結

實驗結果測試沒問題,這裡就不貼出來,有興趣的同學可以通過以下獲取源碼

h ttps://gitee.com/lemeno/boots

– 結束 –

作者:@互聯網應用架構

原創作品,抄襲必究

如需要源碼,轉發,關注後私信我

部分圖片或代碼來源網絡,如侵權請聯繫刪除,謝謝!