跳至主要內容

六边形架构与整洁架构:从理论到实战落地

郑天祺大约 8 分钟架构设计六边形架构整洁架构

前言

大多数互联网应用,代码演进轨迹是这样的:

第一年:代码清晰
第二年:开始混乱(业务逻辑和框架代码纠缠)
第三年:完全烂尾(改个功能需要改10个文件,改错概率50%)

根本原因是:业务逻辑和基础设施代码没有边界

本文通过六边形架构整洁架构两个经典模式,展示如何设计出易于维护、易于测试、易于扩展的代码结构。


一、问题:传统MVC架构的困境

1.1 经典MVC结构

┌─────────────────────────┐
│      Controller         │
│ (HTTP请求处理)          │
└────────────┬────────────┘
             │
┌────────────▼────────────┐
│       Service           │
│ (业务逻辑)              │
└────────────┬────────────┘
             │
┌────────────▼────────────┐
│      Repository         │
│ (数据库访问)            │
└─────────────────────────┘

问题

1. 业务逻辑混杂在各层
   Service层既有业务规则,也有数据库查询逻辑
   
2. 依赖关系混乱
   Service依赖Repository(数据库)
   Controller依赖Service
   上层依赖下层(正常)
   但下层不应该知道上层的存在

3. 难以测试
   Service中有数据库操作 → 必须Mock数据库
   控制器中有业务逻辑 → 必须启动Web框架
   
4. 难以扩展
   支持新的输入方式(如RPC) → 复制Controller代码
   支持新的输出方式(如微服务) → 修改Service代码

二、六边形架构(Hexagonal Architecture)

2.1 核心思想

Alistair Cockburn 提出的六边形架构,其核心理念是:

业务逻辑 ← 中心(不知道外界如何与它通信)
      ↑
      │ 通过接口
      │ 适配器转换
      │
外界(HTTP、数据库、消息队列等)

六个方向代表可能的输入/输出:

        ┌──────────┐
       ╱            ╲
      ╱  Business   ╲
     │    Logic      │
      ╲    (Core)   ╱
       ╲            ╱
        └──────────┘
        /  |  |  \  \  \
  HTTP  DB Msg Event Cache ...

2.2 架构分层

┌────────────────────────────────────┐
│        外部(Infrastructure)      │
│  (Web框架、数据库、消息队列)       │
└──────────────┬─────────────────────┘
               │
        ┌──────▼──────┐
        │   适配器     │
        │  (Adapter)   │ ← 转换格式
        │              │  (业务模型 ↔ 外界格式)
        └──────┬───────┘
               │
        ┌──────▼──────────┐
        │   应用服务      │
        │(Application)    │ ← 用例编排
        └──────┬──────────┘
               │
        ┌──────▼──────────────┐
        │   领域模型           │
        │  (Domain Model)     │ ← 业务规则(纯业务逻辑)
        │                     │  不知道数据库、HTTP等
        └─────────────────────┘

2.3 代码示例

第1层:领域模型(最内层,零依赖)

# domain/model/order.py

class Order:
    """订单(纯业务逻辑,不知道任何框架)"""
    
    def __init__(self, order_id: str, customer_id: str, items: list, total_price: float):
        self.order_id = order_id
        self.customer_id = customer_id
        self.items = items  # [{"product_id": "p1", "qty": 2, "price": 100}, ...]
        self.total_price = total_price
        self.status = "PENDING"
        self.created_at = datetime.now()
        self.paid_at = None
    
    def can_pay(self) -> bool:
        """业务规则:订单能否支付?"""
        return self.status == "PENDING" and self.total_price > 0
    
    def mark_as_paid(self, paid_at: datetime):
        """业务规则:标记为已支付"""
        if not self.can_pay():
            raise ValueError("Order cannot be paid in current status")
        
        self.status = "PAID"
        self.paid_at = paid_at
    
    def can_cancel(self) -> bool:
        """业务规则:订单能否取消?"""
        return self.status in ["PENDING", "PAID"] and self.paid_at is None
    
    def cancel(self):
        """业务规则:取消订单"""
        if not self.can_cancel():
            raise ValueError("Order cannot be cancelled")
        
        self.status = "CANCELLED"
    
    def calculate_discount(self, customer_level: str) -> float:
        """业务规则:计算折扣"""
        discounts = {
            "VIP": 0.2,      # 20%折扣
            "GOLD": 0.1,     # 10%折扣
            "SILVER": 0.05,  # 5%折扣
            "NORMAL": 0,
        }
        return self.total_price * discounts.get(customer_level, 0)

# domain/repository/order_repository.py

from abc import ABC, abstractmethod

class OrderRepository(ABC):
    """订单仓储接口(业务层定义,基础设施层实现)"""
    
    @abstractmethod
    async def save(self, order: Order) -> None:
        """保存订单"""
        pass
    
    @abstractmethod
    async def get_by_id(self, order_id: str) -> Order:
        """获取订单"""
        pass
    
    @abstractmethod
    async def list_by_customer(self, customer_id: str, limit: int = 10) -> list[Order]:
        """获取客户的订单列表"""
        pass

关键特性

  • ✓ 零依赖(不引入任何框架)
  • ✓ 业务规则集中
  • ✓ 易于单元测试

第2层:应用服务(用例编排)

# application/service/order_service.py

from domain.model.order import Order
from domain.repository.order_repository import OrderRepository

class OrderApplicationService:
    """订单应用服务(协调用例)"""
    
    def __init__(self, order_repo: OrderRepository, payment_service):
        """通过构造函数注入依赖"""
        self.order_repo = order_repo
        self.payment_service = payment_service
    
    async def create_order(self, command: CreateOrderCommand) -> str:
        """
        用例:创建订单
        
        command: {
            "customer_id": "cust123",
            "items": [{"product_id": "prod1", "qty": 2, "price": 100}],
            "total_price": 200
        }
        """
        # 1. 创建领域对象
        order = Order(
            order_id=self.generate_order_id(),
            customer_id=command.customer_id,
            items=command.items,
            total_price=command.total_price
        )
        
        # 2. 保存到仓储
        await self.order_repo.save(order)
        
        # 3. 返回结果
        return order.order_id
    
    async def pay_order(self, command: PayOrderCommand) -> bool:
        """
        用例:支付订单
        """
        # 1. 获取订单
        order = await self.order_repo.get_by_id(command.order_id)
        
        # 2. 检查业务规则
        if not order.can_pay():
            raise ValueError("Order cannot be paid")
        
        # 3. 调用支付服务
        payment_result = await self.payment_service.process_payment(
            order_id=order.order_id,
            amount=order.total_price
        )
        
        if not payment_result.success:
            return False
        
        # 4. 更新订单状态(业务逻辑)
        order.mark_as_paid(datetime.now())
        
        # 5. 持久化
        await self.order_repo.save(order)
        
        return True
    
    async def cancel_order(self, command: CancelOrderCommand) -> None:
        """用例:取消订单"""
        order = await self.order_repo.get_by_id(command.order_id)
        
        if not order.can_cancel():
            raise ValueError("Order cannot be cancelled")
        
        order.cancel()
        await self.order_repo.save(order)
    
    @staticmethod
    def generate_order_id() -> str:
        import uuid
        return f"ORD-{uuid.uuid4().hex[:8].upper()}"

第3层:适配器(转换格式)

# adapter/web/order_controller.py

from fastapi import FastAPI, HTTPException
from application.service.order_service import OrderApplicationService
from application.command import CreateOrderCommand, PayOrderCommand

app = FastAPI()

class OrderController:
    """HTTP适配器:FastAPI → 应用服务"""
    
    def __init__(self, order_service: OrderApplicationService):
        self.order_service = order_service
    
    @app.post("/orders")
    async def create_order(self, request: dict):
        """
        HTTP请求 → 应用服务
        
        将HTTP请求格式转换为应用服务能理解的格式
        """
        try:
            # HTTP格式 → Command格式
            command = CreateOrderCommand(
                customer_id=request['customer_id'],
                items=request['items'],
                total_price=request['total_price']
            )
            
            # 调用应用服务
            order_id = await self.order_service.create_order(command)
            
            # Command结果 → HTTP响应
            return {
                "success": True,
                "order_id": order_id,
                "status": "PENDING"
            }
        
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
    
    @app.post("/orders/{order_id}/pay")
    async def pay_order(self, order_id: str, request: dict):
        """支付订单"""
        try:
            command = PayOrderCommand(
                order_id=order_id,
                payment_method=request['payment_method']
            )
            
            success = await self.order_service.pay_order(command)
            
            return {
                "success": success,
                "order_id": order_id,
                "status": "PAID" if success else "PAYMENT_FAILED"
            }
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))

# adapter/persistence/order_repository_impl.py

from domain.repository.order_repository import OrderRepository
from domain.model.order import Order
import json

class OrderRepositoryImpl(OrderRepository):
    """数据库适配器:PostgreSQL → 领域模型"""
    
    def __init__(self, db_connection):
        self.db = db_connection
    
    async def save(self, order: Order) -> None:
        """
        领域模型 → 数据库
        """
        query = """
            INSERT INTO orders (order_id, customer_id, items, total_price, status, created_at, paid_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (order_id) DO UPDATE SET
                status = EXCLUDED.status,
                paid_at = EXCLUDED.paid_at
        """
        
        await self.db.execute(
            query,
            order.order_id,
            order.customer_id,
            json.dumps(order.items),  # 序列化items
            order.total_price,
            order.status,
            order.created_at,
            order.paid_at
        )
    
    async def get_by_id(self, order_id: str) -> Order:
        """
        数据库 → 领域模型
        """
        query = "SELECT * FROM orders WHERE order_id = %s"
        row = await self.db.fetchrow(query, order_id)
        
        if not row:
            raise ValueError(f"Order {order_id} not found")
        
        # 数据库行 → 领域对象
        return Order(
            order_id=row['order_id'],
            customer_id=row['customer_id'],
            items=json.loads(row['items']),
            total_price=row['total_price']
        )

2.4 六边形架构的优势

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│ HTTP API     │     │ gRPC Server  │     │ Event Handler│
└──────┬───────┘     └──────┬───────┘     └──────┬───────┘
       │                    │                    │
       └────────────────────┼────────────────────┘
                            │ (都使用同一个应用服务)
                     ┌──────▼───────┐
                     │ OrderAppSvc  │
                     └──────┬───────┘
                            │
              ┌─────────────┼─────────────┐
              │             │             │
       ┌──────▼────┐  ┌────▼──────┐  ┌──▼───────┐
       │ PostgreSQL│  │ Kafka     │  │ Redis    │
       │ Adapter   │  │ Adapter   │  │ Adapter  │
       └───────────┘  └───────────┘  └──────────┘

优势:
  1. 同一个业务逻辑,支持多个输入端 (HTTP、gRPC、Event)
  2. 同一个业务逻辑,支持多个数据源 (DB、Cache、MQ)
  3. 业务逻辑与框架完全解耦
  4. 易于测试:只需Mock接口,不需要真实数据库

三、整洁架构(Clean Architecture)

3.1 与六边形架构的关系

整洁架构是六边形架构的更详细说明

┌─────────────────────────────────────────┐
│   Frameworks & Drivers (Web, DB)        │
│   (最外层,最易变)                      │
└──────────────┬────────────────────────┘
               │
┌──────────────▼────────────────────────┐
│   Interface Adapters                   │
│   (控制器、网关、管理者)               │
└──────────────┬────────────────────────┘
               │
┌──────────────▼────────────────────────┐
│   Application Business Rules           │
│   (用例、应用服务)                    │
└──────────────┬────────────────────────┘
               │
┌──────────────▼────────────────────────┐
│   Enterprise Business Rules            │
│   (实体、核心业务逻辑)                 │
└─────────────────────────────────────────┘
     (最内层,改变最少,最高层次)

3.2 四层详解

第1层:实体层(Entities)

纯业务对象,不依赖任何框架

class User:
    def __init__(self, id: str, email: str, password_hash: str):
        self.id = id
        self.email = email
        self.password_hash = password_hash
    
    def verify_password(self, password: str) -> bool:
        """业务规则:验证密码"""
        import bcrypt
        return bcrypt.checkpw(password.encode(), self.password_hash.encode())
    
    def is_valid_email(self) -> bool:
        """业务规则:邮箱是否有效"""
        import re
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return re.match(pattern, self.email) is not None

第2层:用例层(Use Cases)

编排实体和服务

class RegisterUserUseCase:
    def __init__(self, user_repo: UserRepository, email_service: EmailService):
        self.user_repo = user_repo
        self.email_service = email_service
    
    async def execute(self, request: RegisterRequest) -> RegisterResponse:
        """
        用例:注册用户
        
        1. 检查邮箱是否已被注册
        2. 创建用户
        3. 发送验证邮件
        """
        # 检查邮箱重复
        existing_user = await self.user_repo.get_by_email(request.email)
        if existing_user:
            raise UserAlreadyExistsError()
        
        # 创建用户
        user = User(
            id=self.generate_user_id(),
            email=request.email,
            password_hash=self.hash_password(request.password)
        )
        
        # 保存
        await self.user_repo.save(user)
        
        # 发送邮件
        await self.email_service.send_verification_email(user.email)
        
        return RegisterResponse(user_id=user.id, message="Please verify your email")

第3层:接口适配器(Interface Adapters)

HTTP、数据库、消息队列的适配器

# HTTP适配器
class UserController:
    def __init__(self, register_use_case: RegisterUserUseCase):
        self.register_use_case = register_use_case
    
    @app.post("/register")
    async def register(self, request: dict):
        try:
            register_request = RegisterRequest(
                email=request['email'],
                password=request['password']
            )
            response = await self.register_use_case.execute(register_request)
            return {"success": True, "user_id": response.user_id}
        except UserAlreadyExistsError:
            raise HTTPException(status_code=400, detail="Email already registered")

# 数据库适配器
class UserRepositoryPostgres(UserRepository):
    async def save(self, user: User) -> None:
        await self.db.execute(
            "INSERT INTO users (id, email, password_hash) VALUES (%s, %s, %s)",
            user.id, user.email, user.password_hash
        )
    
    async def get_by_email(self, email: str) -> User:
        row = await self.db.fetchrow(
            "SELECT * FROM users WHERE email = %s",
            email
        )
        if row:
            return User(row['id'], row['email'], row['password_hash'])
        return None

第4层:框架和驱动(Frameworks & Drivers)

Web框架、数据库库等

# main.py - 应用启动
from fastapi import FastAPI
import asyncpg

app = FastAPI()

# 依赖注入
@app.on_event("startup")
async def startup():
    db = await asyncpg.create_pool("postgresql://user:pass@localhost/mydb")
    
    user_repo = UserRepositoryPostgres(db)
    email_service = EmailServiceImpl()
    register_use_case = RegisterUserUseCase(user_repo, email_service)
    
    user_controller = UserController(register_use_case)
    app.include_router(user_controller.router)

四、依赖规则

整洁架构的金律

┌────────────────┐
│  最内层(实体)   │  不能依赖任何外层
└────────────────┘
        ↑ 只能被依赖,不能依赖
┌────────────────┐
│  用例层        │  不能依赖接口适配器或框架
└────────────────┘
        ↑
┌────────────────┐
│  接口适配器    │  不能依赖框架(但通常会)
└────────────────┘
        ↑
┌────────────────┐
│  框架         │  最外层,没有约束
└────────────────┘

违反依赖规则的例子

# ❌ 错误:用例依赖HTTP框架
class RegisterUseCase:
    def execute(self, request):
        from fastapi import HTTPException  # 错误!
        raise HTTPException(status_code=400)

# ✓ 正确:用例抛出自定义异常,由适配器转换
class RegisterUseCase:
    def execute(self, request):
        raise UserAlreadyExistsError()  # 自定义业务异常

class UserController:
    @app.post("/register")
    async def register(self, request):
        try:
            self.use_case.execute(...)
        except UserAlreadyExistsError:
            raise HTTPException(status_code=400)  # 转换为HTTP异常

五、实战:从传统MVC到整洁架构迁移

5.1 迁移步骤

第一步:分离业务逻辑

原:Service混合业务逻辑和数据访问
新:分离为 UseCase(业务逻辑) + Repository(数据访问)

第二步:定义接口

原:Service直接依赖具体的DatabaseConnection
新:Service依赖Repository接口,具体实现在adapter层

第三步:重构依赖注入

原:
  controller = OrderController()  # 硬编码依赖
  controller.service = OrderService()

新:
  order_repo = OrderRepositoryPostgres(db_connection)
  order_service = OrderApplicationService(order_repo, payment_service)
  order_controller = OrderController(order_service)

5.2 测试对比

# 整洁架构前:难以测试
class OrderService:
    def __init__(self):
        self.db = PostgreSQLConnection()  # 硬编码依赖
    
    def create_order(self, data):
        # 需要真实数据库或复杂的Mock
        self.db.insert("orders", data)

# 整洁架构后:易于测试
class OrderUseCase:
    def __init__(self, repo: OrderRepository):
        self.repo = repo
    
    async def create_order(self, data):
        await self.repo.save(data)

# 单元测试
class MockRepository(OrderRepository):
    async def save(self, order):
        self.saved_orders = [order]

use_case = OrderUseCase(MockRepository())
await use_case.create_order(order_data)
# 无需真实数据库,快速执行

总结

特性传统MVC六边形整洁架构
业务逻辑位置散布在各层内核用例层
依赖方向混乱单向→内核单向→内核
可测试性
扩展性
学习成本中高

何时应用

  • ✓ 中大型应用(>10K行代码)
  • ✓ 业务规则复杂
  • ✓ 需要支持多种输入/输出端
  • ✗ 简单CRUD应用(过度设计)

黄金法则

  1. 业务逻辑≠技术细节
  2. 依赖指向内层
  3. 定义明确的接口
  4. 每层单一职责

上次编辑于:
贡献者: zhengtianqi