Redis Stream 使用

Redis Stream 是 Redis 5.0  版本新增加的数据结构(适用于简单的需求,要求必须及时消费,否者会造成内存堆积OOM,高业务要求建议rocketmq和kafka)。

Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

Redis Stream 结构

如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

  • Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。

  • last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。

  • pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。

Redis Stream 相关命令

消息队列

  • xadd :添加消息到末尾

  • xtrim :对流进行修剪,限制长度

  • xdel :删除消息

  • xlen :获取流包含的元素数量,即消息长度

  • xrange :获取消息列表,会自动过滤已经删除的消息

  • xrevrange :反向获取消息列表,ID 从大到小

  • xread :以阻塞或非阻塞方式获取消息列表

消费者组

  • xgroup create :创建消费者组

  • xreadgroup group :读取消费者组中的消息

  • xack :将消息标记为"已处理"

  • xgroup setid :为消费者组设置新的最后递送消息ID

  • xgroup delconsumer :删除消费者

  • xgroup destroy :删除消费者组

  • xpending :显示待处理消息的相关信息

  • xclaim :转移消息的归属权

  • xinfo :查看流和消费者组的相关信息

  • xinfo groups :打印消费者组的信息

  • xinfo stream :打印流信息


帮助命令

127.0.0.1:6379> help xadd

  XADD key ID field value [field value ...]
  summary: Appends a new entry to a stream
  since: 5.0.0
  group: stream

127.0.0.1:6379> help xgroup

  XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  summary: Create, destroy, and manage consumer groups.
  since: 5.0.0
  group: stream

127.0.0.1:6379> xgroup help
1) XGROUP <subcommand> arg arg ... arg. Subcommands are:
2) CREATE      <key> <groupname> <id or $> [opt] -- Create a new consumer group.
3)             option MKSTREAM: create the empty stream if it does not exist.
4) SETID       <key> <groupname> <id or $>  -- Set the current group ID.
5) DESTROY     <key> <groupname>            -- Remove the specified group.
6) DELCONSUMER <key> <groupname> <consumer> -- Remove the specified consumer.
7) HELP                                     -- Prints this help.

Redis Stream 命令解析

1、xadd 添加消息

使用 xadd  向队列添加消息,如果指定的队列不存在,则创建一个队列。

语法格式: XADD key ID field value [field value ...] 

  • key:队列名称,如果不存在就创建

  • ID:消息 id,使用 * 表示由 redis 生成,可以自定义,但要自己保证唯一

  • field value:记录

消息的全局唯一 ID 由两部分组成,第一部分“1619578295075”是数据插入时,以毫秒为单位计算的当前服务器时间,第二部分表示插入消息在当前毫秒内的消息序号,这是从 0 开始编号的。

127.0.0.1:6379> xadd mystream * name tingfeng
"1619578295075-0"
127.0.0.1:6379> xadd mystream * name balaili
"1619578304192-0"
127.0.0.1:6379> xadd mystream * f1 v1 f2 v2 f3 v3
"1619578307667-0"
127.0.0.1:6379> xlen mystream
(integer) 3
127.0.0.1:6379> xrange mystream - +
1) 1) "1619578295075-0"
   2) 1) "name"
      2) "tingfeng"
2) 1) "1619578304192-0"
   2) 1) "name"
      2) "balaili"
3) 1) "1619578307667-0"
   2) 1) "f1"
      2) "v1"
      3) "f2"
      4) "v2"
      5) "f3"
      6) "v3"
127.0.0.1:6379>

2、xtrim 修剪流

使用 xtrim  对流进行范围修剪(取尾部数据),限制长度。

语法格式: XTRIM key MAXLEN [~] count 

  • key:队列名称

  • MAXLEN:长度

  • count:数量

127.0.0.1:6379> xtrim mystream maxlen 2
(integer) 1
127.0.0.1:6379> xrange mystream - +
1) 1) "1619578304192-0"
   2) 1) "name"
      2) "balaili"
2) 1) "1619578307667-0"
   2) 1) "f1"
      2) "v1"
      3) "f2"
      4) "v2"
      5) "f3"
      6) "v3"

3、xlen 流长度

使用 xlen 获取流包含的元素数量,即消息长度。

语法格式: XLEN key 

  • key:队列名称


4、xdel 删除消息

使用 xdel  删除消息

语法格式: XDEL key ID [ID ...] 

  • key:队列名称

  • ID:消息 ID

127.0.0.1:6379> xadd mydemo * a 1
"1619579142062-0"
127.0.0.1:6379> xadd mydemo * b 2
"1619579145385-0"
127.0.0.1:6379> xadd mydemo * c 3 d 4 e 5
"1619579158816-0"
127.0.0.1:6379> xlen mydemo
(integer) 3
127.0.0.1:6379> xdel mydemo 1619579145385-0
(integer) 1
127.0.0.1:6379> xrange mydemo - +
1) 1) "1619579142062-0"
   2) 1) "a"
      2) "1"
2) 1) "1619579158816-0"
   2) 1) "c"
      2) "3"
      3) "d"
      4) "4"
      5) "e"
      6) "5"

127.0.0.1:6379> del mydemo # 删除key
(integer) 1

5、xrange 消息列表

使用 xrange  获取消息列表,会自动过滤已经删除的消息。

语法格式: XRANGE key start end [COUNT count] 

  • key:队列名

  • start:开始值, - 表示最小值

  • end:结束值, + 表示最大值

  • count:数量(取头部数据)

127.0.0.1:6379> xadd mydemo * name aaa
"1619579326631-0"
127.0.0.1:6379> xadd mydemo * name bbb
"1619579328639-0"
127.0.0.1:6379> xadd mydemo * name ccc
"1619579330732-0"
127.0.0.1:6379> xadd mydemo * name ddd
"1619579333019-0"
127.0.0.1:6379> xadd mydemo * name eee
"1619579335736-0"
127.0.0.1:6379> xlen mydemo
(integer) 5
127.0.0.1:6379> xrange mydemo - + count 2
1) 1) "1619579326631-0"
   2) 1) "name"
      2) "aaa"
2) 1) "1619579328639-0"
   2) 1) "name"
      2) "bbb"


127.0.0.1:6379> del mydemo # 删除key
(integer) 1

6、xrevrange 消息列表

使用 xrevrange  获取消息列表,会自动过滤已经删除的消息。
语法格式: XREVRANGE key end start [COUNT count] 

  • key:队列名

  • end:结束值,+ 表示最大值

  • start:开始值,- 表示最小值

  • count:数量

127.0.0.1:6379> xadd mydemo * name aaa
"1619579822470-0"
127.0.0.1:6379> xadd mydemo * name bbb
"1619579825203-0"
127.0.0.1:6379> xadd mydemo * name ccc
"1619579827369-0"
127.0.0.1:6379> xlen mydemo
(integer) 3
127.0.0.1:6379> xrevrange mydemo + -
1) 1) "1619579827369-0"
   2) 1) "name"
      2) "ccc"
2) 1) "1619579825203-0"
   2) 1) "name"
      2) "bbb"
3) 1) "1619579822470-0"
   2) 1) "name"
      2) "aaa"
127.0.0.1:6379> xrevrange mydemo + - count 1
1) 1) "1619579827369-0"
   2) 1) "name"
      2) "ccc"

7、xread 消息列表

使用 xread  以阻塞或非阻塞方式获取消息列表。

语法格式: XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...] 

  • count :数量

  • milliseconds / block :阻塞毫秒数,没有设置就是非阻塞模式,取到一个结果就退出。

  • key :队列名

  • id :消息 ID

# 创建两个流
127.0.0.1:6379> xadd demo1 * name aaa
"1619580212875-0"
127.0.0.1:6379> xadd demo1 * name bbb
"1619580215254-0"
127.0.0.1:6379> xadd demo2 * name ccc
"1619580226876-0"
127.0.0.1:6379> xadd demo2 * name ddd
"1619580230966-0"

# 读取所有
# 0-0 从头消费
127.0.0.1:6379> xread streams demo1 0-0
1) 1) "demo1"
   2) 1) 1) "1619580212875-0"
         2) 1) "name"
            2) "aaa"
      2) 1) "1619580215254-0"
         2) 1) "name"
            2) "bbb"

# 读取1个头部消息
127.0.0.1:6379> xread count 1 streams demo1 0-0
1) 1) "demo1"
   2) 1) 1) "1619580212875-0"
         2) 1) "name"
            2) "aaa"

# 读取两个key的头部消息
127.0.0.1:6379> xread count 1 streams demo1 demo2 0-0 0-0
1) 1) "demo1"
   2) 1) 1) "1619580212875-0"
         2) 1) "name"
            2) "aaa"
2) 1) "demo2"
   2) 1) 1) "1619580226876-0"
         2) 1) "name"
            2) "ccc"

# 阻塞30秒监听demo3消息,有一个结果就退出
# $ 表示最新消息
127.0.0.1:6379> xread block 30000 streams demo3 $
1) 1) "demo3"
   2) 1) 1) "1619580596131-0"
         2) 1) "name"
            2) "ttt"
(1.84s)
127.0.0.1:6379>

命令最后的“ $ ”符号表示读取最新的消息,同时,我们设置了 block 30000 的配置项,单位是毫秒,表明在读取最新消息时,如果没有消息到来,将阻塞30000 毫秒(即 30 秒),然后再返回。

8、xgroup create 创建消费者组

Streams 本身可以使用 xgroup 创建消费组,创建消费组之后,Streams 可以使用 xreadgroup 命令让消费组内的消费者读取消息。


使用 xgroup create 创建消费者组。

语法格式: XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername] 

  • key:队列名称,如果不存在就创建

  • groupname:组名。

  • $:从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

#从头开始消费
xgroup create mystream consumer-group-name 0-0  

#从尾部开始消费
xgroup create mystream consumer-group-name $

示例

127.0.0.1:6379> xadd mydemo * name aaa
"1619581865830-0"
127.0.0.1:6379> xadd mydemo * name bbb
"1619581868367-0"
127.0.0.1:6379> xadd mydemo * name ccc
"1619581870373-0"

# 创建 group1 消费组,消费 mydemo 队列消息
127.0.0.1:6379> xgroup create mydemo group1 0
OK

# 让 group1 消费组里的消费者 consumer1 从 mydemo 中读取所有消息
127.0.0.1:6379> xreadgroup group group1 consumer1 streams mydemo >
1) 1) "mydemo"
   2) 1) 1) "1619581865830-0"
         2) 1) "name"
            2) "aaa"
      2) 1) "1619581868367-0"
         2) 1) "name"
            2) "bbb"
      3) 1) "1619581870373-0"
         2) 1) "name"
            2) "ccc"

命令最后的参数“ > ”,表示从第一条尚未被消费的消息开始读取。因为在 consumer1 读取消息前,group1 中没有其他消费者读取过消息,所以 consumer1 就得到 mydemo 消息队列中的所有消息了(消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了)。

9、xreadgroup group 读取消费者组消息

使用 xreadgroup group  读取消费组中的消息。

语法格式: XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] 

  • group:消费组名

  • consumer:消费者名

  • count:读取数量

  • milliseconds / block:阻塞毫秒数

  • key:队列名

  • ID:消息 ID

使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。

例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。

XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"

XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"

XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"

为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 xack  命令通知 Streams“消息已经处理完成”。如果消费者没有成功处理消息,它就不会给 Streams 发送 xack 命令,消息仍然会留存。此时,消费者可以在重启后,用 xpending  命令查看已读取、但尚未确认处理完成的消息。

10、xpending 待处理消息信息

xpending 命令是检查待处理消息列表的接口,用于观察和了解消费者组正在发生的事情:哪些客户端是活跃的,哪些消息在等待消费,或者查看是否有空闲的消息。

此命令与 xclaim 一起使用,用于实现长时间故障的消费者的恢复。

语法格式: xpendign key groupName [start end count] [consumerName] 

xpending是一个只读的命令,会输出指定group中所有处于pending message中的消息总个数、开始ID、结束ID、每个consumer中pending message中消息的个数。可以通过指定开始结束id和consumerName来获取更加详细的信息

127.0.0.1:6379> xpending mydemo group1
1) (integer) 3
2) "1619581865830-0"  # 消息开始ID
3) "1619581870373-0"  # 消息结束ID
4) 1) 1) "consumer1"  # 消费者
      2) "3"                  # 消费数量

# 限制消息数量
127.0.0.1:6379> xpending mydemo group1 - + 3
1) 1) "1619581865830-0"       # 消息id
   2) "consumer1"                 # 消费者
   3) (integer) 12258065    # 自上次将此消息传递给该消费者以来,经过的毫秒数
   4) (integer) 1                   # 读取的次数
2) 1) "1619581868367-0"
   2) "consumer1"
   3) (integer) 12258065
   4) (integer) 1
3) 1) "1619581870373-0"
   2) "consumer1"
   3) (integer) 12258065
   4) (integer) 1

# 限制指定消费者消息数量
127.0.0.1:6379> xpending mydemo group1 - + 2 consumer1
1) 1) "1619581865830-0"
   2) "consumer1"
   3) (integer) 12352110
   4) (integer) 1
2) 1) "1619581868367-0"
   2) "consumer1"
   3) (integer) 12352110
   4) (integer) 1

11、xack 消息标记

消息标记确认

使用 xack  命令通知 Streams,然后这条消息就会被删除。当我们再使用 xpending  命令查看时,就可以看到,consumer1 已经没有已读取、但尚未确认处理的消息了。

# 让 group1 消费组里的消费者 consumer1 从 mydemo 中读取所有消息
127.0.0.1:6379> xreadgroup group group1 consumer1 streams mydemo >
1) 1) "mydemo"
   2) 1) 1) "1619581865830-0"
         2) 1) "name"
            2) "aaa"
      2) 1) "1619581868367-0"
         2) 1) "name"
            2) "bbb"
      3) 1) "1619581870373-0"
         2) 1) "name"
            2) "ccc"

# ack 确认
127.0.0.1:6379> xack mydemo group1 1619581868367-0
(integer) 1
127.0.0.1:6379> xack mydemo group1 1619581865830-0
(integer) 1

127.0.0.1:6379> xpending mydemo group1
1) (integer) 1
2) "1619581870373-0"
3) "1619581870373-0"
4) 1) 1) "consumer1"
      2) "1"

12、xgroup setid 消费者组设置新的最后递送消息ID

使用 setid  子命令设置要传递的下一条消息。 通常情况,在消费者创建时设置下一个ID,作为 xgroup create 最后一个参数。 但是使用这种形式,可以在以后修改下一个ID,而无需再次删除和创建使用者组。

例如,如果你希望消费者组中的消费者重新处理流中的所有消息,你可能希望将其下一个ID设置为0。

13、xgroup delconsumer 删除消费者

从消费者组中移除给定的消费者,使用以下命令格式

127.0.0.1:6379> xgroup delconsumer mydemo group1 consumer1
(integer) 1

14、xgroup destroy 删除消费者组

即使存在活动的消费者和待处理消息,消费者组也将被销毁,因此请确保仅在真正需要时才调用此命令。

127.0.0.1:6379> xgroup destroy mydemo group1
(integer) 1

15、xclaim 转移消息归属权

语法格式: xclaim key groupName comsumer minIdleTime id [id ...] [IDLE ms] [TIME msUnixTime] [RETRYCOUNT count] [force] [justid] 

16、xinfo 流和消费者组信息

检索关于流和关联的消费者组的不同的信息。

语法格式: XINFO STREAM <key> 

127.0.0.1:6379> xinfo stream mydemo
 1) "length"  
2) (integer) 3          # 流元素的数量
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
8) "1619581870373-0"  # 最后消息ID
 9) "groups"
10) (integer) 1             # 消费者组数量
11) "first-entry"             # 第一个消息
12) 1) "1619581865830-0"
    2) 1) "name"
       2) "aaa"
13) "last-entry"              # 最后一个消息
14) 1) "1619581870373-0"
    2) 1) "name"
       2) "ccc"

17、xinfo groups 消费者组的信息

只获得与流关联的所有消费者组的输出

127.0.0.1:6379> xinfo groups mydemo
1) 1) "name"
   2) "group1"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 3
   7) "last-delivered-id"
   8) "1619581870373-0"

18、xinfo stream 流信息

127.0.0.1:6379> xinfo consumers mydemo group1
1) 1) "name"
   2) "consumer1"
   3) "pending"
   4) (integer) 3
   5) "idle"
   6) (integer) 417812


未经允许请勿转载:程序喵 » Redis Stream 使用

点  赞 (4) 打  赏
分享到: