跳至主要內容

CQRS与Event_Sourcing:事件驱动架构的实践指南

郑天祺大约 7 分钟架构设计CQRSEvent_Sourcing事件驱动架构

前言

传统应用中,读写操作使用同一个数据模型

User Table:
├─ id
├─ name
├─ email
├─ created_at
├─ updated_at
└─ ...

所有写操作:UPDATE user SET ...
所有读操作:SELECT * FROM user WHERE ...

问题

  • 读写使用同一个表,导致表结构臃肿(既要满足写入需求,也要满足读取需求)
  • 某些读操作需要JOIN多个表,性能低
  • 某些字段只用于特定的读操作,浪费存储空间

CQRS(Command Query Responsibility Segregation) 的思想是:

将读写分离
  ├─ 写(Command):更新事件日志
  └─ 读(Query):查询优化的读模型

本文深入讲解CQRS和Event Sourcing的原理、实现与权衡。


一、CQRS基本概念

1.1 传统CRUD vs CQRS

传统CRUD:一个数据模型

┌──────────────┐
│   User表     │
│  id|name|... │
└──────────────┘
    ↑      ↑
  写操作  读操作
    │      │
┌───┴──┬───┴──┐
│写入  │读取   │
│      │      │
UPDATE SELECT

CQRS:分离的命令和查询模型

写端:
┌────────────────┐
│  User Command  │ ← 优化写入
│  (Minimal Data)│
└────────────────┘
    │ 发布事件
    ▼
┌────────────────┐
│  Event Log     │ ← 永久记录
│ (History)      │
└────────────────┘

读端:
┌────────────────┐
│  User Query    │ ← 优化读取
│  (Denormalized)│
│  id|name|...   │
└────────────────┘
    ↑ 异步更新
    │ 消费事件

1.2 CQRS的核心思想

# 传统方式:同一个Service既处理命令,也处理查询
class UserService:
    def create_user(self, name, email):  # 写
        user = User(name=name, email=email)
        db.save(user)
        return user.id
    
    def get_user(self, user_id):  # 读
        return db.query(user_id)
    
    def list_users(self, page=1):  # 读
        return db.list_with_pagination(page)

# CQRS方式:分离命令和查询
class UserCommandService:
    """只处理写操作"""
    
    def create_user(self, name, email):
        user = User(name=name, email=email)
        event = UserCreatedEvent(user_id=user.id, name=name, email=email)
        
        # 发布事件(不保存到数据库,只记录历史)
        self.event_store.append(event)
        self.event_bus.publish(event)

class UserQueryService:
    """只处理读操作,使用优化的查询模型"""
    
    def get_user(self, user_id):
        # 从专门的查询表读取(已反规范化)
        return self.query_db.user_projection.get(user_id)
    
    def list_users(self, page=1):
        return self.query_db.user_list_projection.paginate(page)

二、Event Sourcing(事件溯源)

2.1 核心概念

不保存对象的当前状态,而是保存所有改变该对象的事件

传统方式:
┌─────────────────┐
│  User表         │
│ id: 1           │
│ name: "Alice"   │
│ email: "a@x.com"│
│ age: 30         │
└─────────────────┘

Event Sourcing方式:
┌──────────────────────────────────────┐
│  Event Log                           │
├──────────────────────────────────────┤
│ 1. UserCreatedEvent:                 │
│    user_id=1, name="Alice"           │
│                                      │
│ 2. UserEmailUpdatedEvent:            │
│    user_id=1, old_email="old@x.com"  │
│    new_email="a@x.com"               │
│                                      │
│ 3. UserAgeUpdatedEvent:              │
│    user_id=1, age=30                 │
│                                      │
│ 4. UserNameUpdatedEvent:             │
│    user_id=1, old_name="Alicia"      │
│    new_name="Alice"                  │
└──────────────────────────────────────┘

当前状态 = 重放所有事件

2.2 优势

1. 完整的审计日志

问题:用户投诉"我从未改过邮箱"
      但系统显示邮箱是abc@example.com

Event Sourcing方案:
  查询Event Log → 找到UserEmailUpdatedEvent
               → 看到修改时间、修改者、IP地址
               → 可以证明是谁在什么时候改的

2. 时间旅行(Time Travel)

# 可以重建任何时刻的对象状态

def get_user_state_at(user_id: str, timestamp: datetime) -> User:
    """
    获取用户在某个时刻的状态
    """
    events = self.event_store.get_events(
        aggregate_id=user_id,
        until=timestamp
    )
    
    user = User()
    for event in events:
        user.apply_event(event)
    
    return user

# 用途:
# 1. 审计:"昨天这个时刻用户的订单是什么状态?"
# 2. 回溯:"2周前用户删除了什么文件?"
# 3. 调试:"在哪个操作之后才出现这个bug?"

3. 自然支持并发

传统方式(乐观锁问题):
  T1: 读User v1 (version=1)
  T2: 读User v1 (version=1)
  
  T1: 修改age,保存 (version=2)
  T2: 修改email,保存 → 冲突!version=1 != 2

Event Sourcing方式:
  T1: 发布UserAgeUpdatedEvent
  T2: 发布UserEmailUpdatedEvent
  
  两个事件都成功追加到Log
  最终状态 = apply(AgeUpdated) + apply(EmailUpdated)
  无冲突!

2.3 实现示例

# domain/event/user_events.py

from dataclasses import dataclass
from datetime import datetime
import json

@dataclass
class DomainEvent:
    """所有事件的基类"""
    event_id: str
    aggregate_id: str  # 聚合根ID(如user_id)
    event_type: str
    timestamp: datetime
    data: dict
    
    def to_json(self) -> str:
        return json.dumps({
            'event_id': self.event_id,
            'aggregate_id': self.aggregate_id,
            'event_type': self.event_type,
            'timestamp': self.timestamp.isoformat(),
            'data': self.data
        })

@dataclass
class UserCreatedEvent(DomainEvent):
    """用户创建事件"""
    pass

@dataclass
class UserEmailUpdatedEvent(DomainEvent):
    """用户邮箱更新事件"""
    pass

@dataclass
class UserAgeUpdatedEvent(DomainEvent):
    """用户年龄更新事件"""
    pass

# domain/aggregate/user.py

class User:
    """用户聚合根"""
    
    def __init__(self, user_id: str):
        self.user_id = user_id
        self.name = None
        self.email = None
        self.age = None
        self.version = 0  # 聚合根版本
        self.uncommitted_events = []
    
    def create(self, name: str, email: str):
        """创建用户"""
        event = UserCreatedEvent(
            event_id=self.generate_event_id(),
            aggregate_id=self.user_id,
            event_type='UserCreated',
            timestamp=datetime.now(),
            data={
                'user_id': self.user_id,
                'name': name,
                'email': email
            }
        )
        
        self.apply_event(event)
        self.uncommitted_events.append(event)
    
    def update_email(self, new_email: str):
        """更新邮箱"""
        event = UserEmailUpdatedEvent(
            event_id=self.generate_event_id(),
            aggregate_id=self.user_id,
            event_type='UserEmailUpdated',
            timestamp=datetime.now(),
            data={
                'user_id': self.user_id,
                'old_email': self.email,
                'new_email': new_email
            }
        )
        
        self.apply_event(event)
        self.uncommitted_events.append(event)
    
    def update_age(self, new_age: int):
        """更新年龄"""
        event = UserAgeUpdatedEvent(
            event_id=self.generate_event_id(),
            aggregate_id=self.user_id,
            event_type='UserAgeUpdated',
            timestamp=datetime.now(),
            data={
                'user_id': self.user_id,
                'age': new_age
            }
        )
        
        self.apply_event(event)
        self.uncommitted_events.append(event)
    
    def apply_event(self, event: DomainEvent):
        """应用事件,更新状态"""
        if event.event_type == 'UserCreated':
            self.name = event.data['name']
            self.email = event.data['email']
        
        elif event.event_type == 'UserEmailUpdated':
            self.email = event.data['new_email']
        
        elif event.event_type == 'UserAgeUpdated':
            self.age = event.data['age']
        
        self.version += 1
    
    def get_uncommitted_events(self) -> list:
        """获取未持久化的事件"""
        return self.uncommitted_events
    
    def mark_events_as_committed(self):
        """标记事件为已持久化"""
        self.uncommitted_events = []
    
    @staticmethod
    def generate_event_id() -> str:
        import uuid
        return str(uuid.uuid4())

# infrastructure/event_store.py

class EventStore:
    """事件存储"""
    
    def __init__(self, db_connection):
        self.db = db_connection
    
    async def save_events(self, events: list) -> None:
        """
        保存事件
        """
        for event in events:
            await self.db.execute(
                """
                INSERT INTO events (event_id, aggregate_id, event_type, data, timestamp)
                VALUES (%s, %s, %s, %s, %s)
                """,
                event.event_id,
                event.aggregate_id,
                event.event_type,
                event.to_json(),
                event.timestamp
            )
    
    async def get_events(self, aggregate_id: str) -> list:
        """
        获取某个聚合根的所有事件
        """
        rows = await self.db.fetch(
            "SELECT data FROM events WHERE aggregate_id = %s ORDER BY timestamp",
            aggregate_id
        )
        
        events = []
        for row in rows:
            event_data = json.loads(row['data'])
            event_type = event_data['event_type']
            
            if event_type == 'UserCreated':
                event = UserCreatedEvent(**event_data)
            elif event_type == 'UserEmailUpdated':
                event = UserEmailUpdatedEvent(**event_data)
            elif event_type == 'UserAgeUpdated':
                event = UserAgeUpdatedEvent(**event_data)
            
            events.append(event)
        
        return events
    
    async def rebuild_aggregate(self, aggregate_id: str) -> User:
        """
        重建聚合根(重放所有事件)
        """
        user = User(aggregate_id)
        events = await self.get_events(aggregate_id)
        
        for event in events:
            user.apply_event(event)
        
        return user

# application/service/user_command_service.py

class UserCommandService:
    """用户命令服务(写操作)"""
    
    def __init__(self, event_store: EventStore, event_bus):
        self.event_store = event_store
        self.event_bus = event_bus
    
    async def create_user(self, user_id: str, name: str, email: str):
        """创建用户"""
        user = User(user_id)
        user.create(name, email)
        
        # 保存事件
        await self.event_store.save_events(user.get_uncommitted_events())
        
        # 发布事件(触发事件处理器,更新读模型)
        for event in user.get_uncommitted_events():
            await self.event_bus.publish(event)
        
        user.mark_events_as_committed()
    
    async def update_email(self, user_id: str, new_email: str):
        """更新邮箱"""
        # 重建用户状态
        user = await self.event_store.rebuild_aggregate(user_id)
        
        # 执行命令
        user.update_email(new_email)
        
        # 保存新事件
        await self.event_store.save_events(user.get_uncommitted_events())
        
        # 发布事件
        for event in user.get_uncommitted_events():
            await self.event_bus.publish(event)
        
        user.mark_events_as_committed()

# projection/user_query_projection.py

class UserQueryProjection:
    """用户查询投影(读模型)"""
    
    def __init__(self, db_connection):
        self.db = db_connection
    
    async def handle_user_created(self, event: UserCreatedEvent):
        """处理UserCreated事件,更新读模型"""
        await self.db.execute(
            """
            INSERT INTO user_query (user_id, name, email, created_at)
            VALUES (%s, %s, %s, %s)
            """,
            event.data['user_id'],
            event.data['name'],
            event.data['email'],
            event.timestamp
        )
    
    async def handle_user_email_updated(self, event: UserEmailUpdatedEvent):
        """处理UserEmailUpdated事件"""
        await self.db.execute(
            "UPDATE user_query SET email = %s WHERE user_id = %s",
            event.data['new_email'],
            event.data['user_id']
        )
    
    async def handle_user_age_updated(self, event: UserAgeUpdatedEvent):
        """处理UserAgeUpdated事件"""
        await self.db.execute(
            "UPDATE user_query SET age = %s WHERE user_id = %s",
            event.data['age'],
            event.data['user_id']
        )
    
    async def get_user(self, user_id: str):
        """查询用户(从读模型)"""
        return await self.db.fetchrow(
            "SELECT * FROM user_query WHERE user_id = %s",
            user_id
        )

三、CQRS+Event Sourcing架构

3.1 完整架构

写端:
┌─────────────────┐
│  User Command   │
│  (HTTP POST)    │
└────────┬────────┘
         │
    ┌────▼──────────┐
    │ Command Handler│
    │                │
    │ 1. 重建聚合根  │
    │ 2. 执行命令    │
    │ 3. 发布事件    │
    └────┬──────────┘
         │
    ┌────▼────────────┐
    │  Event Store    │
    │  (数据库Log表)   │
    └────┬────────────┘
         │
    ┌────▼────────────┐
    │  Event Bus      │
    │  (异步消费)     │
    └────┬────────────┘
         │
读端:   │
    ┌────▼────────────────┐
    │ Event Handlers      │
    │ (投影更新器)        │
    └────┬─────────────────┘
         │
    ┌────▼────────────┐
    │ Query Models    │
    │ (优化的读表)    │
    └────┬────────────┘
         │
    ┌────▼───────────┐
    │ Query Service   │
    │ (HTTP GET)     │
    └────────────────┘

3.2 Event Handlers(投影更新器)

class EventHandlers:
    """订阅事件,更新读模型"""
    
    def __init__(self, user_projection: UserQueryProjection):
        self.user_projection = user_projection
    
    async def on_user_created(self, event: UserCreatedEvent):
        """当UserCreated事件发布时调用"""
        await self.user_projection.handle_user_created(event)
    
    async def on_user_email_updated(self, event: UserEmailUpdatedEvent):
        """当UserEmailUpdated事件发布时调用"""
        await self.user_projection.handle_user_email_updated(event)

# 注册Event Handler
event_bus.subscribe('UserCreated', handler.on_user_created)
event_bus.subscribe('UserEmailUpdated', handler.on_user_email_updated)

四、CQRS vs Event Sourcing

两者经常一起用,但不必非要配对:

┌────────────────────────────────────────┐
│  独立使用CQRS(不使用ES)              │
│  ├─ 写:直接修改Command模型            │
│  └─ 读:从Query模型读取                │
│  优点:简单,性能好                    │
│  缺点:无法审计、难以时间旅行          │
│                                       │
├────────────────────────────────────────┤
│  独立使用Event Sourcing(不使用CQRS)  │
│  ├─ 保存所有事件                       │
│  └─ 重放事件获取当前状态               │
│  优点:完整审计、时间旅行              │
│  缺点:重放性能低                      │
│                                       │
├────────────────────────────────────────┤
│  同时使用CQRS + Event Sourcing        │
│  ├─ 写端:Event Sourcing(可靠记录)  │
│  └─ 读端:Query模型(性能优化)       │
│  优点:两者优点结合                    │
│  缺点:复杂度高,运维难度大            │
└────────────────────────────────────────┘

五、常见问题与解决方案

5.1 问题1:最终一致性延迟

用户发起转账:
T1: 发布TransferInitiated事件
T2: 读模型还未更新,查询余额仍是旧值
T3: 用户看到:转账成功,但余额未扣减

解决:
1. 在写端返回,同时返回新状态(不查读模型)
2. 轮询重试,确保读模型已更新
3. WebSocket推送最新状态

5.2 问题2:Event Store膨胀

情景:同一个聚合根有10000个事件
      每次都要重放10000个事件?性能极低!

解决方案:快照(Snapshot)

每隔N个事件创建快照:
  Event 1-1000 → Snapshot v1
  Event 1001-2000 → Snapshot v2
  ...
  Event 9001-10000 → Snapshot v10

重建时:
  加载最新快照(版本v9)
  只重放最后1000个事件
  大大加速!

5.3 问题3:读写强一致性需求

某些场景不能接受最终一致性:
  └─ 转账:必须立即扣款和加款(强一致)

解决方案:混合方案
  转账这类关键操作:使用传统ACID事务
  其他操作:使用CQRS+Event Sourcing

总结

特性传统CRUDCQRSESCQRS+ES
实现复杂度
性能
审计
时间旅行
最终一致N/A

何时使用

  • ✓ CQRS: 读写负载差异大(如报表系统)
  • ✓ Event Sourcing: 需要完整审计、法规要求
  • ✓ CQRS+ES: 复杂业务、超大规模系统(如金融)
上次编辑于:
贡献者: zhengtianqi