一、Redis 简介
Redis(Remote Dictionary Server)是一个基于内存的键值存储系统,支持多种数据结构,常被用作缓存、消息队列和会话存储。它提供高性能读写,并支持持久化、主从复制和集群。
二、Redis 核心特点
三、Redis 常用数据结构
3.1 String(字符串)
SET key value # 设置 GET key # 获取 INCR key # 自增 EXPIRE key seconds # 设置过期时间
3.2 Hash(哈希)
HSET user:1 name "张三" age 25 HGET user:1 name HGETALL user:1
3.3 List(列表)
LPUSH queue task1 # 左侧入队 RPOP queue # 右侧出队 LRANGE queue 0 -1 # 范围查询
3.4 Set(集合)
SADD tags "redis" "cache" SMEMBERS tags SISMEMBER tags "redis"
3.5 Sorted Set(有序集合)
ZADD rank 100 "user1" 95 "user2" ZRANGE rank 0 -1 WITHSCORES ZREVRANK rank "user1"
四、Redis Stream:消息队列
Redis 5.0 引入 Stream,用于实现消息队列,支持:
消息持久化 消费者组 消息确认(ACK) 历史消息回溯4.1 基本命令
# 添加消息 XADD mystream * field1 value1 field2 value2 # 读取消息(从头) XREAD COUNT 10 STREAMS mystream 0 # 读取最新消息(阻塞) XREAD BLOCK 5000 STREAMS mystream $
4.2 消费者组
# 创建消费者组 XGROUP CREATE mystream mygroup 0 MKSTREAM # 消费消息 XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream > # 确认消息 XACK mystream mygroup message-id # 查看待确认消息 XPENDING mystream mygroup
4.3 Stream 与 List 的对比
五、Redis 在项目中的典型用法
以银行场景下的 SOP 合规检测为例,Redis Stream 用于任务分发和结果回传。
5.1 架构示意
后端/前端 → XADD 推送任务 → Redis Stream
↓
sop_engine 监听 XREADGROUP
↓
处理完成后 XADD 推送结果
↓
后端消费 message:xxx:results
5.2 MAF 营销分析任务流
# 1. 推送 MAF 任务 XADD message:maf:tasks * task_id "maf_001" \ conversation_id "maf_001" \ conversation_json_path "bucket/path/conversation.json" # 2. 引擎监听 message:maf:tasks,消费并分析 # 3. 引擎完成后写入结果流 XADD message:maf:results * task_id "maf_001" status "completed" \ result_path "maf-results-bucket/maf_001/analysis_result.json"
5.3 消费者组带来的好处
多个 worker 共同消费,实现负载均衡 每条消息只被组内一个消费者处理 支持 ACK,失败可重试 支持 PEL(Pending Entries List)查看未确认消息5.4 Redis Stream 请求分发规则
如果两个项目使用了同一套Redis配置,那么无法保证Redis Stream 请求会由谁处理,取决于 Redis 消费者组的分发。
两个容器都连同一个 Redis,监听同一个 Stream(如 message:maf:tasks),且通常在同一消费者组(如 maf_analyze_group)里:
Redis 会把消息分发给组内的消费者 每条消息只会被组内一个消费者处理 具体是容器 1 还是容器 2,由 Redis 的分发策略决定,无法指定因此:
有时是A容器处理 有时是B容器处理 测试时无法稳定地“只让某个容器”处理请求可能带来的问题:
如何保证只使用新代码:
方案 1:停掉旧容器
# 停掉旧容器后再部署新容器 docker stop <old_container_id> docker run ... # 启动新容器
方案 2:用不同的 Stream 做测试
新容器监听不同的 Stream,例如 message:maf:tasks_test 测试时往 message:maf:tasks_test 发消息 需要改配置或环境变量,让新容器使用 message:maf:tasks_test方案 3:用不同的消费者组
新容器使用不同的消费者组名,例如 maf_analyze_group_v2 两个组都会收到同一条消息,各自处理一次 适合做 A/B 或灰度,但会产生重复处理,需要业务上能接受方案 4:只保留一个容器
部署新容器前先停掉旧容器 或使用滚动更新,保证同一时间只有一个版本在跑六、Redis 常用配置
# 绑定地址 bind 0.0.0.0 # 端口 port 6379 # 密码 requirepass your_password # 最大内存 maxmemory 2gb maxmemory-policy allkeys-lru # 持久化 save 900 1 save 300 10 save 60 10000 appendonly yes
七、Python 操作 Redis
7.1 Linux安装
# Ubuntu/Debian sudo apt update sudo apt install redis-server # CentOS/RHEL sudo yum install redis # 启动服务 sudo systemctl start redis sudo systemctl enable redis # 验证 redis-cli ping # 返回 PONG 表示成功
7.2 Docker安装
docker run -d --name redis -p 6379:6379 redis:latest # 带密码 docker run -d --name redis -p 6379:6379 redis redis-server --requirepass yourpassword
7.3 连接Redis
# 本地连接 redis-cli # 带密码连接 redis-cli -a yourpassword # 远程连接 redis-cli -h host -p 6379 -a password
7.4 基本使用
import redis r = redis.Redis(host='localhost', port=6379, db=0, password='') # String r.set('name', 'Redis') print(r.get('name')) # Stream 添加消息 r.xadd('mystream', {'task_id': '001', 'data': 'hello'}) # Stream 消费 messages = r.xreadgroup('mygroup', 'consumer1', {'mystream': '>'}, count=1) for stream, msgs in messages: for msg_id, data in msgs: print(msg_id, data) r.xack('mystream', 'mygroup', msg_id)
八、Redis 使用建议
- 合理设置 maxmemory 和淘汰策略,避免 OOM。生产环境开启 requirepass 和访问控制。使用连接池,减少连接开销。对热点 key 做拆分或本地缓存,减轻压力。使用 Pipeline 批量执行命令,降低网络往返。Stream 场景下注意消费者组和 ACK,避免消息堆积和重复消费。
总结
Redis 适合做缓存、会话存储和消息队列。Stream 在需要可靠消费、负载均衡和消息确认的场景中,比简单 List 更合适。结合具体业务(如 MAF 任务流),可以设计出清晰、可扩展的异步处理架构。
到此这篇关于Redis从基础到Stream消息队列实战指南的文章就介绍到这了,
