暗中观察

Spring之Transaction底层源码解析
一、spring中使用事务的2种方式spring使事务操作变的异常容易了,spring中控制事务主要有2种方式编程...
扫描右侧二维码阅读全文
19
2023/04

Spring之Transaction底层源码解析

一、spring中使用事务的2种方式

spring使事务操作变的异常容易了,spring中控制事务主要有2种方式

编程式事务:硬编码的方式
声明式事务:大家比较熟悉的注解@Transaction的方式

1.1 编程式事务

什么是编程式事务?
通过硬编码的方式使用spring中提供的事务相关的类来控制事务。

编程式事务主要有2种用法
方式1:通过PlatformTransactionManager控制事务
方式2:通过TransactionTemplate控制事务

方式1:PlatformTransactionManager

//定义一个JdbcTemplate,用来方便执行数据库增删改查
    JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
    //1.定义事务管理器,给其指定一个数据源(可以把事务管理器想象为一个人,这个人来负责事务的控制操作)
    PlatformTransactionManager platformTransactionManager = new DataSourceTransactionManager(dataSource);
    //2.定义事务属性:TransactionDefinition,TransactionDefinition可以用来配置事务的属性信息,比如事务隔离级别、事务超时时间、事务传播方式、是否是只读事务等等。
    TransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
    //3.开启事务:调用platformTransactionManager.getTransaction开启事务操作,得到事务状态(TransactionStatus)对象
    TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
    //4.执行业务操作,下面就执行2个插入操作
    try {
        System.out.println("before:" + jdbcTemplate.queryForList("SELECT * from t_user"));
        jdbcTemplate.update("insert into t_user (name) values (?)", "test1-1");
        jdbcTemplate.update("insert into t_user (name) values (?)", "test1-2");
        //5.提交事务:platformTransactionManager.commit
        platformTransactionManager.commit(transactionStatus);
    } catch (Exception e) {
        //6.回滚事务:platformTransactionManager.rollback
        platformTransactionManager.rollback(transactionStatus);
    }

代码分析
代码中主要有5个步骤

步骤1:定义事务管理器PlatformTransactionManager

事务管理器相当于一个管理员,这个管理员就是用来帮你控制事务的,比如开启事务,提交事务,回滚事务等等。

spring中使用PlatformTransactionManager这个接口来表示事务管理器,

public interface PlatformTransactionManager {
    //获取一个事务(开启事务)
    TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException;
    //提交事务
    void commit(TransactionStatus status) throws TransactionException;
    //回滚事务
    void rollback(TransactionStatus status) throws TransactionException;
}

步骤2:定义事务属性TransactionDefinition

定义事务属性,比如事务隔离级别、事务超时时间、事务传播方式、是否是只读事务等等。

spring中使用TransactionDefinition接口来表示事务的定义信息,有个子类比较常用:DefaultTransactionDefinition。

步骤3:开启事务

调用事务管理器的getTransaction方法,即可以开启一个事务

TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);

这个方法会返回一个TransactionStatus表示事务状态的一个对象,通过TransactionStatus提供的一些方法可以用来控制事务的一些状态,比如事务最终是需要回滚还是需要提交。

执行了getTransaction后,spring内部会执行一些操作,为了方便大家理解,咱们看看伪代码:

//有一个全局共享的threadLocal对象 resources
static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
//获取一个db的连接
DataSource datasource = platformTransactionManager.getDataSource();
Connection connection = datasource.getConnection();
//设置手动提交事务
connection.setAutoCommit(false);
Map<Object, Object> map = new HashMap<>();
map.put(datasource,connection);
resources.set(map);

上面代码,将数据源datasource和connection映射起来放在了ThreadLocal中,ThreadLocal大家应该比较熟悉,用于在同一个线程中共享数据;后面我们可以通过resources这个ThreadLocal获取datasource其对应的connection对象。

步骤4:执行业务操作

我们使用jdbcTemplate插入了2条记录。

jdbcTemplate.update("insert into t_user (name) values (?)", "test1-1");
jdbcTemplate.update("insert into t_user (name) values (?)", "test1-2");

大家看一下创建JdbcTemplate的代码,需要指定一个datasource

JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

再来看看创建事务管理器的代码

PlatformTransactionManager platformTransactionManager = new DataSourceTransactionManager(dataSource);
2者用到的是同一个dataSource,而事务管理器开启事务的时候,会创建一个连接,将datasource和connection映射之后丢在了ThreadLocal中,而JdbcTemplate内部执行db操作的时候,也需要获取连接,JdbcTemplate会以自己内部的datasource去上面的threadlocal中找有没有关联的连接,如果有直接拿来用,若没找到将重新创建一个连接,而此时是可以找到的,那么JdbcTemplate就参与到spring的事务中了。

步骤5:提交 or 回滚

//5.提交事务:platformTransactionManager.commit
platformTransactionManager.commit(transactionStatus);
//6.回滚事务:platformTransactionManager.rollback
platformTransactionManager.rollback(transactionStatus);

方式2:TransactionTemplate

方式1中部分代码是可以重用的,所以spring对其进行了优化,采用模板方法模式就其进行封装,主要省去了提交或者回滚事务的代码。

//定义一个JdbcTemplate,用来方便执行数据库增删改查
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
//1.定义事务管理器,给其指定一个数据源(可以把事务管理器想象为一个人,这个人来负责事务的控制操作)
PlatformTransactionManager platformTransactionManager = new DataSourceTransactionManager(dataSource);
//2.定义事务属性:TransactionDefinition,TransactionDefinition可以用来配置事务的属性信息,比如事务隔离级别、事务超时时间、事务传播方式、是否是只读事务等等。
DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
transactionDefinition.setTimeout(10);//如:设置超时时间10s
//3.创建TransactionTemplate对象
TransactionTemplate transactionTemplate = new TransactionTemplate(platformTransactionManager, transactionDefinition);
/**
 * 4.通过TransactionTemplate提供的方法执行业务操作
 * 主要有2个方法:
 * (1).executeWithoutResult(Consumer<TransactionStatus> action):没有返回值的,需传递一个Consumer对象,在accept方法中做业务操作
 * (2).<T> T execute(TransactionCallback<T> action):有返回值的,需要传递一个TransactionCallback对象,在doInTransaction方法中做业务操作
 * 调用execute方法或者executeWithoutResult方法执行完毕之后,事务管理器会自动提交事务或者回滚事务。
 * 那么什么时候事务会回滚,有2种方式:
 * (1)transactionStatus.setRollbackOnly();将事务状态标注为回滚状态
 * (2)execute方法或者executeWithoutResult方法内部抛出异常
 * 什么时候事务会提交?
 * 方法没有异常 && 未调用过transactionStatus.setRollbackOnly();
 */
transactionTemplate.executeWithoutResult(new Consumer<TransactionStatus>() {
    @Override
    public void accept(TransactionStatus transactionStatus) {
        jdbcTemplate.update("insert into t_user (name) values (?)", "transactionTemplate-1");
        jdbcTemplate.update("insert into t_user (name) values (?)", "transactionTemplate-2");
    }
});

代码分析
TransactionTemplate,主要有2个方法:
executeWithoutResult:无返回值场景

executeWithoutResult(Consumer<TransactionStatus> action):没有返回值的,需传递一个Consumer对象,在accept方法中做业务操作

execute:有返回值场景

<T> T execute(TransactionCallback<T> action):有返回值的,需要传递一个TransactionCallback对象,在doInTransaction方法中做业务操作

什么时候事务会回滚,有2种方式

方式1
在execute或者executeWithoutResult内部执行transactionStatus.setRollbackOnly();将事务状态标注为回滚状态,spring会自动让事务回滚

方式2
execute方法或者executeWithoutResult方法内部抛出任意异常即可回滚。

什么时候事务会提交?
方法没有异常 && 未调用过transactionStatus.setRollbackOnly();

编程式事务正确的使用姿势

先来个配置类,将事务管理器PlatformTransactionManager、事务模板TransactionTemplate都注册到spring中,重用。

@Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
    @Bean
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }
    @Bean
    public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
        return new TransactionTemplate(transactionManager);
    }

通常我们会将业务操作放在service中,

   @Autowired
    private TransactionTemplate transactionTemplate;
    //模拟业务操作1
    public void bus1() {
        this.transactionTemplate.executeWithoutResult(transactionStatus -> {
            //先删除表数据
            this.jdbcTemplate.update("delete from t_user");
            //调用bus2
            this.bus2();
        });
    }

1.2 声明式事务

所谓声明式事务,就是通过配置的方式,比如通过配置文件(xml)或者注解的方式,告诉spring,哪些方法需要spring帮忙管理事务,然后开发者只用关注业务代码,而事务的事情spring自动帮我们控制。

比如注解的方式,只需在方法上面加一个@Transaction注解,那么方法执行之前spring会自动开启一个事务,方法执行完毕之后,会自动提交或者回滚事务,而方法内部没有任何事务相关代码,用起来特别的方法。

声明式事务的2种实现方式
a.配置文件的方式,即在spring xml文件中进行统一配置,开发者基本上就不用关注事务的事情了,代码中无需关心任何和事务相关的代码,一切交给spring处理。
b.注解的方式,只需在需要spring来帮忙管理事务的方法上加上@Transaction注解就可以了,注解的方式相对来说更简洁一些,都需要开发者自己去进行配置,可能有些同学对spring不是太熟悉,所以配置这个有一定的风险,做好代码review就可以了。

声明式事务注解方式5个步骤
1、启用Spring的注释驱动事务管理功能
在spring配置类上加上@EnableTransactionManagement注解

@EnableTransactionManagement
public class MainConfig4 {
}

简要介绍一下原理:当spring容器启动的时候,发现有@EnableTransactionManagement注解,此时会拦截所有bean的创建,扫描看一下bean上是否有@Transaction注解(类、或者父类、或者接口、或者方法中有这个注解都可以),如果有这个注解,spring会通过aop的方式给bean生成代理对象,代理对象中会增加一个拦截器,拦截器会拦截bean中public方法执行,会在方法执行之前启动事务,方法执行完毕之后提交或者回滚事务。稍后会专门有一篇文章带大家看这块的源码。

如果有兴趣的可以自己先去读一下源码,主要是下面这个这方法会

org.springframework.transaction.interceptor.TransactionInterceptor#invoke

再来看看 EnableTransactionManagement 的源码

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
    /**
     * spring是通过aop的方式对bean创建代理对象来实现事务管理的
     * 创建代理对象有2种方式,jdk动态代理和cglib代理
     * proxyTargetClass:为true的时候,就是强制使用cglib来创建代理
     */
    boolean proxyTargetClass() default false;
    /**
     * 用来指定事务拦截器的顺序
     * 我们知道一个方法上可以添加很多拦截器,拦截器是可以指定顺序的
     * 比如你可以自定义一些拦截器,放在事务拦截器之前或者之后执行,就可以通过order来控制
     */
    int order() default Ordered.LOWEST_PRECEDENCE;
}

2、定义事务管理器
事务交给spring管理,那么你肯定要创建一个或者多个事务管理者,有这些管理者来管理具体的事务,比如启动事务、提交事务、回滚事务,这些都是管理者来负责的。

spring中使用PlatformTransactionManager这个接口来表示事务管理者。

PlatformTransactionManager多个实现类,用来应对不同的环境
transaction.png

JpaTransactionManager:如果你用jpa来操作db,那么需要用这个管理器来帮你控制事务。

DataSourceTransactionManager:如果你用是指定数据源的方式,比如操作数据库用的是:JdbcTemplate、mybatis、ibatis,那么需要用这个管理器来帮你控制事务。

HibernateTransactionManager:如果你用hibernate来操作db,那么需要用这个管理器来帮你控制事务。

JtaTransactionManager:如果你用的是java中的jta来操作db,这种通常是分布式事务,此时需要用这种管理器来控制事务。

比如:我们用的是mybatis或者jdbctemplate,那么通过下面方式定义一个事务管理器。

@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
    return new DataSourceTransactionManager(dataSource);
}

3、需使用事务的目标上加@Transaction注解
@Transaction放在接口上,那么接口的实现类中所有public都被spring自动加上事务
@Transaction放在类上,那么当前类以及其下无限级子类中所有pubilc方法将被spring自动加上事务
@Transaction放在public方法上,那么该方法将被spring自动加上事务
注意:@Transaction只对public方法有效
下面我们看一下@Transactional源码:

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Transactional {
    /**
     * 指定事务管理器的bean名称,如果容器中有多事务管理器PlatformTransactionManager,
     * 那么你得告诉spring,当前配置需要使用哪个事务管理器
     */
    @AliasFor("transactionManager")
    String value() default "";
    /**
     * 同value,value和transactionManager选配一个就行,也可以为空,如果为空,默认会从容器中按照类型查找一个事务管理器bean
     */
    @AliasFor("value")
    String transactionManager() default "";
    /**
     * 事务的传播属性
     */
    Propagation propagation() default Propagation.REQUIRED;
    /**
     * 事务的隔离级别,就是制定数据库的隔离级别,数据库隔离级别大家知道么?不知道的可以去补一下
     */
    Isolation isolation() default Isolation.DEFAULT;
    /**
     * 事务执行的超时时间(秒),执行一个方法,比如有问题,那我不可能等你一天吧,可能最多我只能等你10秒
     * 10秒后,还没有执行完毕,就弹出一个超时异常吧
     */
    int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;
    /**
     * 是否是只读事务,比如某个方法中只有查询操作,我们可以指定事务是只读的
     * 设置了这个参数,可能数据库会做一些性能优化,提升查询速度
     */
    boolean readOnly() default false;
    /**
     * 定义零(0)个或更多异常类,这些异常类必须是Throwable的子类,当方法抛出这些异常及其子类异常的时候,spring会让事务回滚
     * 如果不配做,那么默认会在 RuntimeException 或者 Error 情况下,事务才会回滚 
     */
    Class<? extends Throwable>[] rollbackFor() default {};
    /**
     * 和 rollbackFor 作用一样,只是这个地方使用的是类名
     */
    String[] rollbackForClassName() default {};
    /**
     * 定义零(0)个或更多异常类,这些异常类必须是Throwable的子类,当方法抛出这些异常的时候,事务不会回滚
     */
    Class<? extends Throwable>[] noRollbackFor() default {};
    /**
     * 和 noRollbackFor 作用一样,只是这个地方使用的是类名
     */
    String[] noRollbackForClassName() default {};
}

参数介绍
transaction1.png

4、执行db业务操作
在@Transaction标注类或者目标方法上执行业务操作,此时这些方法会自动被spring进行事务管理。

如,下面的insertBatch操作,先删除数据,然后批量插入数据,方法上加上了@Transactional注解,此时这个方法会自动受spring事务控制,要么都成功,要么都失败。

@Component
public class UserService {
    @Autowired
    private JdbcTemplate jdbcTemplate;
    //先清空表中数据,然后批量插入数据,要么都成功要么都失败
    @Transactional
    public void insertBatch(String... names) {
        jdbcTemplate.update("truncate table t_user");
        for (String name : names) {
            jdbcTemplate.update("INSERT INTO t_user(name) VALUES (?)", name);
        }
    }
}

5、启动spring容器,使用bean执行业务操作

@Test
public void test1() {
    AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
    context.register(MainConfig4.class);
    context.refresh();
    UserService userService = context.getBean(UserService.class);
    userService.insertBatch("java高并发系列", "mysql系列", "maven系列", "mybatis系列");
}

二、Spring编程式事务源码深度解析,理解spring事务的本质

1、定义事务属性信息(TransactionDefinition)

事务启动的过程中需要定义事务的一些配置信息,如:事务传播行为、隔离级别、超时时间、是否是只读事务、事务名称,spring中使用TransactionDefinition接口表示事务定义信息,下面看一下TransactionDefinition接口源码,主要有5个信息

事务传播行为
事务隔离级别
事务超时时间
是否是只读事务
事务名称

TransactionDefinition接口的实现类,比较多,我们重点关注用的比较多的2个

DefaultTransactionDefinition:TransactionDefinition接口的默认的一个实现,编程式事务中通常可以使用这个
RuleBasedTransactionAttribute:声明式事务中用到的是这个,这个里面对于事务回滚有一些动态匹配的规则,稍后在声明式事务中去讲。

2、定义事务管理器(PlatformTransactionManager)

事务管理器,这是个非常重要的角色,可以把这货想象为一个人,spring就是靠这个人来管理事务的,负责:获取事务、提交事务、回滚事务,Spring中用PlatformTransactionManager接口表示事务管理器,接口中有三个方法

public interface PlatformTransactionManager {
    //通过事务管理器获取一个事务,返回TransactionStatus:内部包含事务的状态信息
    TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;
    //根据事务的状态信息提交事务
    void commit(TransactionStatus status) throws TransactionException;
    //根据事务的状态信息回滚事务
    void rollback(TransactionStatus status) throws TransactionException;
}

PlatformTransactionManager有多个实现类,用来应对不同的环境,比如你操作db用的是hibernate或者mybatis,那么用到的事务管理器是不一样的,常见的事务管理器实现有下面几个
JpaTransactionManager:如果你用jpa来操作db,那么需要用这个管理器来帮你控制事务。

DataSourceTransactionManager:如果你用是指定数据源的方式,比如操作数据库用的是:JdbcTemplate、mybatis、ibatis,那么需要用这个管理器来帮你控制事务。

HibernateTransactionManager:如果你用hibernate来操作db,那么需要用这个管理器来帮你控制事务。

JtaTransactionManager:如果你用的是java中的jta来操作db,这种通常是分布式事务,此时需要用这种管理器来控制事务。

3、获取事务

下面源码我们以REQUIRED中嵌套一个REQUIRED_NEW来进行说明,也就是事务中嵌套一个新的事务。

3.1、getTransaction:获取事务

通过事务管理器的getTransactiongetTransaction(transactionDefinition)方法开启事务,传递一个TransactionDefinition参数

TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);

事务管理器我们用的是DataSourceTransactionManager,下面我们看一下DataSourceTransactionManager.getTransaction源码

org.springframework.jdbc.datasource.DataSourceTransactionManager
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    //事务定义信息,若传入的definition如果为空,取默认的
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    //@3.1-1:获取事务对象
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();
    //@3.1-2:当前是否存在事务
    if (isExistingTransaction(transaction)) {
        //@1-3:如果当前存在事务,走这里
        return handleExistingTransaction(def, transaction, debugEnabled);
    }
    // 当前没有事务,走下面的代码
    // 若事务传播级别是PROPAGATION_MANDATORY:要求必须存在事务,若当前没有事务,弹出异常
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        //事务传播行为(PROPAGATION_REQUIRED|PROPAGATION_REQUIRES_NEW|PROPAGATION_NESTED)走这里
        //@3.1-4:挂起事务
        SuspendedResourcesHolder suspendedResources = suspend(null);
        try {
            //@3.1-5:是否开启新的事务同步
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            //@3.1-6:创建事务状态对象DefaultTransactionStatus,DefaultTransactionStatus是TransactionStatus的默认实现
            DefaultTransactionStatus status = newTransactionStatus(def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            //@3.1-7:doBegin用于开始事务
            doBegin(transaction, def);
            //@3.1-8:准备事务同步
            prepareSynchronization(status, def);
            //@3.1-9:返回事务状态对象
            return status;
        } catch (RuntimeException | Error ex) {
            //@3.1-10:出现(RuntimeException|Error)恢复被挂起的事务
            resume(null, suspendedResources);
            throw ex;
        }
    } else {
        //@3.1-11:其他事务传播行为的走这里(PROPAGATION_SUPPORTS、PROPAGATION_NOT_SUPPORTED、PROPAGATION_NEVER)
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

3.2、doGetTransaction:获取事务对象

org.springframework.jdbc.datasource.DataSourceTransactionManager
protected Object doGetTransaction() {
    //@3.2-1:创建数据源事务对象
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    //@3.2-2:是否支持内部事务
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    //@3.2-4:ConnectionHolder表示jdbc连接持有者,简单理解:数据的连接被丢到ConnectionHolder中了,ConnectionHolder中提供了一些方法来返回里面的连接,此处调用TransactionSynchronizationManager.getResource方法来获取ConnectionHolder对象
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    //@3.2-5:将conHolder丢到DataSourceTransactionObject中,第二个参数表示是否是一个新的连接,明显不是的吗,新的连接需要通过datasource来获取,通过datasource获取的连接才是新的连接
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

下面来看一下@3.2-4的代码,这个是重点了,这个用到了一个新的类TransactionSynchronizationManager:事务同步管理器,什么叫同步?一个事务过程中,被调用的方法都在一个线程中串行执行,就是同步;这个类中用到了很多ThreadLocal,用来在线程中存储事务相关的一些信息,,来瞅一眼

public abstract class TransactionSynchronizationManager {
    //存储事务资源信息
    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<>("Transactional resources");
    //存储事务过程中的一些回调接口(TransactionSynchronization接口,这个可以在事务的过程中给开发者提供一些回调用的)
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<>("Transaction synchronizations");
    //存储当前正在运行的事务的名称
    private static final ThreadLocal<String> currentTransactionName =
            new NamedThreadLocal<>("Current transaction name");
    //存储当前正在运行的事务是否是只读的
    private static final ThreadLocal<Boolean> currentTransactionReadOnly =
            new NamedThreadLocal<>("Current transaction read-only status");
    //存储当前正在运行的事务的隔离级别
    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
            new NamedThreadLocal<>("Current transaction isolation level");
    //存储当前正在运行的事务是否是活动状态,事务启动的时候会被激活
    private static final ThreadLocal<Boolean> actualTransactionActive =
            new NamedThreadLocal<>("Actual transaction active");
    //还有很多静态方法,主要是用来操作上面这些ThreadLocal的,这里就不列出的,大家可以去看看
}

下面来看TransactionSynchronizationManager.getResource代码

ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
obtainDataSource()会返回事务管理器中的datasource对象

protected DataSource obtainDataSource() {
    DataSource dataSource = getDataSource();
    Assert.state(dataSource != null, "No DataSource set");
    return dataSource;
}

下面看TransactionSynchronizationManager.getResource的源码

public static Object getResource(Object key) {
    //通过TransactionSynchronizationUtils.unwrapResourceIfNecessary(key)获取一个actualKey,我们传入的是datasouce,实际上最后actualKey和传入的datasource是一个对象
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    //调用doGetResource方法获取对应的value
    Object value = doGetResource(actualKey);
    return value;
}

doGetResource(actualKey)方法如下,内部会从resources这个ThreadLocal中获取获取数据ConnectionHolder对象,到目前为止,根本没有看到向resource中放入过数据,获取获取到的conHolder肯定是null了

static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
private static Object doGetResource(Object actualKey) {
    Map<Object, Object> map = resources.get();
    if (map == null) {
        return null;
    }
    Object value = map.get(actualKey);
    return value;
}

TransactionSynchronizationManager.getResource:可以理解为从resource ThreadLocal中查找transactionManager.datasource绑定的ConnectionHolder对象

到此,Object transaction = doGetTransaction()方法执行完毕,下面我们再回到getTransaction方法,第一次进来,上下文中是没有事务的,所以会走下面的@3.1-4的代码,当前没有事务,导致没有事务需要挂起,所以suspend方法内部可以先忽略

org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {
    //事务定义信息,若传入的definition如果为空,取默认的
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    //@3.1-1:获取事务对象
    Object transaction = doGetTransaction();
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        //事务传播行为(PROPAGATION_REQUIRED|PROPAGATION_REQUIRES_NEW|PROPAGATION_NESTED)走这里
        //@3.1-4:挂起事务
        SuspendedResourcesHolder suspendedResources = suspend(null);
        try {
            //@3.1-5:是否开启新的事务同步
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            //@3.1-6:创建事务状态对象DefaultTransactionStatus,DefaultTransactionStatus是TransactionStatus的默认实现
            DefaultTransactionStatus status = newTransactionStatus(
                    def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            //@3.1-7:doBegin用于开始事务
            doBegin(transaction, def);
            //@3.1-8:准备事务同步
            prepareSynchronization(status, def);
            //@3.1-9:返回事务状态对象
            return status;
        } catch (RuntimeException | Error ex) {
            //@3.1-10:出现(RuntimeException|Error)恢复被挂起的事务
            resume(null, suspendedResources);
            throw ex;
        }
    }
}

然后会执行下面代码

//@3.1-5:是否开启新的事务同步,事务同步是干嘛的,是spring在事务过程中给开发者预留的一些扩展点,稍后细说;大家先这么理解,每个新的事务newSynchronization都是true,开一个一个新的事务就会启动一个新的同步
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
//@3.1-6:创建事务状态对象DefaultTransactionStatus,DefaultTransactionStatus是TransactionStatus的默认实现
DefaultTransactionStatus status = newTransactionStatus(def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//@3.1-7:doBegin用于开始事务
doBegin(transaction, def);
//@3.1-8:准备事务同步
prepareSynchronization(status, def);
//@3.1-9:返回事务状态对象
return status;

上面过程我们重点来说一下@3.1-7 和 @3.1-8

3.3、@3.1-7:doBegin 开启事务

org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
protected void doBegin(Object transaction, TransactionDefinition definition) {
    //数据源事务对象
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    //数据库连接
    Connection con = null;
    try {
        //txObject.hasConnectionHolder()用来判断txObject.connectionHolder!=null,现在肯定是null,所以txObject.hasConnectionHolder()返回false
        if (!txObject.hasConnectionHolder() ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            //调用transactionManager.datasource.getConnection()获取一个数据库连接
            Connection newCon = obtainDataSource().getConnection();
            //将数据库连接丢到一个ConnectionHolder中,放到txObject中,注意第2个参数是true,表示第一个参数的ConnectionHolder是新创建的
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }
        //连接中启动事务同步
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        //获取连接
        con = txObject.getConnectionHolder().getConnection();
        //获取隔离级别
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        //设置隔离级别
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        //设置是否是只读
        txObject.setReadOnly(definition.isReadOnly());
        //判断连接是否是自动提交的,如果是自动提交的将其置为手动提交
        if (con.getAutoCommit()) {
            //在txObject中存储一下连接自动提交老的值,用于在事务执行完毕之后,还原一下Connection的autoCommit的值
            txObject.setMustRestoreAutoCommit(true);
            //设置手动提交
            con.setAutoCommit(false);
        }
        //准备事务连接
        prepareTransactionalConnection(con, definition);
        //设置事务活动开启
        txObject.getConnectionHolder().setTransactionActive(true);
        //根据事务定义信息获取事务超时时间
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            //设置连接的超时时间
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }
        //txObject中的ConnectionHolder是否是一个新的,确实是新的,所以这个地方返回true
        if (txObject.isNewConnectionHolder()) {
            //将datasource->ConnectionHolder丢到resource ThreadLocal的map中
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    }
}

重点来看一下下面这段代码

TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
源码

org.springframework.transaction.support.TransactionSynchronizationManager#bindResource
public static void bindResource(Object key, Object value) throws IllegalStateException {
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Map<Object, Object> map = resources.get();
    if (map == null) {
        map = new HashMap<>();
        resources.set(map);
    }
    map.put(actualKey, value);
}

上面这段代码执行完毕之后,datasource->ConnectionHoloder(conn)被放到resources Threadloca的map中了。

3.4、@3.1-8:prepareSynchronization 准备事务同步

//@3.1-8:准备事务同步
prepareSynchronization(status, def);

源码如下,大家看一下,这个方法主要的作用是,开启一个新事务的时候,会将事务的状态、隔离级别、是否是只读事务、事务名称丢到TransactionSynchronizationManager中的各种对应的ThreadLocal中,方便在当前线程中共享这些数据。

org.springframework.transaction.support.AbstractPlatformTransactionManager#prepareSynchronization
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    //如果是一个新的事务,status.isNewSynchronization()将返回true
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                        definition.getIsolationLevel() : null);
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
        TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
        //@3.4-1:初始化事务同步
        TransactionSynchronizationManager.initSynchronization();
    }
}

@3.4-1:初始化事务同步

org.springframework.transaction.support.TransactionSynchronizationManager
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<>("Transaction synchronizations");
//获取同步是否启动,新事务第一次进来synchronizations.get()是null,所以这个方法返回的是false
public static boolean isSynchronizationActive() {
    return (synchronizations.get() != null);
}
//初始化事务同步,主要就是在synchronizations ThreadLocal中放一个LinkedHashSet
public static void initSynchronization() throws IllegalStateException {
    if (isSynchronizationActive()) {
        throw new IllegalStateException("Cannot activate transaction synchronization - already active");
    }
    synchronizations.set(new LinkedHashSet<>());
}

3.5、小结

获取事务的过程已经结束了,我们来看一下这个过程中做的一些关键的事情

1、获取db连接:从事务管理器的datasource中调用getConnection获取一个新的数据库连接,将连接置为手动提交
2、将datasource关联连接丢到ThreadLocal中:将第一步中获取到的连丢到ConnectionHolder中,然后将事务管理器的datasource->ConnectionHolder丢到了resource ThreadLocal中,这样我们可以通过datasource在ThreadLocal中获取到关联的数据库连接
3、准备事务同步:将事务的一些信息放到ThreadLocal中

4、事务方法中执行增删改查

以下面这个插入操作来看一下这个插入是如何参与到spring事务中的。

jdbcTemplate.update("insert into t_user (name) values (?)", "test1-1");

最终会进入到jdbctemplate#execute方法里,无用代码我们给剔除,重点内部关注下面获取连接的方法

org.springframework.jdbc.core.JdbcTemplate#execute(org.springframework.jdbc.core.PreparedStatementCreator, org.springframework.jdbc.core.PreparedStatementCallback<T>){
    //获取数据库连接
    Connection con = DataSourceUtils.getConnection(obtainDataSource());
    //通过conn执行db操作
}

obtainDataSource()会返回jdbctemplate.datasource对象,下面重点来看DataSourceUtils.getConnection源码,最终会进入下面这个方法

org.springframework.jdbc.datasource.DataSourceUtils#doGetConnection
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    //用jdbctemplate.datSource从TransactionSynchronizationManager的resouce ThreadLocal中获取对应的ConnectionHolder对象,在前面获取事务环节中,transactionManager.datasource->ConnectionHolder被丢到resouce ThreadLocal,而jdbctemplate.datSource和transactionManager.datasource是同一个对象,所以是可以获取到ConnectionHolder的,此时就会使用事务开启是的数据库连接
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
    //conHolder不为空 && conHolder中有数据库连接对象
    if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
        //返回conHolder中的数据库连接对象
        return conHolder.getConnection();
    }
    //如果上面获取不到连接,会走这里,这里将会调用jdbctemplate.datasource.getConnection()从数据源中获取一个新的db连接
    Connection con = fetchConnection(dataSource);
    //将连接返回
    return con;
}

可以得出一个结论:如果要让最终执行的sql受spring事务控制,那么事务管理器中datasource对象必须和jdbctemplate.datasource是同一个,这个结论在其他文章中说过很多次了,这里大家算是搞明白了吧。

5、提交事务

调用事务管理器的commit方法,提交事务

platformTransactionManager.commit(transactionStatus);

commit源码

org.springframework.transaction.support.AbstractPlatformTransactionManager#commit
public final void commit(TransactionStatus status) throws TransactionException {
    //事务是否已经完成,此时还未完成,如果事务完成了,再来调用commit方法会报错
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }
    //事务状态
    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    //defStatus.rollbackOnly是否是true,如果是true,说明事务状态被标注了需要回滚,此时走回滚逻辑
    if (defStatus.isLocalRollbackOnly()) {
        //走回滚逻辑
        processRollback(defStatus, false);
        return;
    }
    //提交事务过程
    processCommit(defStatus);
}

processCommit源码

org.springframework.transaction.support.AbstractPlatformTransactionManager#processCommit
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        try {
            //提交之前的回调(给开发提供的扩展点)
            triggerBeforeCommit(status);
            //事务完成之前的回调(给开发提供的扩展点)
            triggerBeforeCompletion(status);
            //是否是新事务,如果是新事务,将执行提交操作,比如传播行为是REQUIRED中嵌套了一个REQUIRED,那么内部的事务就不是新的事务,外部的事务是新事务
            if (status.isNewTransaction()) {
                //@5-1:执行提交操作
                doCommit(status);
            }
        } catch (UnexpectedRollbackException ex) {
            //事务完成之后执行的回调(给开发提供的扩展点)
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        } catch (RuntimeException | Error ex) {
            //提交过程中有异常,执行回滚操作
            doRollbackOnCommitException(status, ex);
            throw ex;
        }
        try {
            //事务commit之后,执行一些回调(给开发提供的扩展点)
            triggerAfterCommit(status);
        } finally {
            //事务完成之后,执行一些回调(给开发提供的扩展点)
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }
    } finally {
        //事务执行完毕之后,执行一些清理操作
        cleanupAfterCompletion(status);
    }
}

上面这个方法看起来挺长的,重点会做3件事情:

1、给开发提供的扩展点:以trigger开头的方法,是留给开发的扩展点,可以在事务执行的过程中执行一些回调,主要是在事务提交之前,提交之后,回滚之前,回滚之后,可以执行一些回调,也就是事务同步要干的事情,这个扩展点稍后说。

2、通过connection执行commit操作,对应上面的 @5-1 代码:doCommit(status);

3、完成之后执行清理操作:finally中执行 cleanupAfterCompletion(status);

来看看doCommit(status)方法,内部主要就是调用connection的commit()提交事务,如下:

org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit
protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    //从ConnectionHolder中获取Connection
    Connection con = txObject.getConnectionHolder().getConnection();
    //执行commit,提交数据库事务
    con.commit();
}

cleanupAfterCompletion(status):清理操作

org.springframework.transaction.support.AbstractPlatformTransactionManager#cleanupAfterCompletion
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    //将事务状态置为已完成
    status.setCompleted();
    //是否是新的事务同步
    if (status.isNewSynchronization()) {
        //将TransactionSynchronizationManager中的那些ThreadLoca中的数据都清除,会调用ThreadLocal的remove()方法清除数据
        TransactionSynchronizationManager.clear();
    }
    //是否是新事务
    if (status.isNewTransaction()) {
        //执行清理操作
        doCleanupAfterCompletion(status.getTransaction());
    }
    //是否有被挂起的事务
    if (status.getSuspendedResources() != null) {
        Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
        //恢复被挂起的事务
        resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    }
}

doCleanupAfterCompletion源码

org.springframework.jdbc.datasource.DataSourceTransactionManager#doCleanupAfterCompletion
protected void doCleanupAfterCompletion(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    //是否是一个新的ConnectionHolder,如果是新的事务,那么ConnectionHolder是新的
    if (txObject.isNewConnectionHolder()) {
        //将transactionManager.datasource->ConnectionHolder从resource Threadlocal中干掉
        TransactionSynchronizationManager.unbindResource(obtainDataSource());
    }
    //下面重置Connection,将Connection恢复到最原始的状态
    Connection con = txObject.getConnectionHolder().getConnection();
    try {
        if (txObject.isMustRestoreAutoCommit()) {
            //自动提交
            con.setAutoCommit(true);
        }
        //恢复connction的隔离级别、是否是只读事务
        DataSourceUtils.resetConnectionAfterTransaction(
                con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
    } catch (Throwable ex) {
        logger.debug("Could not reset JDBC Connection after transaction", ex);
    }
    //是否是新的连接
    if (txObject.isNewConnectionHolder()) {
        //释放连接,内部会调用conn.close()方法
        DataSourceUtils.releaseConnection(con, this.dataSource);
    }
    //还原ConnectionHoloder到最初的状态
    txObject.getConnectionHolder().clear();
}

终结一下,清理工作主要做的事情就是释放当前线程占有的一切资源,然后将被挂起的事务恢复。

6、回滚事务

回滚的操作和提交的操作差不多的,源码我就不讲了,大家自己去看一下。

7、存在事务的情况如何走

下面来看另外一个流程,REQUIRED中嵌套一个REQUIRED_NEW,然后走到REQUIRED_NEW的时候,代码是如何运行的?大致的过程如下

1、判断上线文中是否有事务

2、挂起当前事务

3、开启新事务,并执行新事务

4、恢复被挂起的事务

7.1、判断是否有事务:isExistingTransaction
判断上线文中是否有事务,比较简单,如下:

org.springframework.jdbc.datasource.DataSourceTransactionManager#isExistingTransaction
protected boolean isExistingTransaction(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    //txObject.connectionHolder!=null && connectionHolder事务处于开启状态(上面我们介绍过在doBegin开启事务的时候connectionHolder.transactionActive会被置为true)
    return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}

7.2、若当前存在事务
我们再来看一下获取事务中,有事务如何走

org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    //获取事务
    Object transaction = doGetTransaction();
    //是否存在事务
    if (isExistingTransaction(transaction)) {
        //存在事务会走这里
        return handleExistingTransaction(def, transaction, debugEnabled);
    }
}

当前存在事务,然后会进入handleExistingTransaction方法

org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
    //当前有事务,被嵌套的事务传播行为是PROPAGATION_NEVER,抛出异常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        //当前有事务,被嵌套的事务传播行为是PROPAGATION_NOT_SUPPORTED,那么将先调用suspend将当前事务挂起,然后以无事务的方式运行被嵌套的事务
        //挂起当前事务
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        //以无事务的方式运行
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        //被嵌套的事务传播行为是PROPAGATION_REQUIRES_NEW,那么会先挂起当前事务,然后会重新开启一个新的事务
        //挂起当前事务
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            //下面的过程我们就不在再介绍了,之前有介绍过
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        } catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }
    //其他的传播行为走下面。。。,暂时省略了
}

下面重点看事务挂起和事务的恢复操作。

7.3、事务挂起:suspend
事务挂起调用事务管理器的suspend方法,源码如下,主要做的事情:将当前事务中的一切信息保存到SuspendedResourcesHolder对象中,相当于事务的快照,后面恢复的时候用;然后将事务现场清理干净,主要是将一堆存储在ThreadLocal中的事务数据干掉。

org.springframework.transaction.support.AbstractPlatformTransactionManager#suspend
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    //当前事务同步是否被激活,如果是新事务,这个返回的是true
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        //挂起事务同步,这个地方会可以通过TransactionSynchronization接口给开发者提供了扩展点,稍后我们会单独介绍TransactionSynchronization接口,这个接口专门用来在事务执行过程中做回调的
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                //@1:获取挂起的资源
                suspendedResources = doSuspend(transaction);
            }
            //下面就是获取当前事务的各种信息(name,readyOnly,事务隔离级别,是否被激活)
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        }
    }
}

下面来看看@1:doSuspend(transaction)源码,主要就是将datasource->connectionHolder从resource ThreadLocal中解绑,然后将connectionHolder返回,下面这个方法实际上返回的就是connectionHolder对象

org.springframework.jdbc.datasource.DataSourceTransactionManager#doSuspend
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    //将connectionHolder置为null
    txObject.setConnectionHolder(null);
    //将datasource->connectionHolder从resource ThreadLocal中解绑,并返回被解绑的connectionHolder对象
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

此时,当前的事务被挂起了,然后开启一个新的事务,新的事务的过程上面已经介绍过了,下面我们来看事务的恢复过程。

7.4、事务恢复:resume
事务挂起调用事务管理器的resume方法,源码如下,主要做的事情:通过SuspendedResourcesHolder对象中,将被挂起的事务恢复,SuspendedResourcesHolder对象中保存了被挂起的事务所有信息,所以可以通过这个对象来恢复事务。

org.springframework.transaction.support.AbstractPlatformTransactionManager#resume
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
        throws TransactionException {
    if (resourcesHolder != null) {
        Object suspendedResources = resourcesHolder.suspendedResources;
        if (suspendedResources != null) {
            //恢复被挂起的资源,也就是将datasource->connectionHolder绑定到resource ThreadLocal中
            doResume(transaction, suspendedResources);
        }
        List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
        //下面就是将数据恢复到各种ThreadLocal中
        if (suspendedSynchronizations != null) {
            TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
            TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
            //恢复事务同步(将事务扩展点恢复)
            doResumeSynchronization(suspendedSynchronizations);
        }
    }
}

8、事务执行过程中的回调接口: TransactionSynchronization

8.1、作用
spring事务运行的过程中,给开发者预留了一些扩展点,在事务执行的不同阶段,将回调扩展点中的一些方法。

比如我们想在事务提交之前、提交之后、回滚之前、回滚之后做一些事务,那么可以通过扩展点来实现。

8.2、扩展点的用法
1、定义事务TransactionSynchronization对象
TransactionSynchronization接口中的方法在spring事务执行的过程中会自动被回调

public interface TransactionSynchronization extends Flushable {
    //提交状态
    int STATUS_COMMITTED = 0;
    //回滚状态
    int STATUS_ROLLED_BACK = 1;
    //状态未知,比如事务提交或者回滚的过程中发生了异常,那么事务的状态是未知的
    int STATUS_UNKNOWN = 2;
    //事务被挂起的时候会调用被挂起事务中所有TransactionSynchronization的resume方法
    default void suspend() {
    }
    //事务恢复的过程中会调用被恢复的事务中所有TransactionSynchronization的resume方法
    default void resume() {
    }
    //清理操作
    @Override
    default void flush() {
    }
    //事务提交之前调用
    default void beforeCommit(boolean readOnly) {
    }
    //事务提交或者回滚之前调用
    default void beforeCompletion() {
    }
    //事务commit之后调用
    default void afterCommit() {
    }
    //事务完成之后调用
    default void afterCompletion(int status) {
    }
}

2、将TransactionSynchronization注册到当前事务中
通过下面静态方法将事务扩展点TransactionSynchronization注册到当前事务中

TransactionSynchronizationManager.registerSynchronization(transactionSynchronization)
看一下源码,很简单,丢到ThreadLocal中了

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<>("Transaction synchronizations");
public static void registerSynchronization(TransactionSynchronization synchronization)
            throws IllegalStateException {
    Set<TransactionSynchronization> synchs = synchronizations.get();
    if (synchs == null) {
        throw new IllegalStateException("Transaction synchronization is not active");
    }
    synchs.add(synchronization);
}

当有多个TransactionSynchronization的时候,可以指定其顺序,可以实现org.springframework.core.Ordered接口,来指定顺序,从小大的排序被调用,TransactionSynchronization有个默认适配器TransactionSynchronizationAdapter,这个类实现了Ordered接口,所以,如果我们要使用的时候,直接使用TransactionSynchronizationAdapter这个类。

3、回调扩展点TransactionSynchronization中的方法
TransactionSynchronization中的方法是spring事务管理器自动调用的,本文上面有提交到,事务管理器在事务提交或者事务回滚的过程中,有很多地方会调用trigger开头的方法,这个trigger方法内部就会遍历当前事务中的transactionSynchronization列表,然后调用transactionSynchronization内部的一些指定的方法。

以事务提交的源码为例,来看一下

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    triggerBeforeCommit(status);
    triggerBeforeCompletion(status);
    //....其他代码省略
}

triggerBeforeCommit(status)源码

protected final void triggerBeforeCommit(DefaultTransactionStatus status) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());
    }
}

TransactionSynchronizationUtils.triggerBeforeCommit 源码

public static void triggerBeforeCommit(boolean readOnly) {
    for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
        synchronization.beforeCommit(readOnly);
    }
}
Last modification:November 29th, 2023 at 06:42 pm
If you think my article is useful to you, please feel free to appreciate

Leave a Comment