多数据源切换下保证事务解决方案

    xiaoxiao2022-07-03  208

    多数据源下要保证事务,实际上就是分布式事务,现在已经有阿里开源的seata来实现分布式事务了,不用自己造轮子,如果想自己实现,下边是一套方案.

    我的项目是基于mybatis-plus实现的,因为mybatis-plus只是在mybatis上面做了封装,所以这套方案用于mybatis也是没问题的

    如果只是继承AbstractRoutingDataSource并重写determineCurrentLookupKey方法,那么在事务中数据源是切换不了的,还需要自定义MyBatis的事务实现(Transaction和TransactionFactory)

    1. 先实现AbstractRoutingDataSource类

    /**
     * Routing data source whose lookup key is read from {@link DbContextHolder}.
     */
    public class DynamicDataSource extends AbstractRoutingDataSource {

        /**
         * Returns the key of the data source bound to the current thread.
         *
         * @return the current lookup key, or {@code null} when none is bound
         */
        @Override
        protected Object determineCurrentLookupKey() {
            return DbContextHolder.getDbType();
        }

        /**
         * Resolves the concrete data source selected by the current lookup key.
         *
         * <p>The method name (including its spelling) is kept unchanged for existing callers.
         *
         * @return the resolved target data source, or this routing data source when no key is
         *     bound or the resolution yields {@code null}
         */
        public DataSource getAcuallyDataSource() {
            if (determineCurrentLookupKey() != null) {
                DataSource resolved = this.determineTargetDataSource();
                if (resolved != null) {
                    return resolved;
                }
            }
            return this;
        }
    }

    2. DbContextHolder 用于获取当前线程绑定的数据源

    /**
     * Holds the data source key bound to the current thread.
     */
    public class DbContextHolder {

        /** Per-thread data source key; {@code null} until {@link #setDbType} is called. */
        private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

        /**
         * Binds a data source to the current thread.
         *
         * @param dbTypeEnum the data source to bind; its {@code getValue()} is stored as the key
         */
        public static void setDbType(DBTypeEnum dbTypeEnum) {
            contextHolder.set(dbTypeEnum.getValue());
        }

        /**
         * Returns the data source key bound to the current thread.
         *
         * @return the bound key, or {@code null} when nothing is bound
         */
        public static String getDbType() {
            return contextHolder.get();
        }

        /**
         * Removes the data source key bound to the current thread.
         */
        public static void clearDbType() {
            contextHolder.remove();
        }
    }

    3. 数据源切换注解/切面/数据源枚举类

    /**
     * Selects the data source for a single method, taking precedence over the
     * package-based choice made by {@link DataSourceSwitchAspect}.
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD})
    public @interface DataSourceSwitch {

        /** The data source to use; defaults to {@code db1}. */
        DBTypeEnum value() default DBTypeEnum.db1;
    }

    /**
     * Binds a data source key to the current thread before a service method runs and
     * clears it afterwards. The data source is chosen by the service sub-package
     * ({@code db1}, {@code db2}, {@code db3}) unless the method carries
     * {@link DataSourceSwitch}.
     *
     * <p>NOTE(review): the {@code @After} advice clears the key instead of restoring the
     * previous one, so when a method in one package calls a method in another package the
     * outer method runs without a bound key after the inner call returns. Confirm whether
     * such nested calls occur.
     */
    @Component
    @Aspect
    @Order(-100) // presumably ordered ahead of transaction advice so the key is bound first; verify
    @Slf4j
    public class DataSourceSwitchAspect {

        @Pointcut("execution(* com.polycis.main.service.db1..*.*(..))")
        private void db1Aspect() {
        }

        @Pointcut("execution(* com.polycis.main.service.db2..*.*(..))")
        private void db2Aspect() {
        }

        @Pointcut("execution(* com.polycis.main.service.db3..*.*(..))")
        private void db3Aspect() {
        }

        @Before("db1Aspect()")
        public void db1(JoinPoint joinPoint) {
            log.info("DataSourceSwitchAspect切换到db1 数据源...");
            setDataSource(joinPoint, DBTypeEnum.db1);
        }

        @Before("db2Aspect()")
        public void db2(JoinPoint joinPoint) {
            log.info("DataSourceSwitchAspect切换到db2 数据源...");
            setDataSource(joinPoint, DBTypeEnum.db2);
        }

        @Before("db3Aspect()")
        public void db3(JoinPoint joinPoint) {
            log.info("DataSourceSwitchAspect切换到db3 数据源...");
            setDataSource(joinPoint, DBTypeEnum.db3);
        }

        @After("db1Aspect()")
        public void db11(JoinPoint joinPoint) {
            log.info("清除数据源db1");
            DbContextHolder.clearDbType();
        }

        @After("db2Aspect()")
        public void db22(JoinPoint joinPoint) {
            log.info("清除数据源db2");
            DbContextHolder.clearDbType();
        }

        @After("db3Aspect()")
        public void db33(JoinPoint joinPoint) {
            log.info("清除数据源db3");
            DbContextHolder.clearDbType();
        }

        /**
         * Binds the data source for the intercepted method: the {@link DataSourceSwitch}
         * annotation wins when present, otherwise the package-based default is used.
         *
         * @param joinPoint  the intercepted method invocation
         * @param dbTypeEnum the data source implied by the service package
         */
        private void setDataSource(JoinPoint joinPoint, DBTypeEnum dbTypeEnum) {
            MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
            DataSourceSwitch dataSourceSwitch =
                    methodSignature.getMethod().getAnnotation(DataSourceSwitch.class);
            if (dataSourceSwitch == null) {
                DbContextHolder.setDbType(dbTypeEnum);
                return;
            }
            // An annotation member is never null, so the annotated enum value is used directly;
            // this also covers enum constants added later without touching this method.
            log.info("根据注解来切换数据源,注解值为:{}", dataSourceSwitch.value());
            DbContextHolder.setDbType(dataSourceSwitch.value());
        }
    }

    /**
     * The available data sources; each constant carries the lookup key stored in
     * {@link DbContextHolder}.
     */
    public enum DBTypeEnum {

        db1("db1"), db2("db2"), db3("db3");

        private final String value;

        DBTypeEnum(String value) {
            this.value = value;
        }

        /** Returns the lookup key of this data source. */
        public String getValue() {
            return value;
        }
    }

    4. mybatis配置类

    在sqlSessionFactory里,将动态数据源map设置进去.

    然后关键的来了,

    sqlSessionFactory.setTransactionFactory(new MultiDataSourceTransactionFactory());

    这里要设置MultiDataSourceTransactionFactory.

    @Configuration @MapperScan({"com.polycis.main.mapper*"}) public class MybatisPlusConfig { /** * mybatis-plus分页插件<br> * 文档:http://mp.baomidou.com<br> */ @Bean public PaginationInterceptor paginationInterceptor() { PaginationInterceptor paginationInterceptor = new PaginationInterceptor(); //paginationInterceptor.setLocalPage(true);// 开启 PageHelper 的支持 return paginationInterceptor; } /** * mybatis-plus SQL执行效率插件【生产环境可以关闭】 *//* @Bean public PerformanceInterceptor performanceInterceptor() { return new PerformanceInterceptor(); } */ @Bean(name = "db1") @ConfigurationProperties(prefix = "spring.datasource.druid.db1" ) public DataSource db1 () { return DruidDataSourceBuilder.create().build(); } @Bean(name = "db2") @ConfigurationProperties(prefix = "spring.datasource.druid.db2" ) public DataSource db2 () { return DruidDataSourceBuilder.create().build(); } @Bean(name = "db3") @ConfigurationProperties(prefix = "spring.datasource.druid.db3" ) public DataSource db3 () { return DruidDataSourceBuilder.create().build(); } /** * 动态数据源配置 * @return */ @Bean @Primary public DataSource multipleDataSource (@Qualifier("db1") DataSource db1, @Qualifier("db2") DataSource db2, @Qualifier("db3") DataSource db3) { DynamicDataSource dynamicDataSource = new DynamicDataSource(); Map< Object, Object > targetDataSources = new HashMap<>(); targetDataSources.put(DBTypeEnum.db1.getValue(), db1 ); targetDataSources.put(DBTypeEnum.db2.getValue(), db2); targetDataSources.put(DBTypeEnum.db3.getValue(), db3); dynamicDataSource.setTargetDataSources(targetDataSources); dynamicDataSource.setDefaultTargetDataSource(db1); return dynamicDataSource; } @Bean("sqlSessionFactory") public SqlSessionFactory sqlSessionFactory() throws Exception { MybatisSqlSessionFactoryBean sqlSessionFactory = new MybatisSqlSessionFactoryBean(); sqlSessionFactory.setDataSource(multipleDataSource(db1(),db2(),db3())); sqlSessionFactory.setTransactionFactory(new MultiDataSourceTransactionFactory()); 
//sqlSessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/mapper/*/*Mapper.xml")); MybatisConfiguration configuration = new MybatisConfiguration(); //configuration.setDefaultScriptingLanguage(MybatisXMLLanguageDriver.class); configuration.setJdbcTypeForNull(JdbcType.NULL); configuration.setMapUnderscoreToCamelCase(true); configuration.setCacheEnabled(false); sqlSessionFactory.setConfiguration(configuration); sqlSessionFactory.setPlugins(new Interceptor[]{ //PerformanceInterceptor(),OptimisticLockerInterceptor() paginationInterceptor() }); sqlSessionFactory.setGlobalConfig(globalConfiguration()); return sqlSessionFactory.getObject(); } @Bean public GlobalConfiguration globalConfiguration() { GlobalConfiguration conf = new GlobalConfiguration(new LogicSqlInjector()); conf.setLogicDeleteValue("-1"); conf.setLogicNotDeleteValue("1"); conf.setIdType(0); conf.setMetaObjectHandler(new MyMetaObjectHandler()); conf.setDbColumnUnderline(true); conf.setRefresh(true); return conf; } }

    5. 事务工厂类

    /**
     * Transaction factory that hands MyBatis a {@link MultiDataSourceTransaction}.
     */
    public class MultiDataSourceTransactionFactory extends SpringManagedTransactionFactory {

        /**
         * Creates a transaction bound to the given data source.
         *
         * @param dataSource the data source the transaction obtains connections from
         * @param level      not used by this factory
         * @param autoCommit not used by this factory
         * @return a new {@link MultiDataSourceTransaction}
         */
        @Override
        public Transaction newTransaction(DataSource dataSource,
                                          TransactionIsolationLevel level,
                                          boolean autoCommit) {
            Transaction transaction = new MultiDataSourceTransaction(dataSource);
            return transaction;
        }
    }

    6. 多数据源事务类,实现了Transaction接口,重写了getConnection、commit等方法的逻辑

    public class MultiDataSourceTransaction implements Transaction{ private static final Log LOGGER = LogFactory.getLog(MultiDataSourceTransaction.class); private final DataSource dataSource; private Connection mainConnection; private String mainDatabaseIdentification; private ConcurrentMap<String, Connection> otherConnectionMap; private boolean isConnectionTransactional; private boolean autoCommit; public MultiDataSourceTransaction(DataSource dataSource) { notNull(dataSource, "No DataSource specified"); this.dataSource = dataSource; otherConnectionMap = new ConcurrentHashMap<>(); mainDatabaseIdentification= DbContextHolder.getDbType(); } /** * {@inheritDoc} */ @Override public Connection getConnection() throws SQLException { String databaseIdentification = DbContextHolder.getDbType(); if (databaseIdentification.equals(mainDatabaseIdentification)) { if (mainConnection != null){ return mainConnection; } else { openMainConnection(); mainDatabaseIdentification =databaseIdentification; return mainConnection; } } else { if (!otherConnectionMap.containsKey(databaseIdentification)) { try { Connection conn = dataSource.getConnection(); otherConnectionMap.put(databaseIdentification, conn); } catch (SQLException ex) { throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", ex); } } return otherConnectionMap.get(databaseIdentification); } } private void openMainConnection() throws SQLException { this.mainConnection = DataSourceUtils.getConnection(this.dataSource); this.autoCommit = this.mainConnection.getAutoCommit(); this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.mainConnection, this.dataSource); if (LOGGER.isDebugEnabled()) { LOGGER.debug( "JDBC Connection [" + this.mainConnection + "] will" + (this.isConnectionTransactional ? 
" " : " not ") + "be managed by Spring"); } } /** * {@inheritDoc} */ @Override public void commit() throws SQLException { if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Committing JDBC Connection [" + this.mainConnection + "]"); } this.mainConnection.commit(); for (Connection connection : otherConnectionMap.values()) { connection.commit(); } } } /** * {@inheritDoc} */ @Override public void rollback() throws SQLException { if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Rolling back JDBC Connection [" + this.mainConnection + "]"); } this.mainConnection.rollback(); for (Connection connection : otherConnectionMap.values()) { connection.rollback(); } } } /** * {@inheritDoc} */ @Override public void close() throws SQLException { DataSourceUtils.releaseConnection(this.mainConnection, this.dataSource); for (Connection connection : otherConnectionMap.values()) { DataSourceUtils.releaseConnection(connection, this.dataSource); } } @Override public Integer getTimeout() throws SQLException { return null; } }

     

    最新回复(0)