跳至主要內容

RocketMQ 金融场景实战:担保系统的消息可靠性设计

郑天祺大约 10 分钟消息队列RocketMQ金融事务消息顺序消息延迟消息

RocketMQ 金融场景实战:担保系统的消息可靠性设计

前言

金融系统对消息中间件有三个核心要求:不能丢、不能乱、能回溯。RocketMQ 正是为满足这些要求而设计的。本文以担保业务为实战案例,讲解如何用 RocketMQ 构建可靠的消息系统。


第一部分:为什么金融场景选 RocketMQ

1.1 RocketMQ vs Kafka 关键对比

维度RocketMQKafka金融选型考量
事务消息原生支持0.11+支持但复杂★★★ RocketMQ 胜
顺序消息全局+分区顺序仅分区内顺序★★ RocketMQ 胜
延迟消息18个内置级别不原生支持★★ RocketMQ 胜
消息回溯按时间秒级回溯支持但运维复杂★ RocketMQ 胜
堆积能力亿级消息堆积依赖磁盘容量持平
社区背景阿里/蚂蚁金融场景大数据/日志★ RocketMQ 更匹配

结论:金融交易场景(担保、支付、账务)→ RocketMQ;日志和大数据场景 → Kafka


第二部分:RocketMQ 核心架构

2.1 四大组件

┌───────────────────────────────────────────────┐
│                  NameServer                    │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐       │
│  │   NS1   │  │   NS2   │  │   NS3   │       │
│  │(无状态) │  │(无状态) │  │(无状态) │       │
│  └─────────┘  └─────────┘  └─────────┘       │
│  注册中心:Broker 注册、Topic 路由发现         │
└───────────────────────────────────────────────┘
          ▲            ▲            ▲
          │ 注册/心跳  │            │
┌─────────┼────────────┼────────────┼───────────┐
│         ▼            ▼            ▼           │
│              Broker Cluster                   │
│  ┌──────────────┐ ┌──────────────┐           │
│  │  Master-1    │ │  Master-2    │           │
│  │  ┌────────┐  │ │  ┌────────┐  │           │
│  │  │Slave-1 │  │ │  │Slave-2 │  │           │
│  │  └────────┘  │ │  └────────┘  │           │
│  └──────────────┘ └──────────────┘           │
│  消息存储、分发、高可用保障                    │
└───────────────────────────────────────────────┘
          ▲                       ▲
          │ 发送                  │ 拉取
┌─────────┴─────────┐  ┌─────────┴─────────┐
│     Producer      │  │     Consumer      │
│  - 同步/异步发送  │  │  - 推/拉模式      │
│  - 事务消息       │  │  - 集群/广播消费   │
│  - 顺序消息       │  │  - 顺序消费       │
└───────────────────┘  └───────────────────┘

NameServer 的核心特点

  • 无状态:不保存 Topic 的持久数据,从 Broker 获取
  • 最终一致:Broker 定期心跳上报,NameServer 之间不通信
  • 简单可靠:对比 Kafka 的 ZooKeeper,运维成本大幅降低

第三部分:消息可靠性

3.1 同步刷盘 vs 异步刷盘

// Broker 配置
// 同步刷盘:消息写入磁盘后才返回成功(最可靠,但性能最低)
flushDiskType = SYNC_FLUSH

// 异步刷盘:消息写入 PageCache 就返回(性能高,但断电可能丢消息)
flushDiskType = ASYNC_FLUSH

金融场景推荐:同步刷盘 + 主从复制

# Broker 配置
flushDiskType=SYNC_FLUSH
brokerRole=SYNC_MASTER

3.2 主从复制

同步复制 (SYNC_MASTER):
  Producer → Master ──→ Slave (同步写入)
              │           │
              ←── ack ────┘
  → 等待 Slave 确认后才返回成功
  → 可靠性最高,延迟稍高

异步复制 (ASYNC_MASTER):
  Producer → Master (立刻返回)
              │
              └──→ Slave (异步写入)
  → 性能最高,但主宕机可能丢消息
// 生产者发送消息 - 金融级可靠性配置
@Service
public class GuaranteeMessagePublisher {

    @Autowired private RocketMQTemplate rocketMQTemplate;

    public SendResult sendGuaranteeEvent(GuaranteeEvent event) {
        // 构建消息
        Message<String> message = MessageBuilder
            .withPayload(JSON.toJSONString(event))
            .setHeader(RocketMQHeaders.KEYS, event.getGuaranteeId())  // 业务 Key
            .setHeader("eventType", event.getEventType())
            .build();

        // 同步发送 + 超时重试
        return rocketMQTemplate.syncSend(
            "guarantee-topic",       // Topic
            message,                  // 消息
            3000,                     // 超时 3 秒
            3                         // 重试 3 次
        );
    }
}

第四部分:事务消息——分布式事务的可靠实现

4.1 事务消息的执行流程

RocketMQ 事务消息是解决分布式事务的核心武器:

Producer                   Broker                    Consumer
  │                          │                          │
  │  1. 发送半消息            │                          │
  │ ────────────────────────→│                          │
  │                          │ (存半消息,不可见)        │
  │  2. 执行本地事务          │                          │
  │  (如:创建担保订单)       │                          │
  │                          │                          │
  │  3. 提交/回滚             │                          │
  │ ────────────────────────→│                          │
  │                          │                          │
  │  若超时未收到确认:        │                          │
  │  4. 回查事务状态           │                          │
  │ ←────────────────────────│                          │
  │  5. 返回事务状态           │                          │
  │ ────────────────────────→│                          │
  │                          │                          │
  │                          │  6. 投递消息(消费者可见)│
  │                          │ ────────────────────────→│

4.2 担保放款流程的事务消息实现

/**
 * 担保放款服务 - 使用事务消息保证流程一致性
 * 
 * 流程:担保确认 → 额度扣减 → 放款 → 通知
 * 使用 RocketMQ 事务消息保证前三步的最终一致性
 */
@Service
public class GuaranteeDisburseService {

    @Autowired private GuaranteeRepository guaranteeRepository;
    @Autowired private CreditLineRepository creditLineRepository;
    @Autowired private RocketMQTemplate rocketMQTemplate;

    /**
     * 担保确认放款 - 事务消息模式
     */
    @Transactional  // 数据库事务
    public DisburseResult confirmDisburse(DisburseCommand command) {
        // Step 1: 加载担保申请(聚合)
        GuaranteeApplication guarantee = guaranteeRepository
            .findById(new GuaranteeId(command.getGuaranteeId()))
            .orElseThrow(() -> new BusinessException("担保申请不存在"));

        // Step 2: 确认放款(聚合内部保证业务不变量)
        guarantee.confirmDisburse(
            Money.rmb(command.getAmount()),
            command.getLoanAccount()
        );

        // Step 3: 保存担保状态
        guaranteeRepository.save(guarantee);

        // Step 4: 构建事务消息
        DisburseConfirmedEvent event = DisburseConfirmedEvent.builder()
            .guaranteeId(guarantee.getId().toString())
            .customerId(guarantee.getCustomerId().toString())
            .amount(command.getAmount())
            .loanAccount(command.getLoanAccount())
            .build();

        Message<String> message = MessageBuilder
            .withPayload(JSON.toJSONString(event))
            .setHeader(RocketMQHeaders.TRANSACTION_ID,
                guarantee.getId().toString())  // 事务 ID
            .build();

        // Step 5: 发送事务消息
        // 只有本地事务提交成功后,消息才会投递
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
            "guarantee-disburse-topic",
            message,
            guarantee.getId()  // 传递给事务监听器的参数
        );

        if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
            return DisburseResult.success(guarantee.getStatus());
        } else {
            throw new BusinessException("放款确认失败");
        }
    }
}

/**
 * 事务消息监听器:处理回查
 */
@RocketMQTransactionListener
public class DisburseTransactionListener
        implements RocketMQLocalTransactionListener {

    @Autowired private GuaranteeRepository guaranteeRepository;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(
            Message msg, Object arg) {
        // 本地事务已在 DisburseService 中提交
        // 这里只需要确认结果
        try {
            Long guaranteeId = (Long) arg;
            GuaranteeApplication guarantee = guaranteeRepository
                .findById(new GuaranteeId(guaranteeId)).orElse(null);

            if (guarantee != null && guarantee.isDisbursed()) {
                return RocketMQLocalTransactionState.COMMIT;
            }
            return RocketMQLocalTransactionState.ROLLBACK;
        } catch (Exception e) {
            log.error("事务消息执行异常", e);
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(
            Message msg) {
        // Broker 回查:确认事务最终状态
        String transactionId = msg.getTransactionId();
        Long guaranteeId = extractGuaranteeId(msg);

        GuaranteeApplication guarantee = guaranteeRepository
            .findById(new GuaranteeId(guaranteeId)).orElse(null);

        if (guarantee == null) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        if (guarantee.isDisbursed()) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}

/**
 * 消费者:额度扣减服务
 */
@Service
@RocketMQMessageListener(
    topic = "guarantee-disburse-topic",
    consumerGroup = "credit-deduct-group"
)
public class CreditDeductConsumer
        implements RocketMQListener<DisburseConfirmedEvent> {

    @Autowired private CreditLineRepository creditLineRepository;
    @Autowired private CreditDeductLogRepository deductLogRepository;

    @Override
    @Transactional
    public void onMessage(DisburseConfirmedEvent event) {
        // 幂等性检查
        String msgId = MessageClient.getMessageId();
        if (deductLogRepository.existsByMsgId(msgId)) {
            log.info("消息已处理,跳过: msgId={}", msgId);
            return;
        }

        try {
            // 扣减额度
            CreditLine creditLine = creditLineRepository
                .findActiveByCustomerId(new CustomerId(event.getCustomerId()))
                .orElseThrow(() -> new BusinessException("授信不存在"));

            creditLine.deductUsed(Money.rmb(event.getAmount()));
            creditLineRepository.save(creditLine);

            // 记录处理日志
            deductLogRepository.save(new CreditDeductLog(
                msgId, event.getGuaranteeId(), event.getAmount()));

        } catch (Exception e) {
            log.error("额度扣减失败: {}", event, e);
            // RocketMQ 会自动重试,不需要手动处理
            throw e;
        }
    }
}

第五部分:顺序消息

5.1 什么是顺序消息

全局顺序消息(一个 Topic 一个队列):
  [msg1] → [msg2] → [msg3] → [msg4]
  消费者严格按照写入顺序消费
  但:并行度 = 1,吞吐量低

分区顺序消息(多个队列,同一 Key 在同一队列):
  Queue 0: [A-1] → [A-2] → [A-3]   (Key=A)
  Queue 1: [B-1] → [B-2]           (Key=B)
  Queue 2: [C-1] → [C-2] → [C-3]   (Key=C)
  同一 Key 的消息有序,不同 Key 之间无序
  但:每个 Key 独占一个线程,吞吐量受限

5.2 担保状态变更的顺序消息

担保订单的状态变更必须严格按顺序处理:APPROVED → SIGNED → ACTIVE → CLOSED

/**
 * 发送端:按 guaranteeId 路由,保证同一担保的顺序
 */
@Service
public class GuaranteeStatusPublisher {

    @Autowired private RocketMQTemplate rocketMQTemplate;

    public void publishStatusChange(GuaranteeApplication guarantee,
                                    GuaranteeStatusChangeEvent event) {
        // 使用 SelectMessageQueueByHash 策略
        // 同一个 guaranteeId 的消息发到同一个队列
        rocketMQTemplate.syncSendOrderly(
            "guarantee-status-topic",
            MessageBuilder.withPayload(event).build(),
            guarantee.getId().toString()  // Hash Key → 同一队列
        );
    }
}

/**
 * 消费端:顺序消费
 */
@Service
@RocketMQMessageListener(
    topic = "guarantee-status-topic",
    consumerGroup = "guarantee-status-group",
    consumeMode = ConsumeMode.ORDERLY  // ← 顺序消费模式
)
public class GuaranteeStatusConsumer
        implements RocketMQListener<GuaranteeStatusChangeEvent> {

    // 每个 MessageQueue 分配一个线程消费,保证顺序
    @Override
    public void onMessage(GuaranteeStatusChangeEvent event) {
        // 处理状态变更
        // 对于同一个 guaranteeId,APPROVED 一定在 SIGNED 之前处理
        statusChangeHandler.handle(event);
    }
}

注意:顺序消费模式下,如果一个消息处理失败,后续消息会被阻塞,直到该消息被成功消费或达到重试上限。


第六部分:延迟消息

6.1 延迟消息的金融场景

RocketMQ 支持 18 个预定义的延迟级别:

1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h

担保业务中的典型场景

场景延迟说明
放款超时检查30 分钟放款申请后 30 分钟未到账,自动取消
签约超时提醒1 小时合同发后 1 小时未签,发送催签短信
还款到期前提醒到期前 3 天提前 3 天发送还款提醒
保后定期检查每日每日检查担保状态是否需要更新
@Service
public class DelayedGuaranteeService {

    @Autowired private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送延迟消息:放款 30 分钟后检查是否到账
     */
    public void scheduleDisburseCheck(GuaranteeApplication guarantee) {
        DisburseCheckMessage checkMsg = DisburseCheckMessage.builder()
            .guaranteeId(guarantee.getId().toString())
            .expectedAmount(guarantee.getGuaranteeAmount().getAmount())
            .buildTime(Instant.now())
            .build();

        Message<String> message = MessageBuilder
            .withPayload(JSON.toJSONString(checkMsg))
            .build();

        // 延迟级别 15 = 30 分钟
        rocketMQTemplate.syncSend(
            "guarantee-disburse-check-topic",
            message,
            3000,
            16  // 延迟级别 16 = 30 分钟
        );
    }
}

/**
 * 延迟消息消费者:处理超时检查
 */
@Service
@RocketMQMessageListener(
    topic = "guarantee-disburse-check-topic",
    consumerGroup = "disburse-check-group"
)
public class DisburseCheckConsumer
        implements RocketMQListener<DisburseCheckMessage> {

    @Autowired private GuaranteeRepository guaranteeRepository;
    @Autowired private BankClient bankClient;

    @Override
    public void onMessage(DisburseCheckMessage message) {
        GuaranteeApplication guarantee = guaranteeRepository
            .findById(new GuaranteeId(message.getGuaranteeId()))
            .orElse(null);

        if (guarantee == null) return;

        // 检查是否已到账
        if (guarantee.isDisbursed()) {
            log.info("放款已到账,担保: {}", message.getGuaranteeId());
            return;
        }

        // 查询银行系统确认
        BankTransferResult bankResult = bankClient
            .queryTransfer(guarantee.getTransferId());

        if (bankResult.isSuccess()) {
            // 到账确认
            guarantee.confirmDisburseArrival();
            guaranteeRepository.save(guarantee);
        } else {
            // 超时未到账 → 人工介入
            alertService.alert("放款超时未到账",
                guarantee.getId().toString());
        }
    }
}

第七部分:消费重试与死信队列

7.1 消费重试机制

/**
 * 消费重试配置
 */
@Service
@RocketMQMessageListener(
    topic = "guarantee-topic",
    consumerGroup = "guarantee-group",
    maxReconsumeTimes = 5  // 最大重试次数
)
public class GuaranteeConsumer
        implements RocketMQListener<GuaranteeEvent> {

    @Override
    public void onMessage(GuaranteeEvent event) {
        try {
            processEvent(event);
        } catch (BusinessException e) {
            // 业务异常:不重试,直接处理
            handleBusinessException(event, e);
        } catch (Exception e) {
            // 系统异常:抛出异常让 RocketMQ 重试
            log.error("处理消息异常,等待重试: {}", event, e);
            throw e;  // ← 抛出异常,触发重试
        }
    }
}

RocketMQ 消费重试策略:

第 1 次失败 → 10 秒后重试
第 2 次失败 → 30 秒后重试
第 3 次失败 → 1 分钟后重试
第 4 次失败 → 2 分钟后重试
...
第 16 次失败 → 进入死信队列

7.2 死信队列处理

重试达到上限后,消息进入死信队列 %DLQ%guarantee-group

/**
 * 死信队列消费者:处理最终失败的消息
 */
@Service
@RocketMQMessageListener(
    topic = "%DLQ%guarantee-group",  // 死信队列 Topic
    consumerGroup = "guarantee-dlq-group"
)
public class GuaranteeDlqConsumer
        implements RocketMQListener<MessageExt> {

    @Autowired private AlertService alertService;
    @Autowired private DeadMessageRepository deadMessageRepository;

    @Override
    public void onMessage(MessageExt messageExt) {
        String msgId = messageExt.getMsgId();
        String topic = messageExt.getTopic();
        String body = new String(messageExt.getBody());
        int reconsumeTimes = messageExt.getReconsumeTimes();

        log.error("消息进入死信队列: msgId={}, topic={}, retries={}",
            msgId, topic, reconsumeTimes);

        // 1. 持久化到死信记录表(用于后续人工处理)
        DeadMessageRecord record = DeadMessageRecord.builder()
            .msgId(msgId)
            .originalTopic(topic)
            .originalGroup(messageExt.getProperty("ORIGIN_MESSAGE_GROUP"))
            .body(body)
            .reconsumeTimes(reconsumeTimes)
            .deadAt(Instant.now())
            .status("PENDING")
            .build();
        deadMessageRepository.save(record);

        // 2. 发送告警
        alertService.alert(
            AlertLevel.HIGH,
            "消息处理最终失败",
            String.format("msgId=%s, topic=%s, retries=%d",
                msgId, topic, reconsumeTimes)
        );
    }
}

死信消息的处理策略

  1. 人工介入:对于资金相关的死信,必须人工确认后重放
  2. 自动重放:对于非关键消息,可开发管理后台一键重放
  3. 补偿对账:定期对账补偿因死信导致的数据不一致

第八部分:实战——担保放款流程异步化改造

8.1 改造前:同步串行

放款流程(同步,耗时长):
  1. 担保审核 (1s)
  2. 风控复审 (2s)
  3. 银行放款接口 (5s)
  4. 额度扣减 (0.5s)
  5. 短信通知 (1s)
  6. 日志记录 (0.5s)
  ────────────
  总计:~10 秒

8.2 改造后:RocketMQ 异步化

@Service
public class DisburseApplicationService {

    @Autowired private RocketMQTemplate rocketMQTemplate;
    @Autowired private GuaranteeRepository guaranteeRepository;

    @Transactional
    public DisburseResult disburse(DisburseCommand command) {
        // 1. 担保确认(同步,必须完成)
        GuaranteeApplication guarantee = guaranteeRepository
            .findById(new GuaranteeId(command.getGuaranteeId()))
            .orElseThrow(() -> new BusinessException("担保不存在"));

        guarantee.confirmDisburse(Money.rmb(command.getAmount()),
            command.getLoanAccount());
        guaranteeRepository.save(guarantee);

        // 2. 发送事务消息:风控复审 + 银行放款 + 额度扣减
        rocketMQTemplate.sendMessageInTransaction(
            "disburse-topic",
            MessageBuilder.withPayload(
                DisburseEvent.from(guarantee, command)).build(),
            guarantee.getId()
        );

        // 3. 发送延迟消息:30 分钟后检查放款状态
        rocketMQTemplate.syncSend("disburse-check-topic",
            MessageBuilder.withPayload(
                new DisburseCheckMessage(guarantee.getId().toString())).build(),
            3000,
            16  // 30 分钟延迟
        );

        // 4. 立即返回
        return DisburseResult.processing(guarantee.getId().toString());
    }
}

// 消费者链:
// 1. DisburseRiskConsumer: 处理风控复审
// 2. DisburseBankConsumer: 调用银行放款
// 3. CreditDeductConsumer: 扣减额度
// 4. NotificationConsumer: 发送通知
// 每个消费者独立,异步并行处理

改造效果

放款流程(异步化后):
  1. 担保确认 (1s)
  2. 发送消息 (0.05s)
  ────────────
  用户等待:~1 秒(相比 10 秒提升了 10 倍)

后台异步处理(用户无感知):
  风控复审 → 银行放款 → 额度扣减 → 短信通知

总结

RocketMQ 金融场景最佳实践

机制使用场景关键点
事务消息分布式事务半消息+回查,保证最终一致性
顺序消息状态机变更按业务 Key 路由,严格有序
延迟消息超时检查18 个预定义级别
死信队列失败兜底重试上限后人工介入

可靠性保障清单

☑ 同步刷盘 (SYNC_FLUSH)
☑ 主从同步复制 (SYNC_MASTER)
☑ 事务消息保证一致性
☑ 消费幂等性(msgId 去重)
☑ 死信队列告警
☑ 业务对账补偿
☑ 消息轨迹追踪

参考资料

  1. Apache RocketMQ 官方文档
  2. Alibaba, 《RocketMQ 技术内幕》
  3. RocketMQ 事务消息原理
上次编辑于:
贡献者: zhengtianqi