MQ消息丢失问题。
有些小伙伴在工作中,一提到消息队列就觉得很简单,但真正遇到线上消息丢失时,排查起来却让人抓狂。
其实,我在实际工作中,也遇到过MQ消息丢失的情况。
今天这篇文章,专门跟大家一起聊聊这个话题,希望对你会有所帮助。
一、消息丢失的三大环节
在深入解决方案之前,我们先搞清楚消息在哪几个环节可能丢失:

1. 生产者发送阶段
2. Broker存储阶段
3. 消费者处理阶段
理解了问题根源,接下来我们看5种实用的解决方案。
二、方案一:生产者确认机制
核心原理
生产者发送消息后等待Broker确认,确保消息成功到达。
这是防止消息丢失的第一道防线。

关键实现
// RabbitMQ生产者确认配置@Beanpublic RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { // 消息成功到达Broker
messageStatusService.markConfirmed(correlationData.getId());
} else { // 发送失败,触发重试
retryService.scheduleRetry(correlationData.getId());
}
}); return template;
}// 可靠发送方法public void sendReliable(String exchange, String routingKey, Object message) { String messageId = generateId(); // 先落库保存发送状态
messageStatusService.saveSendingStatus(messageId, message);
// 发送持久化消息
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
msg.getMessageProperties().setMessageId(messageId); return msg;
}, new CorrelationData(messageId));
}
适用场景
三、方案二:消息持久化机制
核心原理
将消息保存到磁盘,确保Broker重启后消息不丢失。
这是防止Broker端消息丢失的关键。

关键实现
// 持久化队列配置@Beanpublic Queue orderQueue() { return QueueBuilder.durable("order.queue") // 队列持久化
.deadLetterExchange("order.dlx") // 死信交换机
.build();
}// 发送持久化消息public void sendPersistentMessage(Object message) {
rabbitTemplate.convertAndSend("order.exchange", "order.create", message, msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化
return msg;
});
}// Kafka持久化配置@Beanpublic ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本确认
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性
return new DefaultKafkaProducerFactory<>(props);
}
优缺点
优点:
缺点:
四、方案三:消费者确认机制
核心原理
消费者处理完消息后手动向Broker发送确认,Broker收到确认后才删除消息。
这是保证消息不丢失的最后一道防线。

关键实现
// 手动确认消费者@RabbitListener(queues = "order.queue")public void handleMessage(Order order, Message message, Channel channel) { long deliveryTag = message.getMessageProperties().getDeliveryTag();
try { // 业务处理
orderService.processOrder(order);
// 手动确认
channel.basicAck(deliveryTag, false);
log.info("消息处理完成: {}", order.getOrderId());
} catch (Exception e) {
log.error("消息处理失败: {}", order.getOrderId(), e);
// 处理失败,重新入队
channel.basicNack(deliveryTag, false, true);
}
}// 消费者容器配置@Beanpublic SimpleRabbitListenerContainerFactory containerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认
factory.setPrefetchCount(10); // 预取数量
factory.setConcurrentConsumers(3); // 并发消费者
return factory;
}
注意事项
五、方案四:事务消息机制
核心原理
通过事务保证本地业务操作和消息发送的原子性,要么都成功,要么都失败。

关键实现
// 本地事务表方案@Transactionalpublic void createOrder(Order order) { // 1. 保存订单到数据库
orderRepository.save(order);
// 2. 保存消息到本地消息表
LocalMessage localMessage = new LocalMessage();
localMessage.setBusinessId(order.getOrderId());
localMessage.setContent(JSON.toJSONString(order));
localMessage.setStatus(MessageStatus.PENDING);
localMessageRepository.save(localMessage);
// 3. 事务提交,本地业务和消息存储保持一致性}// 定时任务扫描并发送消息@Scheduled(fixedDelay = 5000)public void sendPendingMessages() {
List<LocalMessage> pendingMessages = localMessageRepository.findByStatus(MessageStatus.PENDING);
for (LocalMessage message : pendingMessages) { try { // 发送消息到MQ
rabbitTemplate.convertAndSend("order.exchange", "order.create", message.getContent());
// 更新消息状态为已发送
message.setStatus(MessageStatus.SENT);
localMessageRepository.save(message);
} catch (Exception e) {
log.error("发送消息失败: {}", message.getId(), e);
}
}
}// RocketMQ事务消息public void sendTransactionMessage(Order order) { TransactionMQProducer producer = new TransactionMQProducer("order_producer");
// 发送事务消息
Message msg = new Message("order_topic", "create",
JSON.toJSONBytes(order));
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
log.info("事务消息提交成功");
}
}
适用场景
六、方案五:消息重试与死信队列
核心原理
通过重试机制处理临时故障,通过死信队列处理最终无法消费的消息。

关键实现
// 重试队列配置@Beanpublic Queue orderQueue() { return QueueBuilder.durable("order.queue")
.withArgument("x-dead-letter-exchange", "order.dlx") // 死信交换机
.withArgument("x-dead-letter-routing-key", "order.dead")
.withArgument("x-message-ttl", 60000) // 60秒后进入死信
.build();
}// 死信队列配置@Beanpublic Queue orderDeadLetterQueue() { return QueueBuilder.durable("order.dead.queue").build();
}// 消费者重试逻辑@RabbitListener(queues = "order.queue")public void handleMessageWithRetry(Order order, Message message, Channel channel) { long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {www.wanquanba.com>www.dykwdz.cn>www.hipark.cn>www.chituma.net.cn
>
>
>www.dzezz.net
>
>www.ctwcm.com.cn
>
>www.tianjinaoda.com
orderService.processOrder(order);
channel.basicAck(deliveryTag, false);
} catch (TemporaryException e) { // 临时异常,重新入队重试
channel.basicNack(deliveryTag, false, true);
} catch (PermanentException e) { // 异常,直接确认进入死信队列
channel.basicAck(deliveryTag, false);
log.error("消息进入死信队列: {}", order.getOrderId(), e);
}
}// 死信队列消费者@RabbitListener(queues = "order.dead.queue")public void handleDeadLetterMessage(Order order) {
log.warn("处理死信消息: {}", order.getOrderId()); // 发送告警、记录日志、人工处理等
alertService.sendAlert("死信消息告警", order.toString());
}
重试策略建议
- 指数退避:1s, 5s, 15s, 30s
- 最大重试次数:3-5次
- 死信处理:人工介入或特殊处理流程
七、方案对比与选型指南
为了帮助大家选择合适的方案,我整理了详细的对比表:
| 方案 | 可靠性 | 性能影响 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| 生产者确认 | 高 | 中 | 低 | 所有需要可靠发送的场景 |
| 消息持久化 | 中 | 中 | 低 | Broker重启保护 |
| 消费者确认 | 高 | 低 | 中 | 确保消息被成功处理 |
| 事务消息 | 最高 | 高 | 高 | 强一致性要求的业务 |
| 重试+死信 | 高 | 低 | 中 | 处理临时故障和最终死信 |
选型建议
初创项目/简单业务:
电商/交易系统:
大数据/日志处理:
金融/支付系统:
总结
消息丢失问题是消息队列使用中的常见挑战,通过今天介绍的5种方案,我们可以构建一个可靠的消息系统:
- 生产者确认机制 - 保证消息成功发送到Broker
- 消息持久化机制 - 防止Broker重启导致消息丢失
- 消费者确认机制 - 确保消息被成功处理
- 事务消息机制 - 保证业务和消息的一致性
- 重试与死信队列 - 处理异常情况和最终死信
有些小伙伴可能会问:"我需要全部使用这些方案吗?
"我的建议是: 根据业务需求选择合适的组合。
对于关键业务,建议至少使用前三种方案;对于普通业务,可以根据实际情况适当简化。
记住,没有完美的方案,只有最适合的方案。
编辑推荐:
- 解决MQ消息丢失问题的5种方案03-02
- Python与动态代理的多元应用:借助ipipd高质量IP池解锁数据潜能03-02
- 紧急通知!代码签名证书有效期大调整,2025年12月26日前抓紧锁定长期权益03-02
- 必要时在前端使用 localStorage 维护 token,并在请求头中手动带上03-02
- 欧盟GDPR升级草案曝光 跨境数据合规成本将增30% - 金海境科技03-02
- MySQL数据库:从架构到优化的全面解析03-02
- 误操作删除HP ProLiant DL380配置导致教育机构数据丢失数据恢复案例03-02
- 大数据时代的数据存储选择:MongoDB与SQL Server的比较03-02
相关推荐
-
雷神推出 MIX PRO II 迷你主机:基于 Ultra 200H,玻璃上盖 + ARGB 灯效
2 月 9 日消息,雷神 (THUNDEROBOT) 现已宣布推出基于英
-
制造商 Musnap 推出彩色墨水屏电纸书 Ocean C:支持手写笔、第三方安卓应用
2 月 10 日消息,制造商 Musnap 现已在海外推出一款 Oce
