前言
在电商秒杀场景中,瞬间爆发的海量请求往往成为系统的生死考验。当并发量达到数万甚至数十万QPS时,传统数据库单表架构难以支撑,而Redis与消息队列(MQ)的组合凭借其高性能与可靠性,成为应对高并发秒杀的黄金方案。
方案总览
用户请求 → 前端生成Token → Redis执行Lua脚本(预扣减+防重+流水)→ 发送RocketMQ事务消息 →
[本地事务校验Redis结果] → MQ消息确认(COMMIT/ROLLBACK)→ 消费者消费消息 → MySQL扣减库存+记录订单
秒杀系统的核心诉求是抗并发、防超卖、保一致。Redis+MQ 方案通过 “前端拦截 - 中间缓冲 - 后端落地” 的三层架构实现这一目标:
Redis通过Lua脚本原子性处理库存预扣减,过滤无效请求;中间缓冲:MQ(如 RocketMQ)通过事务消息削峰填谷,确保流量平稳进入数据库;后端落地:MySQL最终存储库存与订单数据,通过事务消息保障与Redis的一致性。
流程拆解(示例代码)
Redis 库存预扣减
预扣减流程
开始
│
├─ 生成Token(前端)
│
├─ 前端携带Token请求秒杀
│
├─ 执行Lua脚本
│ │
│ ├─ 检查Token是否存在(Hash结构)
│ │ ├─ 存在 → 返回“重复提交”
│ │ └─ 不存在 → 继续
│ │
│ ├─ 获取Redis库存(String结构)
│ │ ├─ 库存不足 → 返回“库存不足”
│ │ └─ 库存充足 → 继续
│ │
│ ├─ 扣减Redis库存并更新
│ │
│ └─ 记录流水到Hash结构
│
├─ 返回扣减结果(成功/失败)
│
结束
Lua 脚本
-- 启用Redis命令复制,确保脚本在集群环境中正确同步 redis.replicate_commands() -- 1. 防重提交校验:通过用户ID+Token判断是否重复提交 -- KEYS[2]为用户ID(uid),ARGV[2]为本次请求的Token if redis.call('hexists', KEYS[2], ARGV[2]) == 1 then return redis.error_reply('repeat submit') -- 重复提交,返回错误 end -- 2. 库存充足性校验 local product_id = KEYS[1] -- 商品ID local stock = redis.call('get', KEYS[1]) -- 获取当前库存 if not stock then -- 库存不存在(如商品未上架) return redis.error_reply('product not found') end if tonumber(stock) < tonumber(ARGV[1]) then -- 库存不足 return redis.error_reply('stock is not enough') end -- 3. 执行库存扣减 local remaining_stock = tonumber(stock) - tonumber(ARGV[1]) redis.call('set', KEYS[1], tostring(remaining_stock)) -- 更新库存 -- 4. 记录交易流水(用于后续一致性校验) local time = redis.call('time') -- 获取当前时间(秒+微秒) local currentTimeMillis = (time[1] * 1000) + math.floor(time[2] / 1000) -- 转换为毫秒时间戳 -- 存储流水到Hash结构:用户ID → Token → 流水详情 redis.call('hset', KEYS[2], ARGV[2], cjson.encode({ action = '扣减库存', product = product_id, from = stock, -- 扣减前库存 to = remaining_stock, -- 扣减后库存 change = ARGV[1], -- 扣减数量 token = ARGV[2], timestamp = currentTimeMillis }) ) return remaining_stock -- 返回扣减后库存
Java 调用 Lua
@Service public class SeckillService { @Autowired private StringRedisTemplate redisTemplate; // 加载Lua脚本 private DefaultRedisScript<Long> stockScript; @PostConstruct public void init() { stockScript = new DefaultRedisScript<>(); stockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("seckill.lua"))); stockScript.setResultType(Long.class); } /** * 执行Redis库存预扣减 * @param productId 商品ID * @param uid 用户ID * @param quantity 购买数量 * @param token 防重Token * @return 扣减后库存(-1表示失败) */ public Long preDeductStock(String productId, String uid, Integer quantity, String token) { try { // 执行Lua脚本:KEYS = [商品ID, 用户ID],ARGV = [数量, Token] return redisTemplate.execute( stockScript, Arrays.asList(productId, uid), quantity.toString(), token ); } catch (Exception e) { log.error("Redis预扣减失败", e); return -1L; } } }
MySQL 库存扣减
扣减流程图
开始
│
├─ 发送半消息到RocketMQ
│
├─ 执行本地事务
│ │
│ ├─ 检查Redis流水是否存在
│ │ ├─ 存在 → 提交消息(COMMIT)
│ │ └─ 不存在 → 回滚消息(ROLLBACK)
│ │
│ └─ 未知状态 → 等待回查
│
├─ RocketMQ回查机制
│ ├─ 有流水 → 提交消息
│ └─ 无流水 → 回滚消息
│
├─ 消息被消费
│ │
│ ├─ 查询数据库当前版本号(乐观锁)
│ │
│ ├─ 执行库存扣减(WHERE version = 当前版本)
│ │ ├─ 扣减成功 → 记录数据库流水
│ │ └─ 扣减失败 → 抛出异常(触发重试)
│ │
├─ 结束
发送半消息
系统首先向RocketMQ发送一条半消息(Half Message)。此时消息处于不可消费状态,需等待生产者确认本地事务执行结果后,才会被消费者处理。
// 发送半消息 public void sendHalfMessage(String productId, String uid, String token, Integer quantity) { // 构建消息 Message message = new Message( "seckill_topic", // 主题 "stock_deduct", // 标签 JSON.toJSONString(new SeckillMessage(productId, uid, token, quantity)).getBytes() ); // 发送事务消息 TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( "seckill_producer_group", // 生产者组 message, null // 本地事务参数(可传递上下文) ); log.info("半消息发送结果:{}", result.getSendStatus()); }
本地事务校验
本地事务的核心是判断Redis预扣减是否成功:
Redis的Lua脚本执行成功(即库存预扣减完成且流水已记录),则向RocketMQ返回 提交(COMMIT)指令,消息变为可消费状态;若Redis预扣减失败(如库存不足或重复提交),则返回回滚(ROLLBACK)指令,消息被丢弃。若RocketMQ长时间未收到本地事务结果(如生产者宕机),会触发消息回查。此时系统通过检查Redis中是否存在对应交易流水,判断是否需要提交消息:若流水存在,则提交;否则回滚。
@Component public class SeckillTransactionListener implements TransactionListener { @Autowired private StringRedisTemplate redisTemplate; // 执行本地事务 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { SeckillMessage message = JSON.parseObject(new String(msg.getBody()), SeckillMessage.class); // 检查Redis中是否存在对应流水(验证预扣减成功) Boolean flag = redisTemplate.opsForHash().hasKey( message.getUid(), // Hash key:用户ID message.getToken() // Hash field:Token ); return flag ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK; } catch (Exception e) { return RocketMQLocalTransactionState.UNKNOWN; // 未知状态,触发回查 } } // 消息回查(解决超时未确认问题) @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { SeckillMessage message = JSON.parseObject(new String(msg.getBody()), SeckillMessage.class); // 回查逻辑:再次检查流水是否存在 Boolean flag = redisTemplate.opsForHash().hasKey(message.getUid(), message.getToken()); return flag ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK; } }
消费消息并扣减 MySQL 库存
消费者监听消息,执行数据库扣减(需保证幂等性): 消费者接收到可消费的消息后,执行MySQL库存扣减操作,并同步记录数据库中的交易流水。为确保消费成功,需利用MQ的重试机制:若消费失败(如数据库暂时不可用),MQ会自动重试,直至消费成功或达到最大重试次数(此时需人工介入处理)。
@Component @RocketMQMessageListener( topic = "seckill_topic", consumerGroup = "seckill_consumer_group", messageModel = MessageModel.CLUSTERING ) public class SeckillConsumer implements RocketMQListener<MessageExt> { @Autowired private JdbcTemplate jdbcTemplate; @Override public void onMessage(MessageExt message) { SeckillMessage msg = JSON.parseObject(new String(message.getBody()), SeckillMessage.class); String productId = msg.getProductId(); int quantity = msg.getQuantity(); // 数据库扣减(使用乐观锁防超卖) String sql = "UPDATE product_stock " + "SET stock = stock - ?, version = version + 1 " + "WHERE product_id = ? AND stock >= ? AND version = ?"; // 1. 查询当前版本号 Integer version = jdbcTemplate.queryForObject( "SELECT version FROM product_stock WHERE product_id = ?", Integer.class, productId ); // 2. 执行扣减(乐观锁保证原子性) int rows = jdbcTemplate.update(sql, quantity, productId, quantity, version); if (rows > 0) { // 扣减成功:记录数据库流水 jdbcTemplate.update( "INSERT INTO stock_flow (product_id, quantity, op_type, create_time) " + "VALUES (?, ?, 'SECKILL', NOW())", productId, quantity ); // 确认消费成功(返回ACK) } else { // 扣减失败:触发重试(MQ默认重试机制) throw new RuntimeException("数据库扣减失败,触发重试"); } } }
一致性保障
为防止Redis与MySQL数据不一致(如Redis扣减成功但MySQL扣减失败),需定期对账:
@Scheduled(cron = "0 0 */1 * * ?") // 每小时执行一次 public void reconcileStock() { // 1. 扫描Redis中未同步到MySQL的流水 Set<String> uids = redisTemplate.keys("uid:*"); // 假设用户ID前缀为uid: for (String uid : uids) { Map<Object, Object> tokenMap = redisTemplate.opsForHash().entries(uid); for (Map.Entry<Object, Object> entry : tokenMap.entrySet()) { String token = (String) entry.getKey(); String flowJson = (String) entry.getValue(); SeckillFlow flow = JSON.parseObject(flowJson, SeckillFlow.class); // 2. 检查MySQL是否有对应订单 Integer count = jdbcTemplate.queryForObject( "SELECT COUNT(1) FROM orders WHERE product_id = ? AND uid = ? AND token = ?", Integer.class, flow.getProduct(), flow.getUid(), token ); if (count == 0) { // 3. 未找到订单 → 人工介入或自动回滚Redis库存 log.warn("发现不一致:Redis有流水但MySQL无订单,product={}, uid={}", flow.getProduct(), uid); // redisTemplate.opsForValue().increment(flow.getProduct(), Integer.parseInt(flow.getChange())); } } } }
系统可通过定时任务对比Redis流水、MySQL库存流水与订单表数据:若Redis流水存在但订单表无对应记录,说明订单生成失败,需人工介入补单或回滚Redis库存,避免少卖;若订单表有记录但MySQL库存未扣减,则需触发库存补扣,避免多卖。
总结
Redis + MQ 方案通过预扣减 + 事务消息 + 对账三重机制,完美解决了高并发秒杀的核心痛点:
Redis承担高并发读写,通过Lua脚本确保原子性,防止超卖;MQ事务消息保障Redis与MySQL的最终一致性,避免数据断层;流水对账作为最后一道防线,及时发现并修复异常。
到此这篇关于Redis+MQ高并发秒杀的技术方案与实现的文章就介绍到这了,
