RocketMQ 金融场景实战:担保系统的消息可靠性设计
大约 10 分钟
RocketMQ 金融场景实战:担保系统的消息可靠性设计
前言
金融系统对消息中间件有三个核心要求:不能丢、不能乱、能回溯。RocketMQ 正是为满足这些要求而设计的。本文以担保业务为实战案例,讲解如何用 RocketMQ 构建可靠的消息系统。
第一部分:为什么金融场景选 RocketMQ
1.1 RocketMQ vs Kafka 关键对比
| 维度 | RocketMQ | Kafka | 金融选型考量 |
|---|---|---|---|
| 事务消息 | 原生支持 | 0.11+支持但复杂 | ★★★ RocketMQ 胜 |
| 顺序消息 | 全局+分区顺序 | 仅分区内顺序 | ★★ RocketMQ 胜 |
| 延迟消息 | 18个内置级别 | 不原生支持 | ★★ RocketMQ 胜 |
| 消息回溯 | 按时间秒级回溯 | 支持但运维复杂 | ★ RocketMQ 胜 |
| 堆积能力 | 亿级消息堆积 | 依赖磁盘容量 | 持平 |
| 社区背景 | 阿里/蚂蚁金融场景 | 大数据/日志 | ★ RocketMQ 更匹配 |
结论:金融交易场景(担保、支付、账务)→ RocketMQ;日志和大数据场景 → Kafka
第二部分:RocketMQ 核心架构
2.1 四大组件
┌───────────────────────────────────────────────┐
│ NameServer │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ NS1 │ │ NS2 │ │ NS3 │ │
│ │(无状态) │ │(无状态) │ │(无状态) │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ 注册中心:Broker 注册、Topic 路由发现 │
└───────────────────────────────────────────────┘
▲ ▲ ▲
│ 注册/心跳 │ │
┌─────────┼────────────┼────────────┼───────────┐
│ ▼ ▼ ▼ │
│ Broker Cluster │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Master-1 │ │ Master-2 │ │
│ │ ┌────────┐ │ │ ┌────────┐ │ │
│ │ │Slave-1 │ │ │ │Slave-2 │ │ │
│ │ └────────┘ │ │ └────────┘ │ │
│ └──────────────┘ └──────────────┘ │
│ 消息存储、分发、高可用保障 │
└───────────────────────────────────────────────┘
▲ ▲
│ 发送 │ 拉取
┌─────────┴─────────┐ ┌─────────┴─────────┐
│ Producer │ │ Consumer │
│ - 同步/异步发送 │ │ - 推/拉模式 │
│ - 事务消息 │ │ - 集群/广播消费 │
│ - 顺序消息 │ │ - 顺序消费 │
└───────────────────┘ └───────────────────┘
NameServer 的核心特点:
- 无状态:不保存 Topic 的持久数据,从 Broker 获取
- 最终一致:Broker 定期心跳上报,NameServer 之间不通信
- 简单可靠:对比 Kafka 的 ZooKeeper,运维成本大幅降低
第三部分:消息可靠性
3.1 同步刷盘 vs 异步刷盘
// Broker 配置
// 同步刷盘:消息写入磁盘后才返回成功(最可靠,但性能最低)
flushDiskType = SYNC_FLUSH
// 异步刷盘:消息写入 PageCache 就返回(性能高,但断电可能丢消息)
flushDiskType = ASYNC_FLUSH
金融场景推荐:同步刷盘 + 主从复制
# Broker 配置
flushDiskType=SYNC_FLUSH
brokerRole=SYNC_MASTER
3.2 主从复制
同步复制 (SYNC_MASTER):
Producer → Master ──→ Slave (同步写入)
│ │
←── ack ────┘
→ 等待 Slave 确认后才返回成功
→ 可靠性最高,延迟稍高
异步复制 (ASYNC_MASTER):
Producer → Master (立刻返回)
│
└──→ Slave (异步写入)
→ 性能最高,但主宕机可能丢消息
// 生产者发送消息 - 金融级可靠性配置
@Service
public class GuaranteeMessagePublisher {
@Autowired private RocketMQTemplate rocketMQTemplate;
public SendResult sendGuaranteeEvent(GuaranteeEvent event) {
// 构建消息
Message<String> message = MessageBuilder
.withPayload(JSON.toJSONString(event))
.setHeader(RocketMQHeaders.KEYS, event.getGuaranteeId()) // 业务 Key
.setHeader("eventType", event.getEventType())
.build();
// 同步发送 + 超时重试
return rocketMQTemplate.syncSend(
"guarantee-topic", // Topic
message, // 消息
3000, // 超时 3 秒
3 // 重试 3 次
);
}
}
第四部分:事务消息——分布式事务的可靠实现
4.1 事务消息的执行流程
RocketMQ 事务消息是解决分布式事务的核心武器:
Producer Broker Consumer
│ │ │
│ 1. 发送半消息 │ │
│ ────────────────────────→│ │
│ │ (存半消息,不可见) │
│ 2. 执行本地事务 │ │
│ (如:创建担保订单) │ │
│ │ │
│ 3. 提交/回滚 │ │
│ ────────────────────────→│ │
│ │ │
│ 若超时未收到确认: │ │
│ 4. 回查事务状态 │ │
│ ←────────────────────────│ │
│ 5. 返回事务状态 │ │
│ ────────────────────────→│ │
│ │ │
│ │ 6. 投递消息(消费者可见)│
│ │ ────────────────────────→│
4.2 担保放款流程的事务消息实现
/**
* 担保放款服务 - 使用事务消息保证流程一致性
*
* 流程:担保确认 → 额度扣减 → 放款 → 通知
* 使用 RocketMQ 事务消息保证前三步的最终一致性
*/
@Service
public class GuaranteeDisburseService {
@Autowired private GuaranteeRepository guaranteeRepository;
@Autowired private CreditLineRepository creditLineRepository;
@Autowired private RocketMQTemplate rocketMQTemplate;
/**
* 担保确认放款 - 事务消息模式
*/
@Transactional // 数据库事务
public DisburseResult confirmDisburse(DisburseCommand command) {
// Step 1: 加载担保申请(聚合)
GuaranteeApplication guarantee = guaranteeRepository
.findById(new GuaranteeId(command.getGuaranteeId()))
.orElseThrow(() -> new BusinessException("担保申请不存在"));
// Step 2: 确认放款(聚合内部保证业务不变量)
guarantee.confirmDisburse(
Money.rmb(command.getAmount()),
command.getLoanAccount()
);
// Step 3: 保存担保状态
guaranteeRepository.save(guarantee);
// Step 4: 构建事务消息
DisburseConfirmedEvent event = DisburseConfirmedEvent.builder()
.guaranteeId(guarantee.getId().toString())
.customerId(guarantee.getCustomerId().toString())
.amount(command.getAmount())
.loanAccount(command.getLoanAccount())
.build();
Message<String> message = MessageBuilder
.withPayload(JSON.toJSONString(event))
.setHeader(RocketMQHeaders.TRANSACTION_ID,
guarantee.getId().toString()) // 事务 ID
.build();
// Step 5: 发送事务消息
// 只有本地事务提交成功后,消息才会投递
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"guarantee-disburse-topic",
message,
guarantee.getId() // 传递给事务监听器的参数
);
if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
return DisburseResult.success(guarantee.getStatus());
} else {
throw new BusinessException("放款确认失败");
}
}
}
/**
* 事务消息监听器:处理回查
*/
@RocketMQTransactionListener
public class DisburseTransactionListener
implements RocketMQLocalTransactionListener {
@Autowired private GuaranteeRepository guaranteeRepository;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(
Message msg, Object arg) {
// 本地事务已在 DisburseService 中提交
// 这里只需要确认结果
try {
Long guaranteeId = (Long) arg;
GuaranteeApplication guarantee = guaranteeRepository
.findById(new GuaranteeId(guaranteeId)).orElse(null);
if (guarantee != null && guarantee.isDisbursed()) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
} catch (Exception e) {
log.error("事务消息执行异常", e);
return RocketMQLocalTransactionState.UNKNOWN;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(
Message msg) {
// Broker 回查:确认事务最终状态
String transactionId = msg.getTransactionId();
Long guaranteeId = extractGuaranteeId(msg);
GuaranteeApplication guarantee = guaranteeRepository
.findById(new GuaranteeId(guaranteeId)).orElse(null);
if (guarantee == null) {
return RocketMQLocalTransactionState.ROLLBACK;
}
if (guarantee.isDisbursed()) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.UNKNOWN;
}
}
/**
* 消费者:额度扣减服务
*/
@Service
@RocketMQMessageListener(
topic = "guarantee-disburse-topic",
consumerGroup = "credit-deduct-group"
)
public class CreditDeductConsumer
implements RocketMQListener<DisburseConfirmedEvent> {
@Autowired private CreditLineRepository creditLineRepository;
@Autowired private CreditDeductLogRepository deductLogRepository;
@Override
@Transactional
public void onMessage(DisburseConfirmedEvent event) {
// 幂等性检查
String msgId = MessageClient.getMessageId();
if (deductLogRepository.existsByMsgId(msgId)) {
log.info("消息已处理,跳过: msgId={}", msgId);
return;
}
try {
// 扣减额度
CreditLine creditLine = creditLineRepository
.findActiveByCustomerId(new CustomerId(event.getCustomerId()))
.orElseThrow(() -> new BusinessException("授信不存在"));
creditLine.deductUsed(Money.rmb(event.getAmount()));
creditLineRepository.save(creditLine);
// 记录处理日志
deductLogRepository.save(new CreditDeductLog(
msgId, event.getGuaranteeId(), event.getAmount()));
} catch (Exception e) {
log.error("额度扣减失败: {}", event, e);
// RocketMQ 会自动重试,不需要手动处理
throw e;
}
}
}
第五部分:顺序消息
5.1 什么是顺序消息
全局顺序消息(一个 Topic 一个队列):
[msg1] → [msg2] → [msg3] → [msg4]
消费者严格按照写入顺序消费
但:并行度 = 1,吞吐量低
分区顺序消息(多个队列,同一 Key 在同一队列):
Queue 0: [A-1] → [A-2] → [A-3] (Key=A)
Queue 1: [B-1] → [B-2] (Key=B)
Queue 2: [C-1] → [C-2] → [C-3] (Key=C)
同一 Key 的消息有序,不同 Key 之间无序
但:每个 Key 独占一个线程,吞吐量受限
5.2 担保状态变更的顺序消息
担保订单的状态变更必须严格按顺序处理:APPROVED → SIGNED → ACTIVE → CLOSED
/**
* 发送端:按 guaranteeId 路由,保证同一担保的顺序
*/
@Service
public class GuaranteeStatusPublisher {
@Autowired private RocketMQTemplate rocketMQTemplate;
public void publishStatusChange(GuaranteeApplication guarantee,
GuaranteeStatusChangeEvent event) {
// 使用 SelectMessageQueueByHash 策略
// 同一个 guaranteeId 的消息发到同一个队列
rocketMQTemplate.syncSendOrderly(
"guarantee-status-topic",
MessageBuilder.withPayload(event).build(),
guarantee.getId().toString() // Hash Key → 同一队列
);
}
}
/**
* 消费端:顺序消费
*/
@Service
@RocketMQMessageListener(
topic = "guarantee-status-topic",
consumerGroup = "guarantee-status-group",
consumeMode = ConsumeMode.ORDERLY // ← 顺序消费模式
)
public class GuaranteeStatusConsumer
implements RocketMQListener<GuaranteeStatusChangeEvent> {
// 每个 MessageQueue 分配一个线程消费,保证顺序
@Override
public void onMessage(GuaranteeStatusChangeEvent event) {
// 处理状态变更
// 对于同一个 guaranteeId,APPROVED 一定在 SIGNED 之前处理
statusChangeHandler.handle(event);
}
}
注意:顺序消费模式下,如果一个消息处理失败,后续消息会被阻塞,直到该消息被成功消费或达到重试上限。
第六部分:延迟消息
6.1 延迟消息的金融场景
RocketMQ 支持 18 个预定义的延迟级别:
1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
担保业务中的典型场景:
| 场景 | 延迟 | 说明 |
|---|---|---|
| 放款超时检查 | 30 分钟 | 放款申请后 30 分钟未到账,自动取消 |
| 签约超时提醒 | 1 小时 | 合同发后 1 小时未签,发送催签短信 |
| 还款到期前提醒 | 到期前 3 天 | 提前 3 天发送还款提醒 |
| 保后定期检查 | 每日 | 每日检查担保状态是否需要更新 |
@Service
public class DelayedGuaranteeService {
@Autowired private RocketMQTemplate rocketMQTemplate;
/**
* 发送延迟消息:放款 30 分钟后检查是否到账
*/
public void scheduleDisburseCheck(GuaranteeApplication guarantee) {
DisburseCheckMessage checkMsg = DisburseCheckMessage.builder()
.guaranteeId(guarantee.getId().toString())
.expectedAmount(guarantee.getGuaranteeAmount().getAmount())
.buildTime(Instant.now())
.build();
Message<String> message = MessageBuilder
.withPayload(JSON.toJSONString(checkMsg))
.build();
// 延迟级别 15 = 30 分钟
rocketMQTemplate.syncSend(
"guarantee-disburse-check-topic",
message,
3000,
16 // 延迟级别 16 = 30 分钟
);
}
}
/**
* 延迟消息消费者:处理超时检查
*/
@Service
@RocketMQMessageListener(
topic = "guarantee-disburse-check-topic",
consumerGroup = "disburse-check-group"
)
public class DisburseCheckConsumer
implements RocketMQListener<DisburseCheckMessage> {
@Autowired private GuaranteeRepository guaranteeRepository;
@Autowired private BankClient bankClient;
@Override
public void onMessage(DisburseCheckMessage message) {
GuaranteeApplication guarantee = guaranteeRepository
.findById(new GuaranteeId(message.getGuaranteeId()))
.orElse(null);
if (guarantee == null) return;
// 检查是否已到账
if (guarantee.isDisbursed()) {
log.info("放款已到账,担保: {}", message.getGuaranteeId());
return;
}
// 查询银行系统确认
BankTransferResult bankResult = bankClient
.queryTransfer(guarantee.getTransferId());
if (bankResult.isSuccess()) {
// 到账确认
guarantee.confirmDisburseArrival();
guaranteeRepository.save(guarantee);
} else {
// 超时未到账 → 人工介入
alertService.alert("放款超时未到账",
guarantee.getId().toString());
}
}
}
第七部分:消费重试与死信队列
7.1 消费重试机制
/**
* 消费重试配置
*/
@Service
@RocketMQMessageListener(
topic = "guarantee-topic",
consumerGroup = "guarantee-group",
maxReconsumeTimes = 5 // 最大重试次数
)
public class GuaranteeConsumer
implements RocketMQListener<GuaranteeEvent> {
@Override
public void onMessage(GuaranteeEvent event) {
try {
processEvent(event);
} catch (BusinessException e) {
// 业务异常:不重试,直接处理
handleBusinessException(event, e);
} catch (Exception e) {
// 系统异常:抛出异常让 RocketMQ 重试
log.error("处理消息异常,等待重试: {}", event, e);
throw e; // ← 抛出异常,触发重试
}
}
}
RocketMQ 消费重试策略:
第 1 次失败 → 10 秒后重试
第 2 次失败 → 30 秒后重试
第 3 次失败 → 1 分钟后重试
第 4 次失败 → 2 分钟后重试
...
第 16 次失败 → 进入死信队列
7.2 死信队列处理
重试达到上限后,消息进入死信队列 %DLQ%guarantee-group:
/**
* 死信队列消费者:处理最终失败的消息
*/
@Service
@RocketMQMessageListener(
topic = "%DLQ%guarantee-group", // 死信队列 Topic
consumerGroup = "guarantee-dlq-group"
)
public class GuaranteeDlqConsumer
implements RocketMQListener<MessageExt> {
@Autowired private AlertService alertService;
@Autowired private DeadMessageRepository deadMessageRepository;
@Override
public void onMessage(MessageExt messageExt) {
String msgId = messageExt.getMsgId();
String topic = messageExt.getTopic();
String body = new String(messageExt.getBody());
int reconsumeTimes = messageExt.getReconsumeTimes();
log.error("消息进入死信队列: msgId={}, topic={}, retries={}",
msgId, topic, reconsumeTimes);
// 1. 持久化到死信记录表(用于后续人工处理)
DeadMessageRecord record = DeadMessageRecord.builder()
.msgId(msgId)
.originalTopic(topic)
.originalGroup(messageExt.getProperty("ORIGIN_MESSAGE_GROUP"))
.body(body)
.reconsumeTimes(reconsumeTimes)
.deadAt(Instant.now())
.status("PENDING")
.build();
deadMessageRepository.save(record);
// 2. 发送告警
alertService.alert(
AlertLevel.HIGH,
"消息处理最终失败",
String.format("msgId=%s, topic=%s, retries=%d",
msgId, topic, reconsumeTimes)
);
}
}
死信消息的处理策略:
- 人工介入:对于资金相关的死信,必须人工确认后重放
- 自动重放:对于非关键消息,可开发管理后台一键重放
- 补偿对账:定期对账补偿因死信导致的数据不一致
第八部分:实战——担保放款流程异步化改造
8.1 改造前:同步串行
放款流程(同步,耗时长):
1. 担保审核 (1s)
2. 风控复审 (2s)
3. 银行放款接口 (5s)
4. 额度扣减 (0.5s)
5. 短信通知 (1s)
6. 日志记录 (0.5s)
────────────
总计:~10 秒
8.2 改造后:RocketMQ 异步化
@Service
public class DisburseApplicationService {
@Autowired private RocketMQTemplate rocketMQTemplate;
@Autowired private GuaranteeRepository guaranteeRepository;
@Transactional
public DisburseResult disburse(DisburseCommand command) {
// 1. 担保确认(同步,必须完成)
GuaranteeApplication guarantee = guaranteeRepository
.findById(new GuaranteeId(command.getGuaranteeId()))
.orElseThrow(() -> new BusinessException("担保不存在"));
guarantee.confirmDisburse(Money.rmb(command.getAmount()),
command.getLoanAccount());
guaranteeRepository.save(guarantee);
// 2. 发送事务消息:风控复审 + 银行放款 + 额度扣减
rocketMQTemplate.sendMessageInTransaction(
"disburse-topic",
MessageBuilder.withPayload(
DisburseEvent.from(guarantee, command)).build(),
guarantee.getId()
);
// 3. 发送延迟消息:30 分钟后检查放款状态
rocketMQTemplate.syncSend("disburse-check-topic",
MessageBuilder.withPayload(
new DisburseCheckMessage(guarantee.getId().toString())).build(),
3000,
16 // 30 分钟延迟
);
// 4. 立即返回
return DisburseResult.processing(guarantee.getId().toString());
}
}
// 消费者链:
// 1. DisburseRiskConsumer: 处理风控复审
// 2. DisburseBankConsumer: 调用银行放款
// 3. CreditDeductConsumer: 扣减额度
// 4. NotificationConsumer: 发送通知
// 每个消费者独立,异步并行处理
改造效果:
放款流程(异步化后):
1. 担保确认 (1s)
2. 发送消息 (0.05s)
────────────
用户等待:~1 秒(相比 10 秒提升了 10 倍)
后台异步处理(用户无感知):
风控复审 → 银行放款 → 额度扣减 → 短信通知
总结
RocketMQ 金融场景最佳实践
| 机制 | 使用场景 | 关键点 |
|---|---|---|
| 事务消息 | 分布式事务 | 半消息+回查,保证最终一致性 |
| 顺序消息 | 状态机变更 | 按业务 Key 路由,严格有序 |
| 延迟消息 | 超时检查 | 18 个预定义级别 |
| 死信队列 | 失败兜底 | 重试上限后人工介入 |
可靠性保障清单
☑ 同步刷盘 (SYNC_FLUSH)
☑ 主从同步复制 (SYNC_MASTER)
☑ 事务消息保证一致性
☑ 消费幂等性(msgId 去重)
☑ 死信队列告警
☑ 业务对账补偿
☑ 消息轨迹追踪
参考资料
- Apache RocketMQ 官方文档
- Alibaba, 《RocketMQ 技术内幕》
- RocketMQ 事务消息原理