Redis从基础到Stream消息队列实战指南

来源:这里教程网 时间:2026-03-23 11:11:19 作者:
一、Redis 简介二、Redis 核心特点三、Redis 常用数据结构3.1 String(字符串)3.2 Hash(哈希)3.3 List(列表)3.4 Set(集合)3.5 Sorted Set(有序集合)四、Redis Stream:消息队列4.1 基本命令4.2 消费者组4.3 Stream 与 List 的对比五、Redis 在项目中的典型用法5.1 架构示意5.2 MAF 营销分析任务流5.3 消费者组带来的好处5.4 Redis Stream 请求分发规则六、Redis 常用配置七、Python 操作 Redis7.1 Linux安装7.2 Docker安装7.3 连接Redis7.4 基本使用八、Redis 使用建议总结

一、Redis 简介

Redis(Remote Dictionary Server)是一个基于内存的键值存储系统,支持多种数据结构,常被用作缓存、消息队列和会话存储。它提供高性能读写,并支持持久化、主从复制和集群。

二、Redis 核心特点

特点说明内存存储数据主要在内存中,读写延迟低(通常微秒级)持久化支持 RDB 快照和 AOF 日志,保证数据不丢失多种数据结构String、List、Hash、Set、Sorted Set、Stream、Bitmap 等单线程模型命令串行执行,避免锁竞争,保证原子性主从复制支持读写分离和高可用发布订阅支持 Pub/Sub 和 Stream 消息队列事务支持 MULTI/EXEC 事务Lua 脚本支持 Lua 脚本,保证原子性集群支持 Redis Cluster 水平扩展

三、Redis 常用数据结构

3.1 String(字符串)

SET key value # 设置 GET key # 获取 INCR key # 自增 EXPIRE key seconds # 设置过期时间

3.2 Hash(哈希)

HSET user:1 name "张三" age 25 HGET user:1 name HGETALL user:1

3.3 List(列表)

LPUSH queue task1 # 左侧入队 RPOP queue # 右侧出队 LRANGE queue 0 -1 # 范围查询

3.4 Set(集合)

SADD tags "redis" "cache" SMEMBERS tags SISMEMBER tags "redis"

3.5 Sorted Set(有序集合)

ZADD rank 100 "user1" 95 "user2" ZRANGE rank 0 -1 WITHSCORES ZREVRANK rank "user1"

四、Redis Stream:消息队列

Redis 5.0 引入 Stream,用于实现消息队列,支持:

消息持久化 消费者组 消息确认(ACK) 历史消息回溯

4.1 基本命令

# 添加消息 XADD mystream * field1 value1 field2 value2 # 读取消息(从头) XREAD COUNT 10 STREAMS mystream 0 # 读取最新消息(阻塞) XREAD BLOCK 5000 STREAMS mystream $

4.2 消费者组

# 创建消费者组 XGROUP CREATE mystream mygroup 0 MKSTREAM # 消费消息 XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream > # 确认消息 XACK mystream mygroup message-id # 查看待确认消息 XPENDING mystream mygroup

4.3 Stream 与 List 的对比

特性ListStream消息持久化有有消费者组无有消息确认无有历史回溯有限支持适用场景简单队列可靠消息队列

五、Redis 在项目中的典型用法

以银行场景下的 SOP 合规检测为例,Redis Stream 用于任务分发和结果回传。

5.1 架构示意

后端/前端 → XADD 推送任务 → Redis Stream
                              ↓
                    sop_engine 监听 XREADGROUP
                              ↓
                    处理完成后 XADD 推送结果
                              ↓
                    后端消费 message:xxx:results

5.2 MAF 营销分析任务流

# 1. 推送 MAF 任务 XADD message:maf:tasks * task_id "maf_001" \ conversation_id "maf_001" \ conversation_json_path "bucket/path/conversation.json" # 2. 引擎监听 message:maf:tasks,消费并分析 # 3. 引擎完成后写入结果流 XADD message:maf:results * task_id "maf_001" status "completed" \ result_path "maf-results-bucket/maf_001/analysis_result.json"

5.3 消费者组带来的好处

多个 worker 共同消费,实现负载均衡 每条消息只被组内一个消费者处理 支持 ACK,失败可重试 支持 PEL(Pending Entries List)查看未确认消息

5.4 Redis Stream 请求分发规则

如果两个项目使用了同一套Redis配置,那么无法保证Redis Stream 请求会由谁处理,取决于 Redis 消费者组的分发。

两个容器都连同一个 Redis,监听同一个 Stream(如 message:maf:tasks),且通常在同一消费者组(如 maf_analyze_group)里:

Redis 会把消息分发给组内的消费者 每条消息只会被组内一个消费者处理 具体是容器 1 还是容器 2,由 Redis 的分发策略决定,无法指定

因此:

有时是A容器处理 有时是B容器处理 测试时无法稳定地“只让某个容器”处理请求

可能带来的问题:

场景说明代码版本混用同一批任务,部分由旧代码处理,部分由新代码处理测试不可控无法保证测试请求一定打到新容器任务重复若消费者组配置不当,可能出现同一条消息被多个消费者处理

如何保证只使用新代码:

方案 1:停掉旧容器

# 停掉旧容器后再部署新容器 docker stop <old_container_id> docker run ... # 启动新容器

方案 2:用不同的 Stream 做测试

新容器监听不同的 Stream,例如 message:maf:tasks_test 测试时往 message:maf:tasks_test 发消息 需要改配置或环境变量,让新容器使用 message:maf:tasks_test

方案 3:用不同的消费者组

新容器使用不同的消费者组名,例如 maf_analyze_group_v2 两个组都会收到同一条消息,各自处理一次 适合做 A/B 或灰度,但会产生重复处理,需要业务上能接受

方案 4:只保留一个容器

部署新容器前先停掉旧容器 或使用滚动更新,保证同一时间只有一个版本在跑

六、Redis 常用配置

# 绑定地址 bind 0.0.0.0 # 端口 port 6379 # 密码 requirepass your_password # 最大内存 maxmemory 2gb maxmemory-policy allkeys-lru # 持久化 save 900 1 save 300 10 save 60 10000 appendonly yes

七、Python 操作 Redis

7.1 Linux安装

# Ubuntu/Debian sudo apt update sudo apt install redis-server # CentOS/RHEL sudo yum install redis # 启动服务 sudo systemctl start redis sudo systemctl enable redis # 验证 redis-cli ping # 返回 PONG 表示成功

7.2 Docker安装

docker run -d --name redis -p 6379:6379 redis:latest # 带密码 docker run -d --name redis -p 6379:6379 redis redis-server --requirepass yourpassword

7.3 连接Redis

# 本地连接 redis-cli # 带密码连接 redis-cli -a yourpassword # 远程连接 redis-cli -h host -p 6379 -a password

7.4 基本使用

import redis r = redis.Redis(host='localhost', port=6379, db=0, password='') # String r.set('name', 'Redis') print(r.get('name')) # Stream 添加消息 r.xadd('mystream', {'task_id': '001', 'data': 'hello'}) # Stream 消费 messages = r.xreadgroup('mygroup', 'consumer1', {'mystream': '>'}, count=1) for stream, msgs in messages: for msg_id, data in msgs: print(msg_id, data) r.xack('mystream', 'mygroup', msg_id)

八、Redis 使用建议

    合理设置 maxmemory 和淘汰策略,避免 OOM。生产环境开启 requirepass 和访问控制。使用连接池,减少连接开销。对热点 key 做拆分或本地缓存,减轻压力。使用 Pipeline 批量执行命令,降低网络往返。Stream 场景下注意消费者组和 ACK,避免消息堆积和重复消费。

总结

Redis 适合做缓存、会话存储和消息队列。Stream 在需要可靠消费、负载均衡和消息确认的场景中,比简单 List 更合适。结合具体业务(如 MAF 任务流),可以设计出清晰、可扩展的异步处理架构。

到此这篇关于Redis从基础到Stream消息队列实战指南的文章就介绍到这了,

相关推荐

热文推荐