
Distributed Transactions in Practice: How Saga, TCC, and AT Work and How to Choose

Zheng Tianqi · about 14 min read · Distributed, Saga, TCC

Introduction

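In a traditional monolith, database transactions guarantee consistency through ACID. In a microservice architecture, however, a single business operation spans several services and several databases, and a local transaction can no longer cover it.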

Take an e-commerce order as an example:

User places order → Order service (create order)
                  → Inventory service (deduct stock)
                  → Payment service (charge)
                  → Points service (award points)

If any step fails, the whole flow has to be rolled back, yet each of these four services manages its own database!

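This article examines the three main approaches to distributed transactions, Saga, TCC, and AT: how each one works, its strengths and weaknesses, and how to put it into production.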


1. The challenges of distributed transactions

1.1 The trade-off under the CAP theorem

A distributed system cannot guarantee all three of the following at the same time:

  • C (Consistency)
  • A (Availability)
  • P (Partition tolerance)

Since network partitions (P) cannot be ruled out, you have to choose between C and A:

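Traditional transactional systems: choose CP
├─ Strong consistency (ACID)
├─ May become unavailable (waiting on locks)
└─ Suited to: financial transfers, exact inventory deduction

Microservice systems: choose AP
├─ Eventual consistency (BASE)
├─ High availability (no locks, asynchronous)
└─ Suited to: orders, points, logs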

1.2 The BASE model

Basically Available + Soft state + Eventually consistent

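T0: Order created ✓
    Stock deduction in an intermediate state (processing)
T1: Network glitch, the deduction fails
    Compensation triggered → stock added back
T2: State synchronization finished
    Eventually consistent ✓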

2. The Saga pattern: compensation-based transactions

2.1 How it works

Saga = a sequence of local transactions + a compensating action for each one

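A Saga does not require strong consistency across services; instead it reaches eventual consistency through sequential execution plus compensation on failure.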

Forward flow:
Order Service → (create order) ✓
             → Inventory Service → (deduct stock) ✓
                                → Payment Service → (charge) ✓
                                                  → Point Service → (award points) ✓

Reverse flow (on failure):
Point Service → (revert points)
             ← Payment Service (refund payment)
                            ← Inventory Service (add stock back)
                                               ← Order Service (cancel order)
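The forward/reverse flow boils down to a small loop. The sketch below is a plain-Python, in-process illustration with no framework; `SagaStep` and `run_saga` are hypothetical names, and each action or compensation is just a callable standing in for a local transaction in one service.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class SagaStep:
    name: str
    action: Callable[[], None]      # local transaction in one service
    compensate: Callable[[], None]  # semantic undo of that local transaction


def run_saga(steps: List[SagaStep]) -> List[str]:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    log: List[str] = []
    completed: List[SagaStep] = []
    for step in steps:
        try:
            step.action()
        except Exception:
            log.append(f"fail:{step.name}")
            # Only steps that actually committed are compensated, newest first
            for done in reversed(completed):
                done.compensate()
                log.append(f"undo:{done.name}")
            return log
        completed.append(step)
        log.append(f"do:{step.name}")
    return log


def noop() -> None:
    pass


def declined() -> None:
    raise RuntimeError("payment declined")


steps = [
    SagaStep("order", noop, noop),
    SagaStep("inventory", noop, noop),
    SagaStep("payment", declined, noop),
    SagaStep("points", noop, noop),
]
print(run_saga(steps))
# ['do:order', 'do:inventory', 'fail:payment', 'undo:inventory', 'undo:order']
```

The failed step itself is never compensated: its local transaction did not commit, so there is nothing to undo. A compensation that raises is left to propagate here; section 2.3 covers that case.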

2.2 Two implementation styles

Style 1: orchestration-based Saga (Orchestration)

A central orchestrator directs the order in which the services execute.

        ┌───────────────────┐
        │ Saga Orchestrator │
        │  (state machine)  │
        └─────────┬─────────┘
                  │
     ┌──────────┬─┴────────┬──────────┬──────────┐
     ▼          ▼          ▼          ▼          ▼
   Order    Inventory   Payment     Point     Notify
  Service    Service    Service    Service    Service

Example implementation (using the Temporal Python SDK):

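from datetime import timedelta

from temporalio import activity, workflow

# Temporal requires start_to_close_timeout or schedule_to_close_timeout on each activity call
ACTIVITY_TIMEOUT = timedelta(seconds=30)


@workflow.defn
class OrderSaga:
    @workflow.run
    async def execute(self, order_id: str, items: list) -> dict:
        # Compensations for the steps that have succeeded so far, undone in reverse order
        compensations = []

        # Step 1: create the order
        order = await workflow.execute_activity(
            create_order_activity,
            args=[order_id, items],
            start_to_close_timeout=ACTIVITY_TIMEOUT,
        )
        compensations.append((cancel_order_activity, [order_id]))

        try:
            # Step 2: deduct stock
            await workflow.execute_activity(
                deduct_inventory_activity,
                args=[order_id, items],
                start_to_close_timeout=ACTIVITY_TIMEOUT,
            )
            compensations.append((return_inventory_activity, [order_id, items]))

            # Step 3: charge the payment (the activities are assumed to return dicts)
            payment = await workflow.execute_activity(
                process_payment_activity,
                args=[order_id, order["total_price"]],
                start_to_close_timeout=ACTIVITY_TIMEOUT,
            )
            compensations.append((refund_payment_activity, [order_id, payment["id"]]))

            # Step 4: award points
            await workflow.execute_activity(
                add_points_activity,
                args=[order_id, order["user_id"], order["total_price"]],
                start_to_close_timeout=ACTIVITY_TIMEOUT,
            )

            # Every step succeeded
            return {"status": "SUCCESS", "order": order, "payment": payment}

        except Exception:
            # Failure: compensate only the steps that actually completed, newest first
            for compensation, args in reversed(compensations):
                await workflow.execute_activity(
                    compensation,
                    args=args,
                    start_to_close_timeout=ACTIVITY_TIMEOUT,
                )
            raise


# Activity implementations; order_service, inventory_service, payment_service and
# point_service are placeholders for clients of the real services
@activity.defn
async def create_order_activity(order_id: str, items: list) -> dict:
    return await order_service.create(order_id, items)


@activity.defn
async def deduct_inventory_activity(order_id: str, items: list):
    return await inventory_service.deduct(items)


@activity.defn
async def process_payment_activity(order_id: str, amount: float) -> dict:
    return await payment_service.pay(order_id, amount)


@activity.defn
async def add_points_activity(order_id: str, user_id: str, amount: float):
    return await point_service.add(user_id, int(amount * 0.01))


# Compensating activities
@activity.defn
async def refund_payment_activity(order_id: str, payment_id: str):
    return await payment_service.refund(payment_id)


@activity.defn
async def return_inventory_activity(order_id: str, items: list):
    return await inventory_service.return_items(items)


@activity.defn
async def cancel_order_activity(order_id: str):
    return await order_service.cancel(order_id)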

Flowchart

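Create order ✓
   │
   ├→ [Deduct stock] ✓
   │      │
   │      ├→ [Charge] ✓
   │      │      │
   │      │      ├→ [Award points] ✓
   │      │      │      │
   │      │      │      └→ Return success
   │      │      │
   │      │      └→ [Award points failed] ✗
   │      │             │
   │      │             └→ Refund ← [payment compensation]
   │      │                │
   │      │                └→ Add stock back ← [inventory compensation]
   │      │                   │
   │      │                   └→ Cancel order ← [order compensation]
   │      │
   │      └→ [Charge failed] ✗
   │             │
   │             └→ Add stock back ← [inventory compensation]
   │                │
   │                └→ Cancel order ← [order compensation]
   │
   └→ [Deduct stock failed] ✗
          │
          └→ Cancel order ← [order compensation]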

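Pros

  • Clear, controllable flow
  • The orchestrator holds the saga state, which makes monitoring easy
  • Supports complex business logic (branches, loops)

Cons

  • The orchestrator can become a performance bottleneck
  • Strong dependence on the orchestrator's availability
  • Higher coupling: the flow logic and every participant's API are bound to the orchestrator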

Style 2: choreography-based Saga (Choreography)

Services coordinate themselves through messages (event-driven), with no central orchestrator.

Order Service (publishes event)
    │
    └→ OrderCreated
       └─→ Inventory Service (subscribes, performs its step)
           │
           └→ InventoryDeducted
              └─→ Payment Service
                  │
                  └→ PaymentProcessed
                     └─→ Point Service
                         │
                         └→ PointAdded (done)

           or InventoryDeductFailed
              └─→ Order Service (compensates)
                  └→ OrderCancelled
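To see choreography without a broker, here is an in-memory sketch. The hypothetical `EventBus` class stands in for RabbitMQ, and each service only subscribes to the events it cares about; event names follow the diagram above. Only the inventory-failure path is wired, and Payment and Point services are omitted for brevity.

```python
from collections import defaultdict, deque
from typing import Callable, DefaultDict, Deque, Dict, List, Tuple

Handler = Callable[[dict], None]


class EventBus:
    """In-memory stand-in for a broker: queued events are delivered to subscribers."""

    def __init__(self) -> None:
        self.handlers: DefaultDict[str, List[Handler]] = defaultdict(list)
        self.pending: Deque[Tuple[str, dict]] = deque()
        self.history: List[str] = []

    def subscribe(self, event: str, handler: Handler) -> None:
        self.handlers[event].append(handler)

    def publish(self, event: str, data: dict) -> None:
        self.pending.append((event, data))

    def drain(self) -> None:
        # Deliver one event at a time, the way a consumer reads a queue
        while self.pending:
            event, data = self.pending.popleft()
            self.history.append(event)
            for handler in self.handlers[event]:
                handler(data)


def wire_services(bus: EventBus, stock: Dict[str, int], orders: Dict[str, str]) -> None:
    def inventory_on_order_created(data: dict) -> None:
        # Inventory Service reacts to OrderCreated on its own
        if stock.get(data["product_id"], 0) >= data["qty"]:
            stock[data["product_id"]] -= data["qty"]
            bus.publish("InventoryDeducted", data)
        else:
            bus.publish("InventoryDeductFailed", data)

    def order_on_inventory_failed(data: dict) -> None:
        # Order Service compensates its own local transaction
        orders[data["order_id"]] = "CANCELLED"
        bus.publish("OrderCancelled", data)

    bus.subscribe("OrderCreated", inventory_on_order_created)
    bus.subscribe("InventoryDeductFailed", order_on_inventory_failed)


bus = EventBus()
stock = {"prod1": 1}
orders = {"o1": "PENDING"}
wire_services(bus, stock, orders)
bus.publish("OrderCreated", {"order_id": "o1", "product_id": "prod1", "qty": 2})
bus.drain()
print(bus.history, orders)
# ['OrderCreated', 'InventoryDeductFailed', 'OrderCancelled'] {'o1': 'CANCELLED'}
```

Note that no component knows the whole flow; the sequence only exists as the chain of subscriptions, which is exactly why choreographed sagas are harder to trace.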

Example implementation (Python, using RabbitMQ through aio_pika):

import json

import aio_pika

# `db` is a placeholder for each service's async data-access layer;
# `mq_connection` is an aio_pika connection, e.g. from aio_pika.connect_robust(url)


# Order Service
class OrderService:
    def __init__(self, mq_connection):
        self.mq = mq_connection

    async def create_order(self, order_data):
        # 1. Create the order (local transaction)
        order = await db.orders.insert(order_data)

        # 2. Publish the event. The DB write and the publish are not atomic:
        #    a crash between them loses the event.
        await self.publish_event("order.created", {
            "order_id": order.id,
            "items": order_data["items"],
            "user_id": order_data["user_id"],
        })

        return order

    async def cancel_order(self, order_id):
        # Compensation: cancel the order (triggered by inventory.deduct_failed)
        await db.orders.update(order_id, status="CANCELLED")

    async def publish_event(self, event_type, data):
        # A new channel per message keeps the sketch short; reuse one in production
        channel = await self.mq.channel()
        exchange = await channel.declare_exchange(
            "order.events",
            aio_pika.ExchangeType.TOPIC,
            durable=True,
        )

        message = aio_pika.Message(
            body=json.dumps({
                "event_type": event_type,
                "data": data,
            }).encode(),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
        )

        # The event type doubles as the topic routing key
        await exchange.publish(message, routing_key=event_type)


# Inventory Service
class InventoryService:
    def __init__(self, mq_connection):
        self.mq = mq_connection

    async def listen_order_events(self):
        channel = await self.mq.channel()
        exchange = await channel.declare_exchange(
            "order.events",
            aio_pika.ExchangeType.TOPIC,
            durable=True,
        )

        queue = await channel.declare_queue("inventory.queue", durable=True)
        await queue.bind(exchange, routing_key="order.created")

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                # process() acks the message when the block exits without an exception
                async with message.process():
                    event = json.loads(message.body.decode())

                    if event["event_type"] == "order.created":
                        try:
                            # Deduct stock
                            await self.deduct_inventory(event["data"])

                            # Publish the success event
                            await self.publish_event("inventory.deducted", {
                                "order_id": event["data"]["order_id"],
                            })
                        except Exception as e:
                            # Publish the failure event so Order Service can compensate
                            await self.publish_event("inventory.deduct_failed", {
                                "order_id": event["data"]["order_id"],
                                "reason": str(e),
                            })

    async def deduct_inventory(self, order_data):
        # Should run in one local transaction so a failure cannot leave
        # some items deducted and others not
        for item in order_data["items"]:
            await db.inventory.update(
                item["product_id"],
                decrement_qty=item["qty"],
            )

    async def publish_event(self, event_type, data):
        # Same logic as OrderService.publish_event
        pass


# Payment Service
class PaymentService:
    async def listen_inventory_events(self):
        # Listen for inventory.deducted events
        # Charge the payment
        # Publish payment.processed or payment.failed
        pass


# Point Service
class PointService:
    async def listen_payment_events(self):
        # Listen for payment.processed events
        # Award points
        pass

    async def listen_refund_events(self):
        # Listen for payment.refunded events
        # Roll back the points
        pass

Flow overview

User → Order Service
         │
         ├→ DB: create order ✓
         │
         └→ MQ: publish OrderCreated
            │
            ├→ Inventory Service receives it
            │  ├→ DB: deduct stock ✓
            │  └→ MQ: publish InventoryDeducted
            │     │
            │     ├→ Payment Service receives it
            │     │  ├→ API: call the payment gateway ✓
            │     │  └→ MQ: publish PaymentProcessed
            │     │     │
            │     │     └→ Point Service receives it
            │     │        ├→ DB: award points ✓
            │     │        └→ MQ: publish PointAdded (done)
            │     │
            │     └→ (on failure) MQ: publish InventoryDeductFailed
            │        │
            │        └→ Order Service receives it
            │           ├→ DB: cancel order
            │           └→ Notify the user

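Pros

  • Services are decoupled (loose coupling)
  • Easy to extend (a new service only subscribes to the events it needs)
  • High availability (no central orchestrator)

Cons

  • The flow is hard to follow (long event chains)
  • Debugging is hard (messages must be traced across services)
  • Risk of message loss (message delivery must be made reliable)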
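For the message-loss risk, one common remedy (not used in the example code above) is a transactional outbox: the order row and the event row are written in the same local transaction, and a separate relay publishes pending events. The sketch uses sqlite3, and a plain `publish` callback stands in for the broker; the table and function names are assumptions for illustration.

```python
import json
import sqlite3
from typing import Callable


def init_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT)")
    conn.execute(
        "CREATE TABLE outbox (id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " event_type TEXT, payload TEXT, published INTEGER DEFAULT 0)"
    )
    return conn


def create_order(conn: sqlite3.Connection, order_id: str) -> None:
    # One local transaction: the order row and its event commit together or not at all
    with conn:
        conn.execute("INSERT INTO orders VALUES (?, 'PENDING')", (order_id,))
        conn.execute(
            "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
            ("order.created", json.dumps({"order_id": order_id})),
        )


def relay(conn: sqlite3.Connection, publish: Callable[[str, dict], None]) -> int:
    """Publish unsent events; mark each one only after publish() returns."""
    rows = conn.execute(
        "SELECT id, event_type, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, event_type, payload in rows:
        publish(event_type, json.loads(payload))  # if this raises, the row stays pending
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    return len(rows)


sent = []
conn = init_db()
create_order(conn, "o1")
print(relay(conn, lambda t, d: sent.append((t, d))), sent)
# 1 [('order.created', {'order_id': 'o1'})]
```

The relay gives at-least-once delivery: if it crashes after publishing but before marking the row, the event is sent again, so consumers still need to tolerate duplicates.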

2.3 Limitations of Saga

Problem 1: dirty reads (no isolation)

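T0: Order Service creates the order (status=PENDING)
T1: Inventory Service deducts stock
T2: The user queries the order → sees PENDING (although later steps may still fail!)
T3: Payment Service fails to charge, compensation is triggered
T4: The order is cancelled, but the user has already seen PENDING → a confusing user experience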

Fix: a finer-grained order state machine

Order states:
PENDING_PAYMENT (payment in progress)
    ↓
PAID (paid; shown to the user from this point on)
    ↓
FULFILLED (fulfilled)
or
CANCELLED (cancelled, with the reason shown)
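The refined states can be enforced with an explicit transition table, so a compensation can never, for example, move a CANCELLED order back to PAID. A minimal sketch; the `ALLOWED` table is an assumption that reads "or CANCELLED" as reachable from both PENDING_PAYMENT (payment failed) and PAID (a later step failed and the payment was refunded).

```python
from enum import Enum


class OrderStatus(Enum):
    PENDING_PAYMENT = "PENDING_PAYMENT"
    PAID = "PAID"
    FULFILLED = "FULFILLED"
    CANCELLED = "CANCELLED"


# Legal transitions; anything not listed is rejected
ALLOWED = {
    OrderStatus.PENDING_PAYMENT: {OrderStatus.PAID, OrderStatus.CANCELLED},
    OrderStatus.PAID: {OrderStatus.FULFILLED, OrderStatus.CANCELLED},
    OrderStatus.FULFILLED: set(),
    OrderStatus.CANCELLED: set(),
}


def transition(current: OrderStatus, target: OrderStatus) -> OrderStatus:
    """Return the new status, or raise if the move is not allowed."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target


status = OrderStatus.PENDING_PAYMENT
status = transition(status, OrderStatus.PAID)
status = transition(status, OrderStatus.FULFILLED)
print(status.value)  # FULFILLED
```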

Problem 2: compensation failure

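Payment charged successfully
Payment Service → issues the refund ✓
Inventory Service → adding stock back fails ✗
                   retries also fail (e.g. the inventory service is down)

Result: the money has been refunded but the stock was not restored, so the business data is inconsistent!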

Fixes

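  1. Guarantee eventual consistency: a scheduled background job scans for abnormal states and retries the compensation
  2. Manual intervention: raise an alert and let a person handle it
  3. Higher-level compensation: if adding stock back keeps failing, compensate the user directly with points or coupons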
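Fixes 1 and 2 are often combined: a background job retries failed compensations up to a cap, then escalates. A sketch under assumptions: `PendingCompensation`, `sweep`, and the `alert` callback are hypothetical names, and a real job would read these records from a database table rather than a list.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class PendingCompensation:
    saga_id: str
    action: Callable[[], None]  # e.g. "add stock back for order o1"; must be idempotent
    attempts: int = 0
    done: bool = False
    escalated: bool = False


def sweep(pending: List[PendingCompensation], max_attempts: int,
          alert: Callable[[str], None]) -> None:
    """One run of the scheduled job: retry every unfinished compensation once."""
    for item in pending:
        if item.done or item.escalated:
            continue
        item.attempts += 1
        try:
            item.action()
            item.done = True
        except Exception as exc:
            if item.attempts >= max_attempts:
                # Fix 2: stop retrying and hand over to a person
                item.escalated = True
                alert(f"saga {item.saga_id}: compensation failed {item.attempts}x ({exc})")


calls = {"n": 0}


def flaky_return_stock() -> None:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("inventory service down")


def broken_return_stock() -> None:
    raise ConnectionError("still down")


alerts: List[str] = []
jobs = [PendingCompensation("s1", flaky_return_stock),
        PendingCompensation("s2", broken_return_stock)]
for _ in range(3):  # three scheduled runs
    sweep(jobs, max_attempts=3, alert=alerts.append)
print([(j.saga_id, j.done, j.escalated) for j in jobs], len(alerts))
# [('s1', True, False), ('s2', False, True)] 1
```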

3. The TCC pattern: strong consistency

3.1 How it works

Try-Confirm-Cancel provides stronger consistency guarantees than Saga.

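Try phase: reserve resources
├─ Order Service: lock the order
├─ Inventory Service: lock stock
├─ Payment Service: freeze the account balance
└─ Point Service: reserve the points quota

Confirm phase: actually perform the business operation
├─ Order Service: confirm the order
├─ Inventory Service: deduct stock
├─ Payment Service: deduct the payment
└─ Point Service: award points

Cancel phase: release resources (only when any Try fails; once every Try has succeeded, a failed Confirm is retried rather than cancelled)
├─ Order Service: release the order lock
├─ Inventory Service: release the stock lock
├─ Payment Service: release the frozen balance
└─ Point Service: release the points quota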
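On the payment side, Try/Confirm/Cancel map naturally to freeze, deduct-from-frozen, and unfreeze. The in-memory sketch below (the `Account` class and its fields are illustrative assumptions) also keeps per-transaction state so that repeated Confirm or Cancel calls are harmless, and a Cancel that arrives with no prior Try succeeds as a no-op, the same rule the order service's `cancel_operation` in 3.2 applies when no order exists. A late Try arriving after such a Cancel is refused.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class Account:
    balance: int   # spendable funds, in cents
    frozen: int = 0  # reserved by in-flight Try calls
    tx_state: Dict[str, str] = field(default_factory=dict)  # tx_id -> phase
    tx_amount: Dict[str, int] = field(default_factory=dict)

    def try_freeze(self, tx_id: str, amount: int) -> bool:
        if tx_id in self.tx_state:  # duplicate Try, or a late Try after Cancel
            return self.tx_state[tx_id] == "TRIED"
        if self.balance < amount:
            return False
        self.balance -= amount
        self.frozen += amount
        self.tx_state[tx_id] = "TRIED"
        self.tx_amount[tx_id] = amount
        return True

    def confirm(self, tx_id: str) -> bool:
        state = self.tx_state.get(tx_id)
        if state == "CONFIRMED":  # idempotent repeat
            return True
        if state != "TRIED":
            return False
        self.frozen -= self.tx_amount[tx_id]  # the money leaves the account for good
        self.tx_state[tx_id] = "CONFIRMED"
        return True

    def cancel(self, tx_id: str) -> bool:
        state = self.tx_state.get(tx_id)
        if state == "CONFIRMED":
            return False  # too late to cancel
        if state == "TRIED":
            amount = self.tx_amount[tx_id]
            self.frozen -= amount
            self.balance += amount  # release the reservation
        # No Try seen (empty rollback) or already cancelled: record it and succeed
        self.tx_state[tx_id] = "CANCELLED"
        return True


acct = Account(balance=10_000)
acct.try_freeze("tx1", 3_000)
acct.confirm("tx1")
acct.confirm("tx1")  # a repeated Confirm is a no-op
print(acct.balance, acct.frozen)  # 7000 0
```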

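Characteristics

  • Strong consistency: either every service succeeds or every service fails
  • No dirty reads: Try only reserves resources, and nothing is visible to users yet
  • Clear business logic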

3.2 Implementation example

Service interface

from abc import ABC, abstractmethod
from enum import Enum


class TransactionStatus(Enum):
    INITIAL = "INITIAL"
    TRYING = "TRYING"
    TRIED = "TRIED"
    CONFIRMED = "CONFIRMED"
    CANCELLED = "CANCELLED"


class TccService(ABC):
    """TCC participant interface"""

    @abstractmethod
    async def try_operation(self, transaction_id: str, data: dict) -> bool:
        """
        Try phase: reserve resources without performing the business change yet.
        Returns True if the transaction can proceed, False if it cannot.
        """
        pass

    @abstractmethod
    async def confirm_operation(self, transaction_id: str) -> bool:
        """Confirm phase: actually perform the business operation (should be idempotent)."""
        pass

    @abstractmethod
    async def cancel_operation(self, transaction_id: str) -> bool:
        """Cancel phase: release what Try reserved (should be idempotent and tolerate a missing Try)."""
        pass

Order service implementation

from datetime import datetime

# `db` is a placeholder async data-access layer, as in the other examples


class OrderServiceImpl(TccService):
    async def try_operation(self, transaction_id: str, data: dict) -> bool:
        """
        Try: create the order with status=PENDING (reserved, not yet confirmed)
        """
        # Validate the user (product validation omitted for brevity)
        user = await db.users.find(data['user_id'])
        if not user:
            return False

        # Create the pending order
        order = await db.orders.insert({
            "transaction_id": transaction_id,
            "user_id": data['user_id'],
            "items": data['items'],
            "status": "PENDING",  # not yet confirmed
            "total_price": sum(item['price'] * item['qty'] for item in data['items']),
            "created_at": datetime.now()
        })

        # Record the Try-phase state (used by Cancel to undo it)
        await db.tcc_logs.insert({
            "transaction_id": transaction_id,
            "service": "OrderService",
            "phase": "TRIED",
            "resource_id": order.id,
            "data": order.to_dict()
        })

        return True

    async def confirm_operation(self, transaction_id: str) -> bool:
        """
        Confirm: confirm the order, status=CONFIRMED
        """
        order = await db.orders.find_one({"transaction_id": transaction_id})

        if not order:
            return False

        # Update the order status
        await db.orders.update(order.id, status="CONFIRMED")

        # Record the Confirm log
        await db.tcc_logs.insert({
            "transaction_id": transaction_id,
            "service": "OrderService",
            "phase": "CONFIRMED",
            "resource_id": order.id
        })

        return True

    async def cancel_operation(self, transaction_id: str) -> bool:
        """
        Cancel: cancel the order, status=CANCELLED
        """
        order = await db.orders.find_one({"transaction_id": transaction_id})

        if not order:
            return True  # Try never created an order (empty rollback): treat as success

        # Soft delete: mark the order as cancelled
        await db.orders.update(order.id, status="CANCELLED")

        # Record the Cancel log
        await db.tcc_logs.insert({
            "transaction_id": transaction_id,
            "service": "OrderService",
            "phase": "CANCELLED",
            "resource_id": order.id
        })

        return True

Inventory service implementation

class InventoryServiceImpl(TccService):
    async def try_operation(self, transaction_id: str, data: dict) -> bool:
        """
        Try: lock stock by creating lock records (stock is not deducted yet)
        """
        lock_records = []

        for item in data['items']:
            product = await db.products.find(item['product_id'])

            # Caveat: this check ignores quantities already locked by other in-flight
            # transactions, so concurrent Try calls can over-reserve the same units
            if not product or product.stock < item['qty']:
                # Try fails: the stock cannot be reserved. Locks created for earlier
                # items are left in place, so Cancel must still run for this service.
                return False

            # Create the stock lock record
            lock = await db.inventory_locks.insert({
                "transaction_id": transaction_id,
                "product_id": item['product_id'],
                "locked_qty": item['qty'],
                "created_at": datetime.now()
            })
            lock_records.append(lock.id)

        # Record the Try log
        await db.tcc_logs.insert({
            "transaction_id": transaction_id,
            "service": "InventoryService",
            "phase": "TRIED",
            "resource_ids": lock_records,
            "data": data['items']
        })

        return True

    async def confirm_operation(self, transaction_id: str) -> bool:
        """
        Confirm: actually deduct the stock
        """
        locks = await db.inventory_locks.find({"transaction_id": transaction_id})

        # A repeated Confirm finds no locks and returns False; check tcc_logs
        # first if Confirm has to be idempotent
        if not locks:
            return False

        for lock in locks:
            # Deduct the real stock
            await db.products.update(
                lock.product_id,
                decrement_stock=lock.locked_qty
            )

            # Delete the lock record
            await db.inventory_locks.delete(lock.id)

        # Record the Confirm log
        await db.tcc_logs.insert({
            "transaction_id": transaction_id,
            "service": "InventoryService",
            "phase": "CONFIRMED"
        })

        return True

    async def cancel_operation(self, transaction_id: str) -> bool:
        """
        Cancel: release the stock locks
        """
        locks = await db.inventory_locks.find({"transaction_id": transaction_id})

        for lock in locks:
            await db.inventory_locks.delete(lock.id)

        # Record the Cancel log
        await db.tcc_logs.insert({
            "transaction_id": transaction_id,
            "service": "InventoryService",
            "phase": "CANCELLED"
        })

        return True
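`InventoryServiceImpl.try_operation` checks `product.stock` without counting units that other in-flight transactions have already locked, so two concurrent Try calls can reserve the same units. A common fix is to keep an available/frozen pair on the product row and reserve with one conditional UPDATE, so the check and the reservation are atomic. The sketch uses sqlite3 for illustration; the `available` and `frozen` columns are assumptions.

```python
import sqlite3


def setup() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE products (id TEXT PRIMARY KEY, available INTEGER, frozen INTEGER)"
    )
    conn.execute("INSERT INTO products VALUES ('prod1', 3, 0)")
    conn.commit()
    return conn


def try_reserve(conn: sqlite3.Connection, product_id: str, qty: int) -> bool:
    # Check and reserve in one statement: the WHERE clause rejects overselling
    with conn:
        cur = conn.execute(
            "UPDATE products SET available = available - ?, frozen = frozen + ?"
            " WHERE id = ? AND available >= ?",
            (qty, qty, product_id, qty),
        )
    return cur.rowcount == 1


def confirm(conn: sqlite3.Connection, product_id: str, qty: int) -> None:
    with conn:  # the reserved units are now sold
        conn.execute(
            "UPDATE products SET frozen = frozen - ? WHERE id = ?", (qty, product_id)
        )


def cancel(conn: sqlite3.Connection, product_id: str, qty: int) -> None:
    with conn:  # give the reserved units back
        conn.execute(
            "UPDATE products SET available = available + ?, frozen = frozen - ? WHERE id = ?",
            (qty, qty, product_id),
        )


conn = setup()
print(try_reserve(conn, "prod1", 2), try_reserve(conn, "prod1", 2))  # True False
```

These `confirm`/`cancel` helpers are not idempotent on their own; in a full implementation they would be driven by the per-transaction lock records (like `inventory_locks`) so each one runs at most once.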

TCC coordinator

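import asyncio
import uuid
from typing import List


class TccCoordinator:
    def __init__(self, services: List[TccService], max_confirm_retries: int = 3):
        self.services = {svc.__class__.__name__: svc for svc in services}
        self.max_confirm_retries = max_confirm_retries

    async def execute_transaction(self, transaction_data: dict) -> bool:
        """
        Run one distributed transaction.
        Returns True: every service confirmed.
        Returns False: a Try failed and the Cancel phase has run.
        Raises RuntimeError: a Confirm kept failing after retries and needs recovery.
        """
        transaction_id = str(uuid.uuid4())
        tried_services = []

        # Try phase: continue only if every service succeeds
        try:
            for service_name, service in self.services.items():
                # Register before calling Try: a Try that fails halfway may already
                # hold some reservations, so Cancel must reach it too
                tried_services.append(service_name)
                if not await service.try_operation(transaction_id, transaction_data):
                    raise Exception(f"{service_name} try_operation failed")
        except Exception as e:
            print(f"Transaction {transaction_id} failed in Try ({e}), starting Cancel phase")

            # Cancel phase: roll back every service that was called, most recent first
            for service_name in reversed(tried_services):
                try:
                    await self.services[service_name].cancel_operation(transaction_id)
                except Exception as cancel_error:
                    # Keep cancelling the others; a real coordinator persists and retries this
                    print(f"Cancel failed for {service_name}: {cancel_error}")

            return False

        # Confirm phase: every Try succeeded, so the transaction must move forward.
        # A failed Confirm is retried (Confirm must be idempotent) rather than
        # cancelled, because other services may already have confirmed.
        for service_name, service in self.services.items():
            for attempt in range(1, self.max_confirm_retries + 1):
                try:
                    if await service.confirm_operation(transaction_id):
                        break
                except Exception as e:
                    print(f"Confirm attempt {attempt} failed for {service_name}: {e}")
                await asyncio.sleep(0.1 * attempt)  # simple linear backoff
            else:
                raise RuntimeError(
                    f"Transaction {transaction_id}: {service_name} could not confirm; "
                    "needs background recovery or manual intervention"
                )

        print(f"Transaction {transaction_id} completed successfully")
        return True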

Usage example

import asyncio

# PaymentServiceImpl and PointServiceImpl are assumed to implement TccService
# in the same way as the two classes above


async def main():
    # Initialize
    coordinator = TccCoordinator([
        OrderServiceImpl(),
        InventoryServiceImpl(),
        PaymentServiceImpl(),
        PointServiceImpl(),
    ])

    # Run the transaction
    success = await coordinator.execute_transaction({
        "user_id": "user123",
        "items": [
            {"product_id": "prod1", "qty": 2, "price": 100},
            {"product_id": "prod2", "qty": 1, "price": 50},
        ],
    })

    if success:
        print("Order created")
    else:
        print("Order creation failed and was rolled back")


asyncio.run(main())

3.3 Limitations of TCC

Problem 1: business complexity

Every operation needs its own Try, Confirm, and Cancel methods, which means a lot of code.

Problem 2: how long resources stay locked

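T0: Try phase starts, resources are locked
T1: Try phase completes
T2: Confirm/Cancel phase runs (possibly slowly)
    ↑ resources stay locked, other requests wait

If the Confirm phase takes 5 seconds, the resources stay locked for 5 seconds.
Under high concurrency → heavy lock contention → lower QPS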

Problem 3: a failed Confirm has no compensation

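Try succeeds → Confirm phase → database goes down
             → Confirm fails
             → Cancel phase → but some resources have already been modified

Manual intervention is needed to repair the data, which is why Confirm and Cancel should be idempotent and safe to retry.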

4. AT mode: automatic compensation

4.1 How it works

Automatic Compensation Transaction, promoted by the Seata framework.

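Core idea: proxy each local transaction, record snapshots of the data before and after every change,
and compensate automatically from those snapshots (no compensation code to write).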
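The before/after-image idea can be reproduced in a few lines with sqlite3. This is a conceptual sketch of what a data-source proxy does for an UPDATE, not Seata's implementation: `update_with_undo` stores both images in an `undo_log` table in the same local transaction as the business change, `commit_branch` simply drops the undo records, and `rollback_branch` restores before images but refuses (via the hypothetical `DirtyWriteError`) if the row no longer matches the after image, meaning someone else changed it in between.

```python
import json
import sqlite3


class DirtyWriteError(Exception):
    """The row changed after our update, so its before image can't be applied blindly."""


def setup() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE product (id TEXT PRIMARY KEY, stock INTEGER)")
    conn.execute(
        "CREATE TABLE undo_log (xid TEXT, row_id TEXT, before_image TEXT, after_image TEXT)"
    )
    conn.execute("INSERT INTO product VALUES ('prod1', 10)")
    conn.commit()
    return conn


def snapshot(conn: sqlite3.Connection, row_id: str) -> dict:
    row = conn.execute("SELECT id, stock FROM product WHERE id = ?", (row_id,)).fetchone()
    return {"id": row[0], "stock": row[1]}


def update_with_undo(conn: sqlite3.Connection, xid: str, row_id: str, delta: int) -> None:
    # Phase 1: the business UPDATE and its undo record commit in one local transaction
    with conn:
        before = snapshot(conn, row_id)
        conn.execute("UPDATE product SET stock = stock + ? WHERE id = ?", (delta, row_id))
        after = snapshot(conn, row_id)
        conn.execute(
            "INSERT INTO undo_log VALUES (?, ?, ?, ?)",
            (xid, row_id, json.dumps(before), json.dumps(after)),
        )


def commit_branch(conn: sqlite3.Connection, xid: str) -> None:
    # Phase 2 commit: the data is already committed; only the undo records go away
    with conn:
        conn.execute("DELETE FROM undo_log WHERE xid = ?", (xid,))


def rollback_branch(conn: sqlite3.Connection, xid: str) -> None:
    # Phase 2 rollback: apply before images, newest change first
    with conn:  # any exception rolls back everything done in this block
        logs = conn.execute(
            "SELECT row_id, before_image, after_image FROM undo_log"
            " WHERE xid = ? ORDER BY rowid DESC",
            (xid,),
        ).fetchall()
        for row_id, before, after in logs:
            if snapshot(conn, row_id) != json.loads(after):
                raise DirtyWriteError(row_id)
            conn.execute(
                "UPDATE product SET stock = ? WHERE id = ?",
                (json.loads(before)["stock"], row_id),
            )
        conn.execute("DELETE FROM undo_log WHERE xid = ?", (xid,))


conn = setup()
update_with_undo(conn, "xid-1", "prod1", -2)  # deduct 2
update_with_undo(conn, "xid-1", "prod1", -3)  # deduct 3 more
rollback_branch(conn, "xid-1")                # global rollback
print(snapshot(conn, "prod1"))  # {'id': 'prod1', 'stock': 10}
```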

Architecture

┌──────────────────────────────────────────────────┐
│ Application layer (business logic)               │
│ @Transactional @GlobalTransactional              │
└────────────────────────┬─────────────────────────┘
                         │
┌────────────────────────▼─────────────────────────┐
│ Seata proxy layer (intercepts local transactions)│
│ 1. Record the before image (data before change)  │
│ 2. Execute the local SQL                         │
│ 3. Record the after image (data after change)    │
│ 4. Write the undo log and register the branch    │
└────────────────────────┬─────────────────────────┘
                         │
┌────────────────────────▼─────────────────────────┐
│ Transaction Coordinator (TC)                     │
│ - Generates global transaction IDs (XIDs)        │
│ - Manages global locks                           │
│ - Drives global commit / rollback (compensation) │
└──────────────────────────────────────────────────┘
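The TC's global lock is what stops two global transactions from overwriting each other's not-yet-final changes to the same row between phase 1 and phase 2. Roughly, in Seata's AT mode a branch has to obtain the global locks for the rows it modifies before its local commit, and they are released once phase 2 finishes. The toy, single-process sketch below shows only that bookkeeping; `GlobalLockTable` and the `"table:pk"` key format are hypothetical.

```python
from typing import Dict, Iterable, List


class GlobalLockTable:
    """Toy stand-in for the TC's global lock manager (single process, no waiting)."""

    def __init__(self) -> None:
        self.owner: Dict[str, str] = {}  # row key -> xid holding the lock

    def acquire(self, xid: str, row_keys: Iterable[str]) -> bool:
        keys: List[str] = list(row_keys)
        # All-or-nothing: refuse if any row is held by another global transaction
        if any(self.owner.get(k, xid) != xid for k in keys):
            return False
        for k in keys:
            self.owner[k] = xid
        return True

    def release(self, xid: str) -> None:
        # Called after phase 2 (commit or rollback) completes for this xid
        for k in [k for k, v in self.owner.items() if v == xid]:
            del self.owner[k]


locks = GlobalLockTable()
print(locks.acquire("xid-A", ["product:prod1"]))  # True
print(locks.acquire("xid-B", ["product:prod1"]))  # False: must wait and retry
locks.release("xid-A")
print(locks.acquire("xid-B", ["product:prod1"]))  # True
```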

4.2 Implementation example (Seata)

Seata configuration

# seata-server configuration
server:
  port: 8091
  servlet:
    context-path: /seata

spring:
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata?serverTimezone=UTC
    username: root
    password: password

seata:
  registry:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: public
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: public
  store:
    mode: db  # store transaction state in a database
    db:
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:3306/seata?serverTimezone=UTC
      user: root  # Seata's store.db block names this key "user", not "username"
      password: password

Create the undo_log table (in each participating business database)

CREATE TABLE IF NOT EXISTS `undo_log` (
  `branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
  `xid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `context` varchar(8000) COMMENT 'undo_log context',
  `rollback_info` longblob NOT NULL COMMENT 'serialized rollback info',
  `log_status` int(11) NOT NULL COMMENT '0:normal,1:defense',
  `log_created` datetime NOT NULL COMMENT 'create datetime',
  `log_modified` datetime NOT NULL COMMENT 'modify datetime',
  UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='AT transaction mode undo table';

Order service

# Illustrative pseudocode for the AT flow: `seata_python_client`, `SeataClient`,
# `get_global_tx_id` and `report_transaction_status` are hypothetical names,
# not a documented Seata API.
import uuid
from datetime import datetime

import httpx
import seata_python_client  # hypothetical client
from fastapi import FastAPI
from sqlalchemy import Column, DateTime, Float, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

app = FastAPI()
http_client = httpx.AsyncClient()

# Initialize the Seata client (hypothetical API)
seata_client = seata_python_client.SeataClient(
    registry_type="nacos",
    registry_address="127.0.0.1:8848",
    application_name="order-service"
)

# Database
engine = create_engine("mysql+pymysql://root:password@localhost/order_db")
Session = sessionmaker(bind=engine)
Base = declarative_base()


class Order(Base):
    __tablename__ = "orders"

    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    user_id = Column(String(64), nullable=False)
    total_price = Column(Float, nullable=False)
    status = Column(String(16), nullable=False, default="PENDING")
    created_at = Column(DateTime, nullable=False, default=datetime.now)


@app.post("/order/create")
async def create_order(order_data: dict):
    """
    Create an order: the global transaction entry point.
    In Java this method would carry @GlobalTransactional and Seata would handle
    the rest; here the XID comes from the hypothetical client.
    """
    # Begin / obtain the global transaction ID (hypothetical API)
    xid = seata_client.get_global_tx_id()

    session = Session()

    try:
        # 1. Create the order (local transaction, committed in phase 1)
        order = Order(
            user_id=order_data['user_id'],
            total_price=order_data['total_price']
        )
        session.add(order)
        session.commit()

        # Seata's data-source proxy records the images for this INSERT:
        # before image: no row; after image: the newly inserted row

        # 2. Call the inventory service
        inventory_result = await deduct_inventory_rpc(
            xid=xid,
            items=order_data['items']
        )

        if not inventory_result['success']:
            raise Exception("Insufficient stock")

        # 3. Call the payment service
        payment_result = await process_payment_rpc(
            xid=xid,
            order_id=order.id,
            amount=order.total_price
        )

        if not payment_result['success']:
            raise Exception("Payment failed")

        # 4. Update the order status
        order.status = "PAID"
        session.commit()  # the proxy records this UPDATE as well

        # 5. Call the points service
        await add_points_rpc(
            xid=xid,
            user_id=order_data['user_id'],
            points=int(order.total_price * 0.01)
        )

        return {"success": True, "order_id": order.id}

    except Exception as e:
        # session.rollback() only discards uncommitted local work. Branches that
        # already committed are restored by the TC from each service's undo_log
        # once the global rollback is reported.
        session.rollback()
        seata_client.report_transaction_status(xid, False)  # hypothetical API
        return {"success": False, "error": str(e)}

    finally:
        session.close()


async def deduct_inventory_rpc(xid: str, items: list):
    """
    Remote call to the inventory service
    """
    headers = {
        "X-SEATA-XID": xid  # propagate the global transaction ID
    }
    # Actual HTTP/RPC call
    response = await http_client.post(
        "http://inventory-service/inventory/deduct",
        json={"items": items},
        headers=headers
    )
    return response.json()


async def process_payment_rpc(xid: str, order_id: str, amount: float):
    """
    Remote call to the payment service
    """
    headers = {"X-SEATA-XID": xid}
    response = await http_client.post(
        "http://payment-service/payment/process",
        json={"order_id": order_id, "amount": amount},
        headers=headers
    )
    return response.json()


async def add_points_rpc(xid: str, user_id: str, points: int):
    """
    Remote call to the points service
    """
    headers = {"X-SEATA-XID": xid}
    response = await http_client.post(
        "http://point-service/point/add",
        json={"user_id": user_id, "points": points},
        headers=headers
    )
    return response.json()

Inventory service (participant)

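from fastapi import Body, Request


@app.post("/inventory/deduct")
async def deduct_inventory(request: Request, items: list = Body(..., embed=True)):
    """
    Inventory service: a branch transaction.
    In Java this method would use @Transactional on a Seata-proxied data source.
    `app`, `Session` and the `Product` model belong to this service and are set
    up like the order service's.
    """
    # The XID has to be bound to this request's Seata context so the proxied
    # data source registers the local transaction below as a branch
    xid = request.headers.get("X-SEATA-XID")

    session = Session()

    try:
        for item in items:
            product = session.query(Product).filter(
                Product.id == item['product_id']
            ).first()

            if not product or product.stock < item['qty']:
                raise Exception("Insufficient stock")

            # Deduct stock. The Seata proxy captures the before image
            # (e.g. {"id": product.id, "stock": old_stock}) and the after image
            # automatically; no manual snapshot is needed.
            product.stock -= item['qty']

        # One local commit for all items (phase 1); a later global rollback
        # restores the before images from undo_log
        session.commit()

        return {"success": True}

    except Exception as e:
        # Nothing has been committed yet, so a plain local rollback is enough here
        session.rollback()
        return {"success": False, "error": str(e)}

    finally:
        session.close()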

4.3 Advantages of AT mode

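| Feature | AT | TCC | Saga |
|---|---|---|---|
| Consistency | Strong | Strong | Eventual |
| Coding complexity | Low (no compensation code) | High (3 methods) | Medium (event chain needed) |
| Resource lock duration | Short (local transaction only) | Long (Try → Confirm) | None (asynchronous) |
| Business intrusiveness | Low (proxy-based) | High (business logic must change) | Medium (must be event-driven) |
| Performance | - | Low (lock contention) | High (no synchronous locks) |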

5. Comparing the three patterns and choosing one

5.1 Decision tree

┌─────────────────────────────────┐
│  Is strong consistency needed?  │
└──────┬───────────────────┬──────┘
       No                  Yes
       │                   │
       │           ┌───────▼────────┐
       │           │ Data changes   │
       │           │ frequent or    │
       │           │ occasional?    │
       │           └───┬────────┬───┘
       │            Frequent  Occasional
       │               │        │
       │             ┌─▼──┐   ┌─▼───┐
       │             │ AT │   │ TCC │
       │             └────┘   └─────┘
       │
┌──────▼────────────────┐
│ Must it be real-time, │
│ or can it be async?   │
└──┬───────────────┬────┘
 Real-time       Async
   │               │
 ┌─▼─┐           ┌─▼────┐
 │ ? │           │ Saga │
 └───┘           └──────┘
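The flowchart reads naturally as a function. This is a literal transcription with illustrative parameter names, including the branch the chart leaves open: no strong consistency but a real-time requirement maps to "?", returned here as `None`.

```python
from typing import Optional


def choose_pattern(strong_consistency: bool,
                   frequent_writes: bool = False,
                   async_ok: bool = True) -> Optional[str]:
    """Transcription of the 5.1 decision tree; None marks the branch it leaves open."""
    if strong_consistency:
        # Strong consistency: frequent data changes -> AT, occasional ones -> TCC
        return "AT" if frequent_writes else "TCC"
    # Eventual consistency is acceptable: asynchronous processing -> Saga
    return "Saga" if async_ok else None


print(choose_pattern(True, frequent_writes=True))  # AT
print(choose_pattern(False, async_ok=False))       # None
```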

5.2 Concrete recommendations

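| Scenario | Recommended | Reason |
|---|---|---|
| E-commerce orders | AT | Frequent data changes that need strong consistency, but no cross-database joins |
| Transfers / charges | TCC | Financial-grade strong consistency; short-lived resource locking is acceptable |
| Logistics tracking | Saga | Eventual consistency is enough; no need to sync immediately |
| Data analytics | Saga | Asynchronous processing, high throughput |
| Inventory deduction | AT | Frequent operations that must be exact |
| User points | Saga | Points can be added asynchronously, and failures can be compensated |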

Summary

There is no silver bullet for distributed transactions; choose according to the characteristics of the business:

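  1. Saga: eventually consistent and fast; suits most internet businesses
  2. TCC: strongly consistent with lower throughput; suits core flows such as financial transfers
  3. AT: strong consistency with little code; suits cloud-native applications built on Seata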

Golden rules

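  • First try to redesign the business flow so fewer distributed transactions are needed
  • If a local transaction can solve the problem, don't use a distributed one
  • Prefer Saga (eventual consistency), and reach for TCC/AT only when it is not enough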
Contributors: zhengtianqi