轩辕李的博客 轩辕李的博客
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • JavaScript
  • TypeScript
  • Node.js
  • Vue.js
  • 前端工程化
  • 浏览器与Web API
  • 架构设计与模式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

轩辕李

勇猛精进,星辰大海
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • JavaScript
  • TypeScript
  • Node.js
  • Vue.js
  • 前端工程化
  • 浏览器与Web API
  • 架构设计与模式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 架构设计与模式

    • 高可用-分布式基础之CAP理论
    • 高可用-服务容错与降级策略
    • 高可用-故障检测与自动恢复
    • 高可用-混沌工程实践
    • 高可用-分布式事务实战
      • 一、分布式事务的挑战
        • 1、为什么需要分布式事务?
        • 2、分布式事务的核心挑战
        • 3、ACID 在分布式环境的困境
      • 二、分布式事务理论基础
        • 1、CAP 理论与分布式事务
        • 2、BASE 理论
        • 3、分布式事务的一致性模型
      • 三、两阶段提交协议 (2PC)
        • 1、2PC 基本原理
        • 阶段一: 准备阶段 (Prepare)
        • 阶段二: 提交阶段 (Commit)
        • 2、2PC 代码示例
        • 3、2PC 的问题
        • 3.1、同步阻塞问题
        • 3.2、单点故障问题
        • 3.3、数据不一致问题
        • 4、2PC 的应用场景
      • 四、三阶段提交协议 (3PC)
        • 1、3PC 基本原理
        • 阶段一: CanCommit
        • 阶段二: PreCommit
        • 阶段三: DoCommit
        • 2、3PC 的改进
        • 3、3PC 的问题
      • 五、TCC 补偿型事务
        • 1、TCC 基本原理
        • 2、TCC 执行流程
        • 3、TCC 代码示例
        • 4、TCC 的三大挑战
        • 4.1、空回滚问题
        • 4.2、幂等性问题
        • 4.3、悬挂问题
        • 5、TCC 框架: Seata
      • 六、Saga 长事务模式
        • 1、Saga 基本原理
        • 2、Saga 的两种协调模式
        • 2.1、编排模式 (Choreography)
        • 2.2、编排模式 (Orchestration)
        • 3、Saga 代码示例
        • 4、Saga 的注意事项
        • 4.1、补偿操作的设计原则
        • 4.2、补偿的局限性
        • 5、Saga 框架: Eventuate Tram Saga
      • 七、最终一致性方案
        • 1、本地消息表模式
        • 2、事务消息模式
        • 3、最大努力通知模式
      • 八、分布式事务实战经验
        • 1、技术选型决策树
        • 2、常见场景的方案选择
        • 3、分布式事务的最佳实践
        • 3.1、能不用就不用
        • 3.2、幂等性设计
        • 3.3、超时和重试策略
        • 3.4、监控和告警
        • 4、常见问题和解决方案
        • 4.1、问题: 分布式事务性能低
        • 4.2、问题: 事务悬挂或长时间未完成
        • 4.3、问题: 消息重复消费
      • 九、总结与展望
        • 1、核心要点回顾
        • 2、设计原则
        • 3、未来发展方向
        • 4、学习建议
    • 高可用-多活与容灾架构设计
    • 高性能-缓存架构设计
    • 高性能-性能优化方法论
    • 高性能-异步处理与消息队列
    • 高性能-数据库性能优化
  • 代码质量管理

  • 基础

  • 操作系统

  • 计算机网络

  • AI

  • 编程范式

  • 安全

  • 中间件

  • 心得

  • 架构
  • 架构设计与模式
轩辕李
2024-12-26
目录

高可用-分布式事务实战

在分布式系统中,分布式事务是最具挑战性的技术难题之一。

当一个业务操作需要跨越多个服务、多个数据库时,如何保证数据的一致性?如何在性能和一致性之间取得平衡?

本文将深入探讨分布式事务的理论基础、主流解决方案以及实战经验。

# 一、分布式事务的挑战

# 1、为什么需要分布式事务?

在单体应用时代,我们依赖数据库的本地事务来保证 ACID 特性:

@Transactional
public void transfer(Long fromId, Long toId, BigDecimal amount) {
    accountDao.debit(fromId, amount);   // 扣款
    accountDao.credit(toId, amount);     // 入账
}

但在微服务架构下,一个业务操作可能涉及多个服务:

订单服务 → 库存服务 → 积分服务 → 支付服务

典型场景:

  • 电商下单: 创建订单 + 扣减库存 + 扣减优惠券 + 扣款
  • 转账业务: A账户扣款 + B账户入账 + 记录流水
  • 支付回调: 更新订单状态 + 增加积分 + 发送通知

# 2、分布式事务的核心挑战

挑战 描述 影响
网络不可靠 网络分区、超时、丢包 无法保证原子性
部分失败 某些服务成功,某些失败 数据不一致
性能开销 协调开销、锁等待 系统吞吐量下降
复杂度高 需要考虑重试、补偿、幂等 开发和维护成本高
运维困难 故障排查、事务状态追踪 问题定位困难

# 3、ACID 在分布式环境的困境

传统数据库事务的 ACID 特性在分布式环境下面临挑战:

  • 原子性(Atomicity): 跨服务的操作如何保证全部成功或全部失败?
  • 一致性(Consistency): 多个数据源如何保持一致?
  • 隔离性(Isolation): 分布式环境下的并发控制如何实现?
  • 持久性(Durability): 已提交的事务如何保证不丢失?

# 二、分布式事务理论基础

# 1、CAP 理论与分布式事务

回顾 CAP 理论(详见高可用-分布式基础之CAP理论):

  • C (Consistency): 强一致性
  • A (Availability): 高可用性
  • P (Partition tolerance): 分区容错性

在分布式事务中,P 是必须保证的,真正的选择是在 C 和 A 之间权衡:

选择 特点 适用场景 代表方案
CP 强一致性,可能阻塞 金融、支付 2PC、3PC
AP 最终一致性,高可用 电商、社交 TCC、Saga

# 2、BASE 理论

BASE 是对 CAP 中 AP 方案的补充:

  • Basically Available: 基本可用
  • Soft state: 软状态
  • Eventually consistent: 最终一致性

核心思想: 通过牺牲强一致性来获得可用性,在一定时间窗口后达到最终一致。

# 3、分布式事务的一致性模型

一致性级别 描述 实现难度 性能 适用场景
强一致性 事务完成后立即一致 高 低 金融交易
最终一致性 一定时间后达到一致 中 高 电商订单
因果一致性 有因果关系的操作保持顺序 中 中 社交评论
弱一致性 不保证何时一致,甚至可能不一致 低 最高 浏览量统计

# 三、两阶段提交协议 (2PC)

# 1、2PC 基本原理

两阶段提交是最经典的强一致性分布式事务协议,包含两个阶段:

# 阶段一: 准备阶段 (Prepare)

协调者 (Coordinator)             参与者 (Participant)
      |                               |
      |-------- Prepare ------------->|
      |                               | 执行事务但不提交
      |                               | 写 Undo/Redo 日志
      |<------- Yes/No ---------------|
  1. 协调者向所有参与者发送 Prepare 请求
  2. 参与者执行事务操作,写入 Undo 和 Redo 日志
  3. 参与者向协调者返回 Yes(准备成功) 或 No(准备失败)

# 阶段二: 提交阶段 (Commit)

情况1: 所有参与者都返回 Yes

协调者                              参与者
      |                               |
      |-------- Commit -------------->|
      |                               | 正式提交事务
      |<------- ACK ------------------|

情况2: 任一参与者返回 No

协调者                              参与者
      |                               |
      |-------- Rollback ------------>|
      |                               | 回滚事务
      |<------- ACK ------------------|

# 2、2PC 代码示例

协调者实现:

public class TwoPhaseCommitCoordinator {
    private List<Participant> participants;
    
    public boolean executeTransaction(Transaction tx) {
        // 阶段一: 准备阶段
        List<Boolean> prepareResults = new ArrayList<>();
        for (Participant p : participants) {
            try {
                boolean prepared = p.prepare(tx);
                prepareResults.add(prepared);
            } catch (Exception e) {
                prepareResults.add(false);
            }
        }
        
        // 判断是否所有参与者都准备成功
        boolean allPrepared = prepareResults.stream()
            .allMatch(result -> result);
        
        // 阶段二: 提交或回滚
        if (allPrepared) {
            // 所有参与者都准备好,执行提交
            for (Participant p : participants) {
                p.commit(tx);
            }
            return true;
        } else {
            // 有参与者准备失败,执行回滚
            for (Participant p : participants) {
                p.rollback(tx);
            }
            return false;
        }
    }
}

参与者实现:

public class DatabaseParticipant implements Participant {
    private Connection connection;
    
    @Override
    public boolean prepare(Transaction tx) {
        try {
            // 开启事务
            connection.setAutoCommit(false);
            
            // 执行 SQL 操作
            executeSql(tx.getSqlStatements());
            
            // 写入 Undo/Redo 日志 (数据库自动完成)
            
            // 返回准备成功
            return true;
        } catch (Exception e) {
            return false;
        }
    }
    
    @Override
    public void commit(Transaction tx) {
        try {
            connection.commit();
        } catch (SQLException e) {
            // 记录日志
            log.error("Commit failed", e);
        }
    }
    
    @Override
    public void rollback(Transaction tx) {
        try {
            connection.rollback();
        } catch (SQLException e) {
            log.error("Rollback failed", e);
        }
    }
}

# 3、2PC 的问题

# 3.1、同步阻塞问题

所有参与者在准备阶段后都处于阻塞状态,等待协调者的指令:

参与者 A: [准备完成] → 等待... → 等待... → 等待...
参与者 B: [准备完成] → 等待... → 等待... → 等待...
参与者 C: [准备失败] → 立即返回

协调者收到 C 的失败响应后才能发出回滚指令
期间 A、B 一直持有资源锁,无法处理其他请求

影响: 系统吞吐量低,并发能力差。

# 3.2、单点故障问题

协调者是单点,如果在阶段二之前崩溃:

协调者发送 Prepare 后崩溃
→ 所有参与者永久等待
→ 资源被锁定,无法释放

解决方案:

  • 协调者持久化事务状态
  • 参与者设置超时机制
  • 引入备份协调者

# 3.3、数据不一致问题

如果协调者在阶段二发送 Commit 时,部分参与者收到指令,部分未收到:

协调者: 发送 Commit → [网络分区] → 崩溃
参与者 A: 收到 Commit → 提交成功 ✓
参与者 B: 未收到消息 → 超时回滚 ✗

结果: A 提交了事务,B 回滚了事务,数据不一致。

# 4、2PC 的应用场景

场景 是否适用 原因
金融转账 ✓ 要求强一致性,可接受性能损失
库存扣减 ✗ 高并发场景,阻塞问题严重
订单创建 ✗ 涉及多个服务,协调复杂
配置同步 ✓ 低频操作,对一致性要求高

# 四、三阶段提交协议 (3PC)

# 1、3PC 基本原理

三阶段提交是 2PC 的改进版,将提交过程分为三个阶段:

# 阶段一: CanCommit

协调者                              参与者
      |-------- CanCommit? --------->|
      |                               | 检查是否能执行事务
      |<------- Yes/No ---------------|

参与者只需检查是否能执行,不实际执行事务,不锁定资源。

# 阶段二: PreCommit

情况1: 所有参与者返回 Yes

协调者                              参与者
      |-------- PreCommit ---------->|
      |                               | 执行事务,锁定资源
      |<------- ACK ------------------|

情况2: 任一参与者返回 No

协调者                              参与者
      |-------- Abort -------------->|
      |                               | 中止操作

# 阶段三: DoCommit

情况1: PreCommit 阶段都成功

协调者                              参与者
      |-------- DoCommit ----------->|
      |                               | 正式提交
      |<------- ACK ------------------|

情况2: 超时或失败

参与者在超时后自动提交 (与 2PC 的关键区别)

# 2、3PC 的改进

改进点 2PC 问题 3PC 解决方案
同步阻塞 准备阶段就锁定资源 CanCommit 阶段不锁资源
单点故障 协调者故障导致永久阻塞 参与者超时后自动提交
数据一致性 网络分区可能导致不一致 降低了不一致概率(但仍存在)

# 3、3PC 的问题

虽然 3PC 改进了 2PC,但仍有问题:

网络分区下的一致性问题:

协调者发送 DoCommit 后网络分区:
- 分区1: 收到 DoCommit → 提交
- 分区2: 超时自动提交
- 看似一致,但如果协调者实际发的是 Abort 呢?

结论: 3PC 降低了阻塞概率,但无法完全解决一致性问题。

# 五、TCC 补偿型事务

# 1、TCC 基本原理

TCC 是一种补偿型事务方案,将业务操作分为三个阶段:

  • T (Try): 尝试执行,预留资源
  • C (Confirm): 确认执行,使用预留资源
  • C (Cancel): 取消执行,释放预留资源

核心思想: 将事务的提交和回滚逻辑下放到业务层。

# 2、TCC 执行流程

阶段一: Try 阶段
订单服务: 创建订单(待支付状态)
库存服务: 冻结库存(库存-1, 冻结+1)
积分服务: 冻结积分
支付服务: 预扣款(余额-100, 冻结+100)

阶段二: Confirm/Cancel
如果所有 Try 成功:
  订单服务: 更新订单为已支付
  库存服务: 扣减冻结库存
  积分服务: 增加积分
  支付服务: 扣减冻结金额

如果任一 Try 失败:
  订单服务: 删除订单
  库存服务: 释放冻结库存
  积分服务: 释放冻结积分
  支付服务: 释放冻结金额

# 3、TCC 代码示例

账户服务 TCC 实现:

@Service
public class AccountTccService {
    
    @Autowired
    private AccountDao accountDao;
    
    /**
     * Try: 冻结金额
     */
    @Transactional
    public boolean tryDeduct(String accountId, BigDecimal amount) {
        Account account = accountDao.findById(accountId);
        
        // 检查余额是否足够
        if (account.getBalance().compareTo(amount) < 0) {
            return false;
        }
        
        // 冻结金额: balance - amount, frozen + amount
        account.setBalance(account.getBalance().subtract(amount));
        account.setFrozen(account.getFrozen().add(amount));
        
        accountDao.update(account);
        return true;
    }
    
    /**
     * Confirm: 扣减冻结金额
     */
    @Transactional
    public void confirmDeduct(String accountId, BigDecimal amount) {
        Account account = accountDao.findById(accountId);
        
        // 扣减冻结金额: frozen - amount
        account.setFrozen(account.getFrozen().subtract(amount));
        
        accountDao.update(account);
    }
    
    /**
     * Cancel: 释放冻结金额
     */
    @Transactional
    public void cancelDeduct(String accountId, BigDecimal amount) {
        Account account = accountDao.findById(accountId);
        
        // 释放冻结: balance + amount, frozen - amount
        account.setBalance(account.getBalance().add(amount));
        account.setFrozen(account.getFrozen().subtract(amount));
        
        accountDao.update(account);
    }
}

TCC 事务协调器:

@Service
public class TccCoordinator {
    
    @Autowired
    private AccountTccService accountService;
    @Autowired
    private InventoryTccService inventoryService;
    @Autowired
    private OrderTccService orderService;
    
    public boolean executeTransaction(TransferRequest request) {
        String txId = UUID.randomUUID().toString();
        
        try {
            // Try 阶段: 尝试所有操作
            boolean tryResult = tryAll(txId, request);
            
            if (tryResult) {
                // Confirm 阶段: 确认所有操作
                confirmAll(txId, request);
                return true;
            } else {
                // Cancel 阶段: 取消所有操作
                cancelAll(txId, request);
                return false;
            }
        } catch (Exception e) {
            // 异常时取消
            cancelAll(txId, request);
            return false;
        }
    }
    
    private boolean tryAll(String txId, TransferRequest request) {
        // 记录事务日志
        logTccTransaction(txId, TccPhase.TRY);
        
        boolean accountTry = accountService.tryDeduct(
            request.getAccountId(), request.getAmount());
        boolean inventoryTry = inventoryService.tryReserve(
            request.getProductId(), request.getQuantity());
        boolean orderTry = orderService.tryCreate(request);
        
        return accountTry && inventoryTry && orderTry;
    }
    
    private void confirmAll(String txId, TransferRequest request) {
        logTccTransaction(txId, TccPhase.CONFIRM);
        
        accountService.confirmDeduct(request.getAccountId(), request.getAmount());
        inventoryService.confirmReserve(request.getProductId(), request.getQuantity());
        orderService.confirmCreate(request);
    }
    
    private void cancelAll(String txId, TransferRequest request) {
        logTccTransaction(txId, TccPhase.CANCEL);
        
        accountService.cancelDeduct(request.getAccountId(), request.getAmount());
        inventoryService.cancelReserve(request.getProductId(), request.getQuantity());
        orderService.cancelCreate(request);
    }
}

# 4、TCC 的三大挑战

# 4.1、空回滚问题

场景: Try 阶段因网络超时未执行,但 Cancel 被调用:

Try 请求发出 → 网络超时 → 协调器认为失败 → 调用 Cancel
但实际上 Try 根本没执行,没有资源可释放

解决: Cancel 需要检查 Try 是否执行过:

public void cancelDeduct(String txId, String accountId, BigDecimal amount) {
    // 检查事务记录
    TccLog log = tccLogDao.findByTxId(txId);
    if (log == null) {
        // Try 未执行,记录空回滚
        tccLogDao.insert(new TccLog(txId, TccPhase.CANCEL));
        return;
    }
    
    // Try 已执行,正常回滚
    Account account = accountDao.findById(accountId);
    account.setBalance(account.getBalance().add(amount));
    account.setFrozen(account.getFrozen().subtract(amount));
    accountDao.update(account);
}

# 4.2、幂等性问题

场景: 由于网络重试,Confirm 或 Cancel 被多次调用:

Confirm 第1次: frozen - 100 (正常)
Confirm 第2次: frozen - 100 (重复扣减!)

解决: 使用事务日志记录执行状态:

public void confirmDeduct(String txId, String accountId, BigDecimal amount) {
    // 检查是否已执行
    TccLog log = tccLogDao.findByTxId(txId);
    if (log != null && log.getPhase() == TccPhase.CONFIRM) {
        // 已执行过 Confirm,直接返回
        return;
    }
    
    // 执行 Confirm 逻辑
    Account account = accountDao.findById(accountId);
    account.setFrozen(account.getFrozen().subtract(amount));
    accountDao.update(account);
    
    // 记录执行状态
    tccLogDao.updatePhase(txId, TccPhase.CONFIRM);
}

# 4.3、悬挂问题

场景: Cancel 先于 Try 执行:

1. Try 请求因网络延迟未到达
2. 协调器超时,调用 Cancel (空回滚,记录日志)
3. Try 请求到达并执行 (资源被冻结)
4. 资源永久冻结,无法释放!

解决: Try 需要检查是否已 Cancel:

public boolean tryDeduct(String txId, String accountId, BigDecimal amount) {
    // 检查是否已取消
    TccLog log = tccLogDao.findByTxId(txId);
    if (log != null && log.getPhase() == TccPhase.CANCEL) {
        // 已取消,拒绝执行 Try
        return false;
    }
    
    // 正常执行 Try
    Account account = accountDao.findById(accountId);
    if (account.getBalance().compareTo(amount) < 0) {
        return false;
    }
    
    account.setBalance(account.getBalance().subtract(amount));
    account.setFrozen(account.getFrozen().add(amount));
    accountDao.update(account);
    
    // 记录 Try 执行
    tccLogDao.insert(new TccLog(txId, TccPhase.TRY));
    return true;
}

# 5、TCC 框架: Seata

Seata 是阿里开源的分布式事务解决方案,支持 TCC 模式。

核心组件:

  • TC (Transaction Coordinator): 事务协调器,维护全局事务状态
  • TM (Transaction Manager): 事务管理器,发起全局事务
  • RM (Resource Manager): 资源管理器,管理分支事务

使用示例:

// 1. 在业务发起方添加 @GlobalTransactional
@Service
public class BusinessService {
    
    @GlobalTransactional
    public void purchase(String userId, String productId, int count) {
        // 调用各个服务
        orderService.create(userId, productId, count);
        inventoryService.deduct(productId, count);
        accountService.deduct(userId, calculateAmount(productId, count));
    }
}

// 2. 在各个服务实现 TCC 接口
@LocalTCC
public interface InventoryTccAction {
    
    @TwoPhaseBusinessAction(name = "inventoryTcc", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepare(@BusinessActionContextParameter(paramName = "productId") String productId,
                   @BusinessActionContextParameter(paramName = "count") int count);
    
    boolean commit(BusinessActionContext context);
    
    boolean rollback(BusinessActionContext context);
}

# 六、Saga 长事务模式

# 1、Saga 基本原理

Saga 模式将长事务拆分为多个本地短事务,每个子事务都有对应的补偿操作:

T1 → T2 → T3 → T4 (正常流程)
C1 ← C2 ← C3 ← C4 (补偿流程)

核心思想: 不追求原子性,通过补偿达到最终一致性。

# 2、Saga 的两种协调模式

# 2.1、编排模式 (Choreography)

去中心化,各服务通过事件驱动自行协调:

订单服务: 创建订单 → 发布 OrderCreated 事件
   ↓
库存服务: 监听 OrderCreated → 扣减库存 → 发布 InventoryDeducted 事件
   ↓
支付服务: 监听 InventoryDeducted → 扣款 → 发布 PaymentCompleted 事件

优点: 解耦,服务自治
缺点: 流程分散,难以理解和调试

# 2.2、编排模式 (Orchestration)

中心化,由协调器统一管理流程:

Saga 协调器:
  1. 调用订单服务创建订单
  2. 调用库存服务扣减库存
  3. 调用支付服务扣款
  4. 如果任一步骤失败,按相反顺序执行补偿

优点: 流程清晰,易于管理
缺点: 协调器是单点,需要保证高可用

# 3、Saga 代码示例

Saga 协调器实现:

@Service
public class OrderSagaOrchestrator {
    
    @Autowired
    private OrderService orderService;
    @Autowired
    private InventoryService inventoryService;
    @Autowired
    private PaymentService paymentService;
    @Autowired
    private SagaLogRepository sagaLogRepo;
    
    public boolean executeOrderSaga(OrderRequest request) {
        String sagaId = UUID.randomUUID().toString();
        SagaLog sagaLog = new SagaLog(sagaId);
        
        try {
            // 步骤1: 创建订单
            String orderId = orderService.createOrder(request);
            sagaLog.addStep("createOrder", orderId);
            sagaLogRepo.save(sagaLog);
            
            // 步骤2: 扣减库存
            boolean inventoryOk = inventoryService.deduct(
                request.getProductId(), request.getQuantity());
            if (!inventoryOk) {
                throw new SagaException("库存不足");
            }
            sagaLog.addStep("deductInventory", request.getProductId());
            sagaLogRepo.save(sagaLog);
            
            // 步骤3: 扣款
            boolean paymentOk = paymentService.pay(
                request.getUserId(), request.getAmount());
            if (!paymentOk) {
                throw new SagaException("余额不足");
            }
            sagaLog.addStep("payment", request.getUserId());
            sagaLogRepo.save(sagaLog);
            
            // 所有步骤成功
            sagaLog.setStatus(SagaStatus.COMPLETED);
            sagaLogRepo.save(sagaLog);
            return true;
            
        } catch (Exception e) {
            // 执行补偿
            compensate(sagaLog);
            return false;
        }
    }
    
    private void compensate(SagaLog sagaLog) {
        // 按相反顺序执行补偿
        List<SagaStep> steps = sagaLog.getSteps();
        for (int i = steps.size() - 1; i >= 0; i--) {
            SagaStep step = steps.get(i);
            try {
                switch (step.getName()) {
                    case "createOrder":
                        orderService.cancelOrder(step.getData());
                        break;
                    case "deductInventory":
                        inventoryService.restore(step.getData(), sagaLog.getQuantity());
                        break;
                    case "payment":
                        paymentService.refund(step.getData(), sagaLog.getAmount());
                        break;
                }
                step.setCompensated(true);
            } catch (Exception e) {
                // 补偿失败,记录日志,后续人工处理
                log.error("Compensation failed for step: {}", step.getName(), e);
            }
        }
        
        sagaLog.setStatus(SagaStatus.COMPENSATED);
        sagaLogRepo.save(sagaLog);
    }
}

补偿操作实现:

@Service
public class InventoryService {
    
    @Transactional
    public boolean deduct(String productId, int quantity) {
        Inventory inventory = inventoryDao.findById(productId);
        if (inventory.getStock() < quantity) {
            return false;
        }
        
        inventory.setStock(inventory.getStock() - quantity);
        inventoryDao.update(inventory);
        return true;
    }
    
    /**
     * 补偿操作: 恢复库存
     */
    @Transactional
    public void restore(String productId, int quantity) {
        Inventory inventory = inventoryDao.findById(productId);
        inventory.setStock(inventory.getStock() + quantity);
        inventoryDao.update(inventory);
    }
}

# 4、Saga 的注意事项

# 4.1、补偿操作的设计原则

原则 说明 示例
幂等性 多次执行结果相同 使用唯一ID防止重复退款
可补偿性 每个操作都有对应补偿 扣库存 ↔ 加库存
可重试性 补偿操作失败后可重试 网络超时后重试退款
最终一致性 补偿后达到一致状态 订单取消后,库存、账户都恢复

# 4.2、补偿的局限性

不可补偿的场景:

  • 发送短信通知 (无法"取消发送")
  • 发放优惠券 (用户已使用)
  • 物理世界的操作 (打印发票)

解决思路:

  • 延迟执行: 等事务确认后再发送通知
  • 对冲操作: 发送"订单取消"通知
  • 人工介入: 记录日志,人工处理

# 5、Saga 框架: Eventuate Tram Saga

Eventuate Tram 是一个基于事件驱动的 Saga 框架。

定义 Saga:

public class OrderSaga implements SimpleSaga<OrderSagaData> {
    
    private SagaDefinition<OrderSagaData> sagaDefinition;
    
    public OrderSaga(OrderService orderService, 
                     InventoryService inventoryService,
                     PaymentService paymentService) {
        
        this.sagaDefinition = step()
            .invokeLocal(orderService::create)
            .withCompensation(orderService::cancel)
        .step()
            .invokeParticipant(inventoryService.deduct())
            .withCompensation(inventoryService.restore())
        .step()
            .invokeParticipant(paymentService.pay())
            .withCompensation(paymentService.refund())
        .build();
    }
    
    @Override
    public SagaDefinition<OrderSagaData> getSagaDefinition() {
        return sagaDefinition;
    }
}

# 七、最终一致性方案

# 1、本地消息表模式

核心思想: 利用本地事务保证业务操作和消息发送的原子性。

执行流程:

1. 开启本地事务:
   - 执行业务操作 (如创建订单)
   - 插入消息记录到消息表
   - 提交事务

2. 定时任务扫描消息表:
   - 查询未发送的消息
   - 发送到消息队列
   - 标记为已发送

3. 消费者处理消息:
   - 执行下游业务 (如扣减库存)
   - 返回 ACK

代码实现:

@Service
public class OrderService {
    
    @Autowired
    private OrderDao orderDao;
    @Autowired
    private LocalMessageDao messageDao;
    
    @Transactional
    public void createOrder(OrderRequest request) {
        // 1. 创建订单
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setProductId(request.getProductId());
        order.setStatus(OrderStatus.PENDING);
        orderDao.insert(order);
        
        // 2. 插入本地消息
        LocalMessage message = new LocalMessage();
        message.setTopic("order_created");
        message.setKey(order.getId());
        message.setContent(JSON.toJSONString(order));
        message.setStatus(MessageStatus.PENDING);
        messageDao.insert(message);
        
        // 3. 事务提交 (订单和消息同时成功或失败)
    }
}

消息发送定时任务:

@Component
public class MessagePublisher {
    
    @Autowired
    private LocalMessageDao messageDao;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Scheduled(fixedDelay = 5000) // 每5秒扫描一次
    public void publishPendingMessages() {
        List<LocalMessage> messages = messageDao.findPendingMessages(100);
        
        for (LocalMessage message : messages) {
            try {
                // 发送到消息队列
                rocketMQTemplate.convertAndSend(message.getTopic(), message.getContent());
                
                // 标记为已发送
                message.setStatus(MessageStatus.SENT);
                message.setSentTime(LocalDateTime.now());
                messageDao.update(message);
                
            } catch (Exception e) {
                // 发送失败,下次重试
                log.error("Failed to send message: {}", message.getId(), e);
            }
        }
    }
}

优点:

  • 实现简单,无需引入分布式事务框架
  • 消息不会丢失 (存储在数据库)

缺点:

  • 需要额外的消息表和定时任务
  • 消息发送有延迟

# 2、事务消息模式

RocketMQ 和 Kafka 都支持事务消息。

RocketMQ 事务消息流程:

1. 发送半消息 (Half Message):
   生产者 → RocketMQ: 发送半消息
   RocketMQ: 存储半消息,对消费者不可见

2. 执行本地事务:
   生产者: 执行本地事务 (如创建订单)

3. 提交或回滚消息:
   如果本地事务成功:
     生产者 → RocketMQ: Commit
     RocketMQ: 半消息对消费者可见
   如果本地事务失败:
     生产者 → RocketMQ: Rollback
     RocketMQ: 删除半消息

4. 事务回查 (如果长时间未提交):
   RocketMQ → 生产者: 查询本地事务状态
   生产者: 检查订单是否创建成功
   生产者 → RocketMQ: Commit/Rollback

代码实现:

@Service
public class OrderTransactionService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private OrderDao orderDao;
    
    public void createOrderWithTransactionMessage(OrderRequest request) {
        // 发送事务消息
        rocketMQTemplate.sendMessageInTransaction(
            "order_topic",
            MessageBuilder.withPayload(request).build(),
            request
        );
    }
}

@Component
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
    
    @Autowired
    private OrderService orderService;
    
    /**
     * 执行本地事务
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            OrderRequest request = (OrderRequest) arg;
            
            // 执行本地事务: 创建订单
            orderService.createOrder(request);
            
            // 提交消息
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // 回滚消息
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    
    /**
     * 事务状态回查
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        try {
            OrderRequest request = JSON.parseObject(
                new String(msg.getBody()), OrderRequest.class);
            
            // 查询订单是否创建成功
            Order order = orderService.getOrder(request.getOrderId());
            
            if (order != null) {
                return RocketMQLocalTransactionState.COMMIT;
            } else {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

消费者实现:

@Service
@RocketMQMessageListener(topic = "order_topic", consumerGroup = "inventory_consumer")
public class InventoryConsumer implements RocketMQListener<OrderRequest> {
    
    @Autowired
    private InventoryService inventoryService;
    
    @Override
    public void onMessage(OrderRequest request) {
        // 幂等性检查
        if (isDuplicate(request.getOrderId())) {
            return;
        }
        
        // 扣减库存
        inventoryService.deduct(request.getProductId(), request.getQuantity());
        
        // 记录消费记录
        recordConsumption(request.getOrderId());
    }
}

# 3、最大努力通知模式

适用场景: 对一致性要求不高,允许数据最终不一致。

核心思想: 尽最大努力发送通知,失败后定时重试,超过一定次数后放弃。

执行流程:

1. 执行本地事务
2. 发送通知 (HTTP/MQ)
3. 如果失败,记录到重试表
4. 定时任务扫描重试表:
   - 重试发送 (指数退避: 1分钟, 5分钟, 30分钟...)
   - 达到最大重试次数后,标记为失败
   - 人工介入处理

典型应用:

  • 支付回调通知
  • 短信/邮件发送
  • 第三方系统同步

代码示例:

@Service
public class PaymentNotificationService {
    
    @Autowired
    private NotificationRetryDao retryDao;
    
    public void notifyOrderPaid(String orderId) {
        try {
            // 尝试发送通知
            sendNotification(orderId);
        } catch (Exception e) {
            // 失败,记录到重试表
            NotificationRetry retry = new NotificationRetry();
            retry.setOrderId(orderId);
            retry.setRetryCount(0);
            retry.setNextRetryTime(LocalDateTime.now().plusMinutes(1));
            retryDao.insert(retry);
        }
    }
    
    @Scheduled(fixedDelay = 60000) // 每分钟执行一次
    public void retryFailedNotifications() {
        List<NotificationRetry> retries = retryDao.findPendingRetries();
        
        for (NotificationRetry retry : retries) {
            if (retry.getRetryCount() >= 10) {
                // 超过最大重试次数,标记为失败
                retry.setStatus(RetryStatus.FAILED);
                retryDao.update(retry);
                continue;
            }
            
            try {
                sendNotification(retry.getOrderId());
                
                // 成功,删除重试记录
                retryDao.delete(retry.getId());
            } catch (Exception e) {
                // 失败,增加重试次数,计算下次重试时间 (指数退避)
                retry.setRetryCount(retry.getRetryCount() + 1);
                int delayMinutes = (int) Math.pow(2, retry.getRetryCount());
                retry.setNextRetryTime(LocalDateTime.now().plusMinutes(delayMinutes));
                retryDao.update(retry);
            }
        }
    }
}

# 八、分布式事务实战经验

# 1、技术选型决策树

1. 是否要求强一致性?
   ├─ 是 → 2PC/3PC (仅限小规模系统)
   └─ 否 → 继续判断

2. 业务是否可补偿?
   ├─ 是 → TCC 或 Saga
   │   ├─ 需要严格的中间状态 → TCC
   │   └─ 流程较长,允许最终一致 → Saga
   └─ 否 → 继续判断

3. 是否可以异步处理?
   ├─ 是 → 消息队列 + 最终一致性
   │   ├─ 对可靠性要求高 → 事务消息
   │   └─ 允许少量丢失 → 最大努力通知
   └─ 否 → 考虑业务拆分或重新设计

# 2、常见场景的方案选择

业务场景 推荐方案 理由
电商下单 Saga + 消息队列 流程长,允许最终一致,性能要求高
支付转账 TCC 涉及资金,需要严格的中间状态控制
库存扣减 TCC + 预扣 防止超卖,需要原子性操作
积分增加 消息队列 允许延迟,对一致性要求不高
订单状态同步 事务消息 需要保证消息可靠性,但允许延迟
第三方回调 最大努力通知 第三方系统不可控,尽力而为

# 3、分布式事务的最佳实践

# 3.1、能不用就不用

优先考虑:

  • 业务拆分: 将强一致性需求拆分到单个服务内
  • 异步化: 将同步调用改为异步消息
  • 最终一致性: 接受数据短暂不一致

反例:

// 不好的设计: 一个事务跨5个服务
@GlobalTransactional
public void complexOperation() {
    serviceA.method();
    serviceB.method();
    serviceC.method();
    serviceD.method();
    serviceE.method();
}

改进:

// 好的设计: 核心操作在一个服务内,其他异步处理
@Transactional
public void coreOperation() {
    // 核心业务在本地事务内完成
    orderDao.create(order);
    orderItemDao.batchInsert(items);
    
    // 发送事件,异步处理其他操作
    eventPublisher.publish(new OrderCreatedEvent(order));
}

# 3.2、幂等性设计

所有分布式事务操作都必须保证幂等性。

实现方式:

1. 唯一ID + 状态检查:

@Transactional
public void processPayment(String paymentId, BigDecimal amount) {
    // 检查是否已处理
    Payment payment = paymentDao.findById(paymentId);
    if (payment != null && payment.getStatus() == PaymentStatus.SUCCESS) {
        return; // 已处理,直接返回
    }
    
    // 执行支付逻辑
    // ...
}

2. 去重表:

@Transactional
public void deductInventory(String requestId, String productId, int quantity) {
    // 检查请求是否已处理
    if (deduplicationDao.exists(requestId)) {
        return;
    }
    
    // 插入去重记录
    deduplicationDao.insert(requestId);
    
    // 执行库存扣减
    inventoryDao.deduct(productId, quantity);
}

3. 乐观锁:

@Transactional
public void updateInventory(String productId, int quantity, int version) {
    int updated = inventoryDao.updateWithVersion(productId, quantity, version);
    if (updated == 0) {
        throw new ConcurrentModificationException("库存已被修改");
    }
}

# 3.3、超时和重试策略

设置合理的超时时间:

@GlobalTransactional(timeoutMills = 60000) // 1分钟超时
public void longRunningTransaction() {
    // ...
}

重试策略:

@Retryable(
    value = {NetworkException.class},
    maxAttempts = 3,
    backoff = @Backoff(delay = 1000, multiplier = 2)
)
public void remoteCall() {
    // 网络调用,失败后重试
}

重试注意事项:

  • 只重试临时性错误 (网络超时、服务暂不可用)
  • 不重试业务错误 (余额不足、库存不足)
  • 必须保证幂等性

# 3.4、监控和告警

关键指标:

指标 说明 告警阈值
事务成功率 提交成功的事务占比 < 95%
事务耗时 事务从开始到结束的时间 P99 > 5s
补偿率 需要补偿的事务占比 > 5%
消息积压 未消费的消息数量 > 10000
重试次数 平均重试次数 > 3

日志记录:

@Aspect
@Component
public class TransactionLogAspect {
    
    @Around("@annotation(globalTransactional)")
    public Object logTransaction(ProceedingJoinPoint pjp, 
                                GlobalTransactional globalTransactional) throws Throwable {
        String txId = RootContext.getXID();
        long startTime = System.currentTimeMillis();
        
        try {
            Object result = pjp.proceed();
            
            long duration = System.currentTimeMillis() - startTime;
            log.info("Transaction success: txId={}, duration={}ms", txId, duration);
            
            return result;
        } catch (Exception e) {
            long duration = System.currentTimeMillis() - startTime;
            log.error("Transaction failed: txId={}, duration={}ms", txId, duration, e);
            throw e;
        }
    }
}

# 4、常见问题和解决方案

# 4.1、问题: 分布式事务性能低

原因:

  • 多次网络调用
  • 资源锁定时间长
  • 同步等待

解决方案:

  • 减少分布式事务的使用范围
  • 使用异步消息代替同步调用
  • 采用 AP 模式 (TCC/Saga) 代替 CP 模式 (2PC)

# 4.2、问题: 事务悬挂或长时间未完成

原因:

  • 协调器故障
  • 参与者超时
  • 网络分区

解决方案:

  • 设置全局超时时间
  • 实现事务恢复机制
  • 定时扫描长时间未完成的事务
@Scheduled(fixedDelay = 300000) // 每5分钟执行一次
public void cleanupZombieTransactions() {
    // 查询超过30分钟未完成的事务
    List<Transaction> zombies = transactionDao.findZombieTransactions(30);
    
    for (Transaction tx : zombies) {
        try {
            // 尝试回滚
            transactionManager.rollback(tx.getId());
        } catch (Exception e) {
            // 记录日志,人工介入
            log.error("Failed to cleanup zombie transaction: {}", tx.getId(), e);
        }
    }
}

# 4.3、问题: 消息重复消费

原因:

  • 消费者确认失败
  • 网络波动导致重传
  • 消息队列重平衡

解决方案: 实现消费幂等

@Component
public class IdempotentMessageConsumer {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    public void consume(Message message) {
        String messageId = message.getMessageId();
        
        // 使用 Redis 去重,设置过期时间
        Boolean success = redisTemplate.opsForValue()
            .setIfAbsent(messageId, "1", 24, TimeUnit.HOURS);
        
        if (Boolean.FALSE.equals(success)) {
            // 已消费过,跳过
            return;
        }
        
        // 处理消息
        processMessage(message);
    }
}

# 九、总结与展望

# 1、核心要点回顾

方案 一致性 可用性 性能 复杂度 适用场景
2PC/3PC 强一致 低 低 高 小规模、强一致需求
TCC 最终一致 中 中 高 金融、交易、库存
Saga 最终一致 高 高 中 长流程、可补偿业务
消息队列 最终一致 高 高 低 异步、允许延迟
最大努力通知 弱一致 高 高 低 第三方通知、不重要数据

# 2、设计原则

  1. 能不用分布式事务就不用: 优先考虑业务拆分和最终一致性
  2. 选择合适的一致性级别: 不是所有场景都需要强一致性
  3. 保证幂等性: 所有操作必须支持重试
  4. 做好监控和告警: 及时发现和处理问题
  5. 设计降级方案: 关键路径要有人工介入机制

# 3、未来发展方向

  • Serverless 事务: 云原生环境下的分布式事务
  • AI 辅助决策: 自动选择最优的事务方案
  • 跨链事务: 区块链领域的分布式事务
  • 边缘计算事务: 边缘节点之间的事务协调

# 4、学习建议

  1. 理论结合实践: 在实际项目中应用,加深理解
  2. 研究开源框架: 深入学习 Seata、Eventuate 等框架源码
  3. 关注故障案例: 从大厂的事故中学习经验教训
  4. 持续跟进技术: 分布式事务领域不断演进

分布式事务没有银弹,只有最适合的方案。在实际工作中,需要根据业务特点、团队能力、系统规模等因素,综合权衡,做出最佳选择。

祝你变得更强!

编辑 (opens new window)
#分布式事务#高可用
上次更新: 2025/12/17
高可用-混沌工程实践
高可用-多活与容灾架构设计

← 高可用-混沌工程实践 高可用-多活与容灾架构设计→

最近更新
01
AI编程时代的一些心得
09-11
02
Claude Code与Codex的协同工作
09-01
03
Claude Code 最佳实践(个人版)
08-01
更多文章>
Theme by Vdoing | Copyright © 2018-2025 京ICP备2021021832号-2 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式