Seata AT模式TM处理流程图文示例详解

目录
  • TM的作用
  • 源码分解
  • 小结

TM的作用

我们根据源码解读画出了下图,该图示展现了TM在整个Seata AT模式的分布式事务中所起的作用:

从上图中可以看出,TM主要有两个作用:

开启分布式事务,以拿到XID作为分布式事务开启的标识;一定是从TC拿到XID,不是从调用方传递过来的XID;

根据所有RM的处理结果来决定是提交分布式事务还是回滚分布式事务;

转换成伪代码如下:

try{
  // 开启分布式事务
  String xid = TM.beginGlobalTransaction();
  // 执行业务逻辑,包含远程rpc调用
  RM1.execute(xid); -------RPC调用--------> RM2.execute(xid);
  // 提交分布式事务
  TM.commitGlobalTransaction(xid);
}catch(Exception e){
  // 回滚分布式事务
  TM.rollbackGlobalTransaction(xid);
}finally{
  // 恢复现场
}

源码分解

在之前讲述图解Seata AT模式启动流程中,我们已经知道了TM的处理流程是通过扫描注解@GlobalTransactional来完成对业务逻辑的拦截的。

主要完成这个拦截功能的类是io.seata.spring.annotation.GlobalTransactionalInterceptor,在这个类中,我们主要看invoke方法:

@Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        // 拿到被拦截的目标类
        Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        // 获取目标方法
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        // 判断这个方法是不是Object类中的toString()、equals()等方法
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            // 通过被拦截的方法找出对应的注解GlobalTransactional和GlobalLock
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            // 判断是否开启分布式事务,或者TM是否被降级处理,默认是没有被降级的
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            // 分布式事务可以正常使用
            if (!localDisable) {
                // 如果注解GlobalTransactional存在,那么直接把里面的配置解析成AspectTransactional
                if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                    AspectTransactional transactional;
                    if (globalTransactionalAnnotation != null) {
                        transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                            globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                            globalTransactionalAnnotation.rollbackForClassName(),
                            globalTransactionalAnnotation.noRollbackFor(),
                            globalTransactionalAnnotation.noRollbackForClassName(),
                            globalTransactionalAnnotation.propagation(),
                            globalTransactionalAnnotation.lockRetryInterval(),
                            globalTransactionalAnnotation.lockRetryTimes());
                    } else {
                        transactional = this.aspectTransactional;
                    }
                    // 调用handleGlobalTransaction处理
                    return handleGlobalTransaction(methodInvocation, transactional);
                } else if (globalLockAnnotation != null) {
                    // 调用handleGlobalLock处理
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                }
            }
        }
        // 如果是Object类中的方法的话,直接调用,不作拦截
        return methodInvocation.proceed();
    }

以上代码就做了下面几件事情:

判断拦截的方法是否是一个合理的方法,像Object类中的toString()、equals()等方法是不应该被拦截的;

拦截的方法合理的话,那么要确认是否允许开启分布式事务;

  • 如果配置了service.disableGlobalTransaction=true,那么说明不能开启分布式事务;
  • 另一个就是配置了允许TM降级client.tm.degradeCheck=true(默认是false),那么就会开启定时任务不断地与TC通信,如果建立通信失败的次数超过了阈值client.tm.degradeCheckAllowTimes,那么就会触发TM降级,此时无法开启新的分布式事务,降级前开启的分布式事务没有影响;

可以正常地准备分布式事务了,那么开始收集注解的相关信息;

  • 如果是GlobalTransactional注解,交给handleGlobalTransaction()处理;
  • 如果是GlobalLock注解,交给handleGlobalLock()处理;

需要注意的是,我们从源码当中了解到,原来TM还可以做一个降级的配置。降级后的TM是不会开启新的分布式事务的,这个时候只能保证本地事务的正常进行,只有当TM与TC通信恢复后,降级后的TM会立马恢复,可以重新开启新的分布式事务。

在TM降级期间的需要业务侧自行处理因降级导致的数据脏写和脏读问题。

handleGlobalTransaction

处理被@GlobalTransactional标注的业务逻辑

Object handleGlobalTransaction(final MethodInvocation methodInvocation,
        final AspectTransactional aspectTransactional) throws Throwable {
        // 默认succeed=true
        boolean succeed = true;
        try {
            // 执行分布式事务处理逻辑
            // 详细内容后面介绍
            return transactionalTemplate.execute(new TransactionalExecutor() {
                // 执行业务逻辑
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }
                // 分布式事务名称,没有指定的话,就用【方法名+参数类型】命名
                public String name() {
                    String name = aspectTransactional.getName();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }
                // 分布式事务信息,其实就是@GlobalTransactional注解里面拿到的配置
                @Override
                public TransactionInfo getTransactionInfo() {
                    // reset the value of timeout
                    int timeout = aspectTransactional.getTimeoutMills();
                    if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                        timeout = defaultGlobalTransactionTimeout;
                    }
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(timeout);
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(aspectTransactional.getPropagation());
                    transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());
                    transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : aspectTransactional.getRollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            // 发生异常
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                // 已经回滚过了
                case RollbackDone:
                    throw e.getOriginalException();
                // 开启分布式事务失败
                case BeginFailure:
                    // 分布式事务失败
                    succeed = false;
                    // 调用失败处理逻辑
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                // 分布式事务提交失败
                case CommitFailure:
                    // 分布式事务失败
                    succeed = false;
                    // 调用失败处理逻辑
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                // 回滚失败
                case RollbackFailure:
                    // 调用失败处理逻辑
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                // 回滚重试
                case RollbackRetrying:
                    // 调用失败处理器中的回滚重试回调逻辑
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                // 啥也不是,直接抛异常
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
            }
        } finally {
            // 如果允许TM降级,那么这次处理完毕后,说明与TC恢复通信,可以解除降级
            if (degradeCheck) {
                EVENT_BUS.post(new DegradeCheckEvent(succeed));
            }
        }
    }

其实上面就一行代码,使用的是模版模式,所以其实真正的重点还是应该进到模版里面去看看具体是怎么处理的。

public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. 拿到整理好的@GlobalTransactional注解里面的配置信息
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 获取当前的分布式事务,如果为null的话,说明这是分布式事务的发起者;如果不为null,说明这是分布式事务的参与者
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();
        // 1.2 获取分布式事务的传播级别,其实就是按照spring的传播级别来一套,区别就是spring事务是本地事务,这是分布式事务,原理都一样
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            // 这个switch里面全都是处理分布式事务传播级别的
            switch (propagation) {
                // 如果不支持分布式事务,如果当前存在事务,那么先挂起当前的分布式事务,再执行业务逻辑
                case NOT_SUPPORTED:
                    // 分布式事务存在,先挂起
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    // 执行业务逻辑
                    return business.execute();
                // 如果是每次都要创建一个新的分布式事务,先把当前存在的分布式事务挂起,然后创建一个新分布式事务
                case REQUIRES_NEW:
                    // 如果分布式事务存在,先挂起当前分布式事务,再创建一个新的分布式事务
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    // 之所以用break,是为了后面的代码和其他的传播级别一起共用,业务逻辑肯定还是要执行的
                    break;
                // 如果支持分布式事务,如果当前不存在分布式事务,那么直接执行业务逻辑,否则以分布式事务的方式执行业务逻辑
                case SUPPORTS:
                    // 如果不存在分布式事务,直接执行业务逻辑
                    if (notExistingTransaction(tx)) {
                        return business.execute();
                    }
                    // 否则,以分布式事务的方式执行业务逻辑
                    break;
                // 如果有分布式事务,就在当前分布式事务下执行业务逻辑,否则创建一个新的分布式事务执行业务逻辑
                case REQUIRED:
                    // If current transaction is existing, execute with current transaction,
                    // else continue and execute with new transaction.
                    break;
                // 如果不允许有分布式事务,那么一旦发现存在分布式事务,直接抛异常;只有不存在分布式事务的时候才正常执行
                case NEVER:
                    // 存在分布式事务,抛异常
                    if (existingTransaction(tx)) {
                        throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                    , tx.getXid()));
                    } else {
                        // 不存在分布式事务,执行业务逻辑
                        return business.execute();
                    }
                // 一定要有分布式事务,分布式事务不存在的话,抛异常;
                case MANDATORY:
                    // 不存在分布式事务,抛异常
                    if (notExistingTransaction(tx)) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    // Continue and execute with current transaction.
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }
            // 上面的传播级别的逻辑处理完毕,下面就是公共的处理逻辑
            // 1.3 如果当前分布式事务没有的话,那么我们就要创建新的分布式事务,此时我们就是分布式事务的发起者,也就是TM本身,否则不能称之为`TM`
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }
            // 开始准备干活的条件
            // 把我们这个方法的全局锁配置放进当前线程中,并且把线程中已有的全局锁的配置取出来
            // 我们在干完自己的活后,会把这个取出来的配置放回去的
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
            try {
                // 2. 如果我们是分布式事务的发起者的话,那么我们会和TC通信,并且拿到一个XID;如果我们不是分布式事务的发起者的话,那么这一步啥也不干
                // 这个XID可以从RootContext中获取
                beginTransaction(txInfo, tx);
                Object rs;
                try {
                    // 执行业务逻辑
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. 发生任何异常,我们准备启动回滚机制
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }
                // 4. 一切顺利,通知提交分布式事务
                commitTransaction(tx);
                return rs;
            } finally {
                //5. 恢复现场,把之前的配置放回去
                resumeGlobalLockConfig(previousConfig);
                // 触发回调
                triggerAfterCompletion();
                // 清理工作
                cleanUp();
            }
        } finally {
            // 恢复之前挂起的事务
            if (suspendedResourcesHolder != null) {
                tx.resume(suspendedResourcesHolder);
            }
        }
    }

根据上面的源码分析,execute方法做了以下几件事情:

处理分布式事务的传播级别,参照spring的事务传播级别;

如果是分布式事务的发起者,那么需要与TC通信,并获取XID开启分布式事务;

如果业务逻辑处理出现异常,说明分布式事务需要准备回滚;如果没有任何异常,那么准备发起分布式事务提交

分布式事务处理完毕后,准备恢复现场

分布式事务开启:

 private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            // 回调,默认是空回调
            triggerBeforeBegin();
            // 发起分布式事务
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            // 回调,默认是空回调
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);
        }
    }
@Override
    public void begin(int timeout, String name) throws TransactionException {
        // 如果不是分布式事务发起者,那么啥也不做
        if (role != GlobalTransactionRole.Launcher) {
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNull();
        // 如果当前已经处于分布式事务当中,那么抛异常,因为事务发起者不可能事先处于别的分布式事务当中
        String currentXid = RootContext.getXID();
        if (currentXid != null) {
            throw new IllegalStateException("Global transaction already exists," +
                " can't begin a new global transaction, currentXid = " + currentXid);
        }
        // 发起分布式事务
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        // 把xid绑定到当前线程中
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
    }
@Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        // 发起分布式事务开启的请求
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        // 获取拿到的xid,表示分布式事务开启成功
        return response.getXid();
    }

1.分布式事务的发起其实就是TM向TC请求,获取XID,并把XID绑定到当前线程中

异常回滚:

private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
        // 如果异常类型和指定的类型一致,那么发起回滚;不一致还是要提交分布式事务
        if (txInfo != null && txInfo.rollbackOn(originalException)) {
            try {
                // 回滚分布式事务
                rollbackTransaction(tx, originalException);
            } catch (TransactionException txe) {
                // 回滚失败抛异常
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                        TransactionalExecutor.Code.RollbackFailure, originalException);
            }
        } else {
            // 不是指定的异常类型,还是继续提交分布式事务
            commitTransaction(tx);
        }
    }
private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
        // 执行回调,默认空回调
        triggerBeforeRollback();
        // 回滚
        tx.rollback();
        // 执行回调,默认空回调
        triggerAfterRollback();
        // 就算回滚没问题,照样抛异常,目的应该是告知开发人员此处产生了回滚
        throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
            ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
    }
@Override
    public void rollback() throws TransactionException {
        // 如果是分布式事务参与者,那么啥也不做,RM的回滚不在这里,这是TM的回滚
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of rollback
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        // 下面就是一个循环重试发起分布式事务回滚
        int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    retry--;
                    // 发起回滚的核心代码
                    status = transactionManager.rollback(xid);
                    // 回滚成功跳出循环
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    // 重试失败次数完成才会跳出循环
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global rollback", ex);
                    }
                }
            }
        } finally {
            // 如果回滚的分布式事务就是当前的分布式事务,那么从当前线程中解绑XID
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] rollback status: {}", xid, status);
        }
    }
@Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        // 准备发起请求给TC,回滚指定的分布式事务
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setXid(xid);
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        return response.getGlobalStatus();
    }

分布式事务回滚逻辑中有以下几个点:

触发回滚需要产生的异常和注解中指定的异常一致才会发起回滚,否则还是继续提交;

回滚是可以设置重试次数的,只有重试都失败了,才会导致回滚失败,否则只要有一次成功,那么回滚就成功;

TM发起的回滚其实只是和TC发起一次分布式事务回滚的通信,并没有数据库的操作;

分布式事务提交:

private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            // 回调,默认空回调
            triggerBeforeCommit();
            // 分布式事务提交
            tx.commit();
            // 回调,默认空回调
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 提交出异常,提交失败
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
        }
    }
@Override
    public void commit() throws TransactionException {
        // 如果只是分布式事务参与者,那么啥也不干,TM只能有一个,哈哈
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        // 分布式事务提交也是有重试的
        int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    retry--;
                    // 发起分布式事务提交
                    status = transactionManager.commit(xid);
                    // 提交成功跳出循环
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    // 重试结束,依然失败就抛异常
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex);
                    }
                }
            }
        } finally {
            // 如果提交的分布式事务就是当前事务,那么需要清理当前线程中的XID
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] commit status: {}", xid, status);
        }
    }
@Override
    public GlobalStatus commit(String xid) throws TransactionException {
        // 发起分布式事务提交请求,这是与TC通信
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

分布式事务回滚也是可以设置重试次数的;

分布式事务提交其实也是TM与TC进行通信,告知TC这个XID对应的分布式事务可以提交了;

handleGlobalLock

private Object handleGlobalLock(final MethodInvocation methodInvocation, final GlobalLock globalLockAnno) throws Throwable {
        // 模版模式实现全局锁
        return globalLockTemplate.execute(new GlobalLockExecutor() {
            // 执行业务逻辑
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }
            // 获取全局锁配置
            // 一个是全局锁重试间隔时间
            // 一个是全局锁重试次数
            @Override
            public GlobalLockConfig getGlobalLockConfig() {
                GlobalLockConfig config = new GlobalLockConfig();
                config.setLockRetryInterval(globalLockAnno.lockRetryInterval());
                config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
                return config;
            }
        });
    }
public Object execute(GlobalLockExecutor executor) throws Throwable {
        // 判断当前是否有全局锁
        boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
        // 如果没有全局锁,那么在当前线程中设置需要全局锁标识
        if (!alreadyInGlobalLock) {
            RootContext.bindGlobalLockFlag();
        }
        // 把全局锁的配置设置进当前线程,并把线程中已有的全局锁配置拿出来,后面恢复现场需要用
        GlobalLockConfig myConfig = executor.getGlobalLockConfig();
        GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
        try {
            // 执行业务逻辑
            return executor.execute();
        } finally {
            // 清除线程中的全局锁标记
            if (!alreadyInGlobalLock) {
                RootContext.unbindGlobalLockFlag();
            }
            // 恢复现场
            if (previousConfig != null) {
                GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
            } else {
                GlobalLockConfigHolder.remove();
            }
        }
    }

其实真正的全局锁逻辑并不在TM当中,TM只是负责根据@GlobalLock注解把相应的全局锁标记绑定到线程中,真正负责处理全局锁的还是底层的RM;

小结

至此我们已经把TM的所有工作都解读完毕了,下面来做一个小结:

1.TM主要针对两个注解GlobalTransactional和GlobalLock来实现处理逻辑,原理都是基于Aop和反射;处理逻辑里面涉及到TM降级的一个情况,这是一个值得注意的点

2.处理GlobalTransactional主要分两步:

  • 开启分布式事务,需要与TC交互,存在rpc开销;
  • 根据RM的处理情况决定是提交分布式事务还是回滚分布式事务,也是需要与TC交互,存在rpc开销;在提交或回滚分布式事务中,还可以设置重试次数;

3.处理GlobalLock,主要就是在当前线程中设置一个需要检查全局锁的标记,让底层的RM去做全局锁的检测动作;

以上就是Seata AT模式TM处理流程图文示例详解的详细内容,更多关于Seata AT模式TM处理流程的资料请关注我们其它相关文章!

(0)

相关推荐

  • Spring Cloud + Nacos + Seata整合过程(分布式事务解决方案)

    目录 一.简介 二.seata-server部署 1.官网下载 2.解压到本地 3.修改配置文件 4.seata数据库初始化 5.业务数据库 6.启动seata-server 三.微服务项目集成Seata 1.引入依赖 2.配置文件 一.简介    Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务.   2019 年 1 月,阿里巴巴中间件团队发起了开源项目 Fescar(Fast & EaSy Commit And Rollback),和社区

  • springcloud整合seata的实现代码

    目录 一.背景 二.项目结构 三.实现功能: 四.项目使用到的技术 五.整合步骤 1.引入spring-cloud-starter-alibaba-seata jar包 2.涉及到的业务库操作 1.业务库需要存在 undo_log 表 2.业务表主键 3.页面中自动更新时间戳 3.开启数据源代理 1.自动配置数据源代理 2.手动配置AT模式数据源代理 4.传递 xid 5.事务分组和seata server对应上 6.注册中心和配置中心 7.业务方法加上@GlobalTransactional

  • springboot整合shardingsphere和seata实现分布式事务的实践

    各个框架版本信息 springboot: 2.1.3 springcloud: Greenwich.RELEASE seata: 1.0.0 shardingsphere:4.0.1 maven 依赖        <dependency>         <!--<groupId>io.shardingsphere</groupId>-->         <groupId>org.apache.shardingsphere</group

  • seata springcloud整合教程与遇到的坑

    SEATA概要 seata 是alibaba 出的一款分布式事务管理器,他有侵入性小,实现简单等特点.我们能够使用seata 实现分布式事务管理, 是微服务必备的组件.他可以实现在微服务之间的事务管理,也可以实现多个数据源的事务管理. seata 在阿里内部,和众多的公司都有应用,因此我们可以放心的使用它. 依赖 <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-clou

  • springboot 整合 seata的配置过程

    前言: 小编引入的图片和文字描述都是来自于尚硅谷的视频讲解,在此感谢尚硅谷的老师,同时也结合 seata文档官方文档进行整合 项目地址(gitee): https://gitee.com/qinenqi/online springboot整合 seata 整合配置 online-project 这个服务调用 online-coupon这个服务 在 这两个被整合的服务对用的数据库中分别 创建 UNDO_LOG 表 -- 注意此处0.3.0+ 增加唯一索引 ux_undo_log CREATE TA

  • Java分布式事务管理框架之Seata

    目录 Seata介绍 三大组件 实现原理 四种事务模式 搭建seata服务端 单机版安装 集群安装 Seata介绍 Seata:Simple Extensible Autonomous Transaction Architecture,简易可扩展的自治式分布式事务管理框架,其前身是fescar.是一种简单分布式事务的解决方案.Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务.Seata 将为用户提供了 AT.TCC.SAGA 和 XA 事务模式,为用户打造一

  • Seata AT模式TM处理流程图文示例详解

    目录 TM的作用 源码分解 小结 TM的作用 我们根据源码解读画出了下图,该图示展现了TM在整个Seata AT模式的分布式事务中所起的作用: 从上图中可以看出,TM主要有两个作用: 开启分布式事务,以拿到XID作为分布式事务开启的标识:一定是从TC拿到XID,不是从调用方传递过来的XID: 根据所有RM的处理结果来决定是提交分布式事务还是回滚分布式事务: 转换成伪代码如下: try{ // 开启分布式事务 String xid = TM.beginGlobalTransaction(); //

  • Seata AT模式启动过程图文示例详解

    目录 背景 SeataDataSourceAutoConfiguration SeatAutoConfiguration HttpAutoConfiguration 小结 背景 为了了解Seata AT模式的原理,我通过源码解读的方式画出了Seata AT模式启动的图示: 如果是基于Springboot项目的话,项目启动的使用,一般约定会先查看spring.factories文件,配置了哪些类是需要自动装配的.Seata也是利用了这个约定,在项目启动的时候,默认会装配指定的类,以完成Seata相

  • java设计模式策略模式图文示例详解

    目录 策略模式 意图 问题 解决方案 真实世界类比 策略模式结构 伪代码 策略模式适合应用场景 实现方式 策略模式优缺点 策略模式优缺点 与其他模式的关系 策略模式 亦称:Strategy 意图 策略模式是一种行为设计模式,它能让你定义一系列算法,并将每种算法分别放入独立的类中,以使算法的对象能够相互替换. 问题 一天,你打算为游客们创建一款导游程序.该程序的核心功能是提供美观的地图,以帮助用户在任何城市中快速定位. 用户期待的程序新功能是自动路线规划:他们希望输入地址后就能在地图上看到前往目的

  • React 模式之纯组件使用示例详解

    目录 什么是纯组件 纯组件解决了什么问题 怎么使用纯组件 CC: shouldComponentUpdate() 和 React.PureComponent FC: React.memo() 你可能并不需要纯组件 什么是纯组件 纯组件(Pure Component)这概念衍生自纯函数.纯函数指的是返回结果只依赖于传入的参数,且对函数作用域外没有副作用的函数.这种函数在相同参数下,返回结果是不变的.纯函数的返回值能被安全地缓存起来,在下次调用时,跳过函数执行,直接读取缓存.因为函数没有外部副作用,

  • 通俗易懂的C++前缀和与差分算法图文示例详解

    目录 1.前缀和 2.前缀和算法有什么好处? 3.二维前缀和 4.差分 5.一维差分 6.二维差分 1.前缀和 前缀和是指某序列的前n项和,可以把它理解为数学上的数列的前n项和,而差分可以看成前缀和的逆运算.合理的使用前缀和与差分,可以将某些复杂的问题简单化. 2.前缀和算法有什么好处? 先来了解这样一个问题: 输入一个长度为n的整数序列.接下来再输入m个询问,每个询问输入一对l, r.对于每个询问,输出原序列中从第l个数到第r个数的和. 我们很容易想出暴力解法,遍历区间求和. 代码如下: in

  • 原生js中运算符及流程控制示例详解

    运算符 算数:+ 加.- 减.* 乘./ 除.% 求模 赋值:=.+=.-=.*=./=.%= 关系:>.<. >=. <=. ==. ===. !=. !== 逻辑:||或.&&与.!否 实例1.求模 window.onload = function(){ alert(0%2) //0 alert(1%2) //1 alert(2%2) //0 } 实例2.隔行变色 <body> <ol> <li>取模:就是求余数</li

  • Spring Aop基本流程原理示例详解

    一.代理对象的创建过程: AbstractAutowireCapableBeanFactory#initializeBean protectedObjectinitializeBean(StringbeanName,Objectbean,@NullableRootBeanDefinitionmbd){ if(System.getSecurityManager()!=null){ AccessController.doPrivileged((PrivilegedAction<Object>)()

  • Flutter 异步编程之单线程下异步模型图文示例详解

    目录 一. 本专栏图示概念规范 1. 任务概念规范 2. 任务的状态 3. 时刻与时间线 4.同步与异步 二.理解单线程中的异步任务 1. 任务的分配 2.异步任务特点 3. 异步任务完成与回调 三. Dart 语言中的异步 1.编程语言中与异步模型的对应关系 2.Dart 编程中的异步任务 3.当前任务分析 四.异步模型的延伸 1. 单线程异步模型的局限性 2. 多线程与异步的关系 3. Dart 中如何解决单线程异步模型的局限性 一. 本专栏图示概念规范 本专栏是对 异步编程 的系统探索,会

  • Golang设计模式工厂模式实战写法示例详解

    目录 拆出主板 工厂模式流程 代码实战 抽象能力,定义接口 实现工厂,支持注册和获取实现 主流程只依赖接口完成 扩展 => 适配器,实现接口 注册适配器到工厂里 小结 拆出主板 今天带大家看一下怎么用 Go 写工厂模式的代码,我们来学习一个实战案例.这个写法笔者日常经常使用,能够很有效地帮助大家实现 Separation of Concerns. 主板就是一个程序的主流程.比如我们要基于一份学习资料来消化,吸收知识.我们可能有下面几步流程: 准备好笔记本: 打开资料: 阅读资料内容,思考并记录关

  • Android 应用程序的启动流程示例详解

    目录 应用进程的启动流程 1.ActivityStackSupervisor.startSpecificActivity 2.ATMS.startProcessAsync 3.LocalService.startProcess 4.startProcessLocked函数 5.ProcessList.startProcessLocked 6.ProcessList.startProcessLocked重载 7.ProcessList.startProcess 8.ZygoteState.star

随机推荐