Redis Stream 是 Redis 5.0
版本新增加的数据结构(适用于简单的需求,要求必须及时消费,否者会造成内存堆积OOM,高业务要求建议rocketmq和kafka)。
Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。
而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
Redis Stream 结构
如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:
每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。
Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。
Redis Stream 相关命令
消息队列
xadd
:添加消息到末尾xtrim
:对流进行修剪,限制长度xdel
:删除消息xlen
:获取流包含的元素数量,即消息长度xrange
:获取消息列表,会自动过滤已经删除的消息xrevrange
:反向获取消息列表,ID 从大到小xread
:以阻塞或非阻塞方式获取消息列表
消费者组
xgroup create
:创建消费者组xreadgroup group
:读取消费者组中的消息xack
:将消息标记为"已处理"xgroup setid
:为消费者组设置新的最后递送消息IDxgroup delconsumer
:删除消费者xgroup destroy
:删除消费者组xpending
:显示待处理消息的相关信息xclaim
:转移消息的归属权xinfo
:查看流和消费者组的相关信息xinfo groups
:打印消费者组的信息xinfo stream
:打印流信息
帮助命令
127.0.0.1:6379> help xadd XADD key ID field value [field value ...] summary: Appends a new entry to a stream since: 5.0.0 group: stream 127.0.0.1:6379> help xgroup XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername] summary: Create, destroy, and manage consumer groups. since: 5.0.0 group: stream 127.0.0.1:6379> xgroup help 1) XGROUP <subcommand> arg arg ... arg. Subcommands are: 2) CREATE <key> <groupname> <id or $> [opt] -- Create a new consumer group. 3) option MKSTREAM: create the empty stream if it does not exist. 4) SETID <key> <groupname> <id or $> -- Set the current group ID. 5) DESTROY <key> <groupname> -- Remove the specified group. 6) DELCONSUMER <key> <groupname> <consumer> -- Remove the specified consumer. 7) HELP -- Prints this help.
Redis Stream 命令解析
1、xadd 添加消息
使用 xadd
向队列添加消息,如果指定的队列不存在,则创建一个队列。
语法格式: XADD key ID field value [field value ...]
key:队列名称,如果不存在就创建
ID:消息 id,使用 * 表示由 redis 生成,可以自定义,但要自己保证唯一
field value:记录
消息的全局唯一 ID 由两部分组成,第一部分“1619578295075”是数据插入时,以毫秒为单位计算的当前服务器时间,第二部分表示插入消息在当前毫秒内的消息序号,这是从 0 开始编号的。
127.0.0.1:6379> xadd mystream * name tingfeng "1619578295075-0" 127.0.0.1:6379> xadd mystream * name balaili "1619578304192-0" 127.0.0.1:6379> xadd mystream * f1 v1 f2 v2 f3 v3 "1619578307667-0" 127.0.0.1:6379> xlen mystream (integer) 3 127.0.0.1:6379> xrange mystream - + 1) 1) "1619578295075-0" 2) 1) "name" 2) "tingfeng" 2) 1) "1619578304192-0" 2) 1) "name" 2) "balaili" 3) 1) "1619578307667-0" 2) 1) "f1" 2) "v1" 3) "f2" 4) "v2" 5) "f3" 6) "v3" 127.0.0.1:6379>
2、xtrim 修剪流
使用 xtrim
对流进行范围修剪(取尾部数据),限制长度。
语法格式: XTRIM key MAXLEN [~] count
key:队列名称
MAXLEN:长度
count:数量
127.0.0.1:6379> xtrim mystream maxlen 2 (integer) 1 127.0.0.1:6379> xrange mystream - + 1) 1) "1619578304192-0" 2) 1) "name" 2) "balaili" 2) 1) "1619578307667-0" 2) 1) "f1" 2) "v1" 3) "f2" 4) "v2" 5) "f3" 6) "v3"
3、xlen 流长度
使用 xlen
获取流包含的元素数量,即消息长度。
语法格式: XLEN key
key:队列名称
4、xdel 删除消息
使用 xdel
删除消息
语法格式: XDEL key ID [ID ...]
key:队列名称
ID:消息 ID
127.0.0.1:6379> xadd mydemo * a 1 "1619579142062-0" 127.0.0.1:6379> xadd mydemo * b 2 "1619579145385-0" 127.0.0.1:6379> xadd mydemo * c 3 d 4 e 5 "1619579158816-0" 127.0.0.1:6379> xlen mydemo (integer) 3 127.0.0.1:6379> xdel mydemo 1619579145385-0 (integer) 1 127.0.0.1:6379> xrange mydemo - + 1) 1) "1619579142062-0" 2) 1) "a" 2) "1" 2) 1) "1619579158816-0" 2) 1) "c" 2) "3" 3) "d" 4) "4" 5) "e" 6) "5" 127.0.0.1:6379> del mydemo # 删除key (integer) 1
5、xrange 消息列表
使用 xrange
获取消息列表,会自动过滤已经删除的消息。
语法格式: XRANGE key start end [COUNT count]
key:队列名
start:开始值,
-
表示最小值end:结束值,
+
表示最大值count:数量(取头部数据)
127.0.0.1:6379> xadd mydemo * name aaa "1619579326631-0" 127.0.0.1:6379> xadd mydemo * name bbb "1619579328639-0" 127.0.0.1:6379> xadd mydemo * name ccc "1619579330732-0" 127.0.0.1:6379> xadd mydemo * name ddd "1619579333019-0" 127.0.0.1:6379> xadd mydemo * name eee "1619579335736-0" 127.0.0.1:6379> xlen mydemo (integer) 5 127.0.0.1:6379> xrange mydemo - + count 2 1) 1) "1619579326631-0" 2) 1) "name" 2) "aaa" 2) 1) "1619579328639-0" 2) 1) "name" 2) "bbb" 127.0.0.1:6379> del mydemo # 删除key (integer) 1
6、xrevrange 消息列表
使用 xrevrange
获取消息列表,会自动过滤已经删除的消息。
语法格式: XREVRANGE key end start [COUNT count]
key:队列名
end:结束值,
+
表示最大值start:开始值,
-
表示最小值count:数量
127.0.0.1:6379> xadd mydemo * name aaa "1619579822470-0" 127.0.0.1:6379> xadd mydemo * name bbb "1619579825203-0" 127.0.0.1:6379> xadd mydemo * name ccc "1619579827369-0" 127.0.0.1:6379> xlen mydemo (integer) 3 127.0.0.1:6379> xrevrange mydemo + - 1) 1) "1619579827369-0" 2) 1) "name" 2) "ccc" 2) 1) "1619579825203-0" 2) 1) "name" 2) "bbb" 3) 1) "1619579822470-0" 2) 1) "name" 2) "aaa" 127.0.0.1:6379> xrevrange mydemo + - count 1 1) 1) "1619579827369-0" 2) 1) "name" 2) "ccc"
7、xread 消息列表
使用 xread
以阻塞或非阻塞方式获取消息列表。
语法格式: XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
count
:数量milliseconds
/block
:阻塞毫秒数,没有设置就是非阻塞模式,取到一个结果就退出。key
:队列名id
:消息 ID
# 创建两个流 127.0.0.1:6379> xadd demo1 * name aaa "1619580212875-0" 127.0.0.1:6379> xadd demo1 * name bbb "1619580215254-0" 127.0.0.1:6379> xadd demo2 * name ccc "1619580226876-0" 127.0.0.1:6379> xadd demo2 * name ddd "1619580230966-0" # 读取所有 # 0-0 从头消费 127.0.0.1:6379> xread streams demo1 0-0 1) 1) "demo1" 2) 1) 1) "1619580212875-0" 2) 1) "name" 2) "aaa" 2) 1) "1619580215254-0" 2) 1) "name" 2) "bbb" # 读取1个头部消息 127.0.0.1:6379> xread count 1 streams demo1 0-0 1) 1) "demo1" 2) 1) 1) "1619580212875-0" 2) 1) "name" 2) "aaa" # 读取两个key的头部消息 127.0.0.1:6379> xread count 1 streams demo1 demo2 0-0 0-0 1) 1) "demo1" 2) 1) 1) "1619580212875-0" 2) 1) "name" 2) "aaa" 2) 1) "demo2" 2) 1) 1) "1619580226876-0" 2) 1) "name" 2) "ccc" # 阻塞30秒监听demo3消息,有一个结果就退出 # $ 表示最新消息 127.0.0.1:6379> xread block 30000 streams demo3 $ 1) 1) "demo3" 2) 1) 1) "1619580596131-0" 2) 1) "name" 2) "ttt" (1.84s) 127.0.0.1:6379>
命令最后的“ $
”符号表示读取最新的消息,同时,我们设置了 block 30000 的配置项,单位是毫秒,表明在读取最新消息时,如果没有消息到来,将阻塞30000 毫秒(即 30 秒),然后再返回。
8、xgroup create 创建消费者组
Streams 本身可以使用 xgroup 创建消费组,创建消费组之后,Streams 可以使用 xreadgroup 命令让消费组内的消费者读取消息。
使用 xgroup create
创建消费者组。
语法格式: XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
key:队列名称,如果不存在就创建
groupname:组名。
$:从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。
#从头开始消费 xgroup create mystream consumer-group-name 0-0 #从尾部开始消费 xgroup create mystream consumer-group-name $
示例
127.0.0.1:6379> xadd mydemo * name aaa "1619581865830-0" 127.0.0.1:6379> xadd mydemo * name bbb "1619581868367-0" 127.0.0.1:6379> xadd mydemo * name ccc "1619581870373-0" # 创建 group1 消费组,消费 mydemo 队列消息 127.0.0.1:6379> xgroup create mydemo group1 0 OK # 让 group1 消费组里的消费者 consumer1 从 mydemo 中读取所有消息 127.0.0.1:6379> xreadgroup group group1 consumer1 streams mydemo > 1) 1) "mydemo" 2) 1) 1) "1619581865830-0" 2) 1) "name" 2) "aaa" 2) 1) "1619581868367-0" 2) 1) "name" 2) "bbb" 3) 1) "1619581870373-0" 2) 1) "name" 2) "ccc"
命令最后的参数“ >
”,表示从第一条尚未被消费的消息开始读取。因为在 consumer1 读取消息前,group1 中没有其他消费者读取过消息,所以 consumer1 就得到 mydemo 消息队列中的所有消息了(消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了)。
9、xreadgroup group 读取消费者组消息
使用 xreadgroup group
读取消费组中的消息。
语法格式: XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group:消费组名
consumer:消费者名
count:读取数量
milliseconds / block:阻塞毫秒数
key:队列名
ID:消息 ID
使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。
例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。
XREADGROUP group group2 consumer1 count 1 streams mqstream > 1) 1) "mqstream" 2) 1) 1) "1599203861727-0" 2) 1) "repo" 2) "5" XREADGROUP group group2 consumer2 count 1 streams mqstream > 1) 1) "mqstream" 2) 1) 1) "1599274912765-0" 2) 1) "repo" 2) "3" XREADGROUP group group2 consumer3 count 1 streams mqstream > 1) 1) "mqstream" 2) 1) 1) "1599274925823-0" 2) 1) "repo" 2) "2"
为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 xack
命令通知 Streams“消息已经处理完成”。如果消费者没有成功处理消息,它就不会给 Streams 发送 xack
命令,消息仍然会留存。此时,消费者可以在重启后,用 xpending
命令查看已读取、但尚未确认处理完成的消息。
10、xpending 待处理消息信息
xpending
命令是检查待处理消息列表的接口,用于观察和了解消费者组正在发生的事情:哪些客户端是活跃的,哪些消息在等待消费,或者查看是否有空闲的消息。
此命令与 xclaim
一起使用,用于实现长时间故障的消费者的恢复。
语法格式: xpendign key groupName [start end count] [consumerName]
xpending是一个只读的命令,会输出指定group中所有处于pending message中的消息总个数、开始ID、结束ID、每个consumer中pending message中消息的个数。可以通过指定开始结束id和consumerName来获取更加详细的信息
127.0.0.1:6379> xpending mydemo group1 1) (integer) 3 2) "1619581865830-0" # 消息开始ID 3) "1619581870373-0" # 消息结束ID 4) 1) 1) "consumer1" # 消费者 2) "3" # 消费数量 # 限制消息数量 127.0.0.1:6379> xpending mydemo group1 - + 3 1) 1) "1619581865830-0" # 消息id 2) "consumer1" # 消费者 3) (integer) 12258065 # 自上次将此消息传递给该消费者以来,经过的毫秒数 4) (integer) 1 # 读取的次数 2) 1) "1619581868367-0" 2) "consumer1" 3) (integer) 12258065 4) (integer) 1 3) 1) "1619581870373-0" 2) "consumer1" 3) (integer) 12258065 4) (integer) 1 # 限制指定消费者消息数量 127.0.0.1:6379> xpending mydemo group1 - + 2 consumer1 1) 1) "1619581865830-0" 2) "consumer1" 3) (integer) 12352110 4) (integer) 1 2) 1) "1619581868367-0" 2) "consumer1" 3) (integer) 12352110 4) (integer) 1
11、xack 消息标记
消息标记确认
使用 xack
命令通知 Streams,然后这条消息就会被删除。当我们再使用 xpending
命令查看时,就可以看到,consumer1 已经没有已读取、但尚未确认处理的消息了。
# 让 group1 消费组里的消费者 consumer1 从 mydemo 中读取所有消息 127.0.0.1:6379> xreadgroup group group1 consumer1 streams mydemo > 1) 1) "mydemo" 2) 1) 1) "1619581865830-0" 2) 1) "name" 2) "aaa" 2) 1) "1619581868367-0" 2) 1) "name" 2) "bbb" 3) 1) "1619581870373-0" 2) 1) "name" 2) "ccc" # ack 确认 127.0.0.1:6379> xack mydemo group1 1619581868367-0 (integer) 1 127.0.0.1:6379> xack mydemo group1 1619581865830-0 (integer) 1 127.0.0.1:6379> xpending mydemo group1 1) (integer) 1 2) "1619581870373-0" 3) "1619581870373-0" 4) 1) 1) "consumer1" 2) "1"
12、xgroup setid 消费者组设置新的最后递送消息ID
使用 setid
子命令设置要传递的下一条消息。 通常情况,在消费者创建时设置下一个ID,作为 xgroup create
最后一个参数。 但是使用这种形式,可以在以后修改下一个ID,而无需再次删除和创建使用者组。
例如,如果你希望消费者组中的消费者重新处理流中的所有消息,你可能希望将其下一个ID设置为0。
13、xgroup delconsumer 删除消费者
从消费者组中移除给定的消费者,使用以下命令格式
127.0.0.1:6379> xgroup delconsumer mydemo group1 consumer1 (integer) 1
14、xgroup destroy 删除消费者组
即使存在活动的消费者和待处理消息,消费者组也将被销毁,因此请确保仅在真正需要时才调用此命令。
127.0.0.1:6379> xgroup destroy mydemo group1 (integer) 1
15、xclaim 转移消息归属权
语法格式: xclaim key groupName comsumer minIdleTime id [id ...] [IDLE ms] [TIME msUnixTime] [RETRYCOUNT count] [force] [justid]
16、xinfo 流和消费者组信息
检索关于流和关联的消费者组的不同的信息。
语法格式: XINFO STREAM <key>
127.0.0.1:6379> xinfo stream mydemo 1) "length" 2) (integer) 3 # 流元素的数量 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1619581870373-0" # 最后消息ID 9) "groups" 10) (integer) 1 # 消费者组数量 11) "first-entry" # 第一个消息 12) 1) "1619581865830-0" 2) 1) "name" 2) "aaa" 13) "last-entry" # 最后一个消息 14) 1) "1619581870373-0" 2) 1) "name" 2) "ccc"
17、xinfo groups 消费者组的信息
只获得与流关联的所有消费者组的输出
127.0.0.1:6379> xinfo groups mydemo 1) 1) "name" 2) "group1" 3) "consumers" 4) (integer) 1 5) "pending" 6) (integer) 3 7) "last-delivered-id" 8) "1619581870373-0"
18、xinfo stream 流信息
127.0.0.1:6379> xinfo consumers mydemo group1 1) 1) "name" 2) "consumer1" 3) "pending" 4) (integer) 3 5) "idle" 6) (integer) 417812
未经允许请勿转载:程序喵 » Redis Stream 使用