Redis stream 教程

Last Modified: 2022/11/13

概述

Redis stream 最早出现于 Redis 5.0,模仿了日志数据结构来建模,但是却克服了日志文件的短板,能日志所不能,例如:

  • 支持条目的随机访问,且时间复杂度为 O(1)
  • 支持复杂的消费策略,例如:消费者组(consumer groups)

stream 基础

向 stream 中追加数据:

XADD mystream * sensor-id 1234 temperature 19.8

xadd 是追加命令, mystream 是流的 Key,‘*’ 是 id,id 可以手动指定值,也可以是 * 表示让 redis 自动生成。如果使用 *,redis 节点会自动生成自增的 id。

id 后面的是一个或者多个键值对,键值对一条记录的具体内容。

stream 中 id 由两部分组成,的格式为:

<millisecondsTime>-<sequenceNumber>

其中 <millisecondsTime> 是精确到毫秒为的时间戳,<sequenceNumber> 则是自增的数字,这保证了高并发情况下,id 始终单调递增且唯一。

注:id 可以自动生成,也可手动指定,但是手动指定的时候,需要注意 id 的自增性,也就是当前 id 必须比上一个大,否则命令会报错。

从 stream 中获取数据

获取数据支持三种方式:

  • 将 stream 看成是一个时间序列的 store,使用 xrange/XREVRANGE 获取指定范围的数据;
  • 将 stream 看成是一个日志文件。我们可以使用 tail -f 来监听追加到日志文件的新数据行,同理我们可以使用 xread 来“监听”追加到 stream 中的新条目;
  • 组消费。这种模式适合消息生产快消费慢的情况,多个消费者同时消费同一个 stream 中的不同的条目,已达到负载均衡的目的。

注:第 2 条中的监听并不是真的不间断的监听新的元素,xread 命令都是一次性的,也就是说执行一次,获取一次数据,只不过 xread 支持阻塞模式,可以阻塞到新元素到达才返回。

范围查询

# 获取指定范围内的数据
XRANGE mystream 1518951480106 1518951480107
# - 表示 stream 中最小id,+ 表示最大id,count 表示要获取多少个数据
XRANGE mystream - + COUNT 2
# 可以指定明确指定一个开始 id,结束 id可以用 + 代替
XRANGE mystream (1519073279157-0 + COUNT 2

xrange 的查询结果是数组,每个元素都是 stream 中一个条目,每个条目本身也是数组,第一个元素是 id,第二个元素则是条目键值对组成的数组。

> XRANGE mystream (1519073279157-0 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

xread 监听 stream

xread 命令的格式如下,其中 count 和 block 参数都是可选的:

xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

count 参数指定获取条数, block 用于指定是否使用阻塞方式获取, key 是要读取的目标 stream 的名称, ID 可以看成是 offset,即从什么位置获取。

# 从id 0开始,获取 2 条数据, count 只表示最大条数,如果 stream 仅有1条,那么只会返回一条数据
xread count 2 streams mystream 0
# 可以同时从多个 stream 中获取数据,例如下面的命令同时从 mystream1/mystream2 中获取数据,都是从 id 为 0 开始获取 
xread streams mystream1 mystream2 0 0
# 还可以以阻塞的方式获取,使用 block 参数即可,block 后面的参数 0 表示阻塞时间不限,无限阻塞直到获取到数据为止,也可以指定一个具体的数值表示阻塞指定的毫秒数
xread block 0 streams mystream 0
# 这里 id 被指定为 $,表示从当前时间开始,获取最新的数据
xread block 0 streams mystream $

组消费

可将一个消费者组看做一个伪消费者(pseudo consumer),该消费者从 stream 中获取数据并服务于多个实际的消费者。消费者组提供了以下保证:

  • 每个消息只会提供给其中一个消费者,不会出现一个消息同时被多个消费者获取的情况;
  • 消费者组中的每个消费者都由一个唯一的名称标识,这个名称是我们客户端在消费消息的时候自行指定的,同一个组中的消费者保证名称唯一即可。
  • 每个消费者组都有 “first ID” 的概念,这个是用来记录群组最新的尚未被递送的 id。这样的话,如果群组中有新消费者请求消息,就可以知道从 stream 的哪个位置获取消费提供给消费者;
  • 消费者消费消息完成后,需要通过 ack 命令通知群组,这样群组知道消息已经被成功处理,因此可以从群组中将该消息移除;
  • 消费者组会追踪所有“挂起”的消息,所谓挂起的消息就是那些已经递送给消费者,但是没有收到 ACK 的消息。

从某种意义来说,一个消费者组可以看做是一个 stream 的某种状态,看下图:

+----------------------------------------+
| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |
+----------------------------------------+

从上图可以很清晰的看出一个消费者群组的组成部分:

  • 组的名称,由 consumer_group_name 标记;
  • stream 的名称,即组是归属于哪个 stream的,由 consumer_group_stream 标识;
  • 消费者组上次递送的 id,由 last_delivered_id 标识;
  • 一组消费者,记录每个消费者各自“挂起”的消息,也就说递送给消费者,但是消费者尚未确认的消息。

1、 创建群组,注意,创建群组的时候需要指定一个 id

# 创建一个群组名为 mygroup, id 指定为 0 表示该群组会从头开始消费 stream
XGROUP CREATE mystream mygroup 0
# 创建一个群组名为 mygroup, id 指定为 $, 表示该群组从当前时间开始消费 stream 产生的新消息
XGROUP CREATE mystream mygroup $
# 创建一个群组名为 mygroup,多个一个 MKSTREAM 关键字,表示创建群组的同时创建 stream
XGROUP CREATE newstream mygroup $ MKSTREAM

2、 消费者消费消息

消费消息的命令格式如下:

XREADGROUP GROUP groupName consumerName STREAMS streamName MSG_ID

其中需要特别说明的是 msg_id 分几种情况:

  • 使用 ‘>’ 作为 id,此时将会返回一个尚未递送到其他消费者的新消息,读的同时还会更新消费组的 last_delivered_id;
  • 如果是任何一个具体的数值 id,会返回递送给该消费者且未回复 ACK 的历史挂起消息(history of pending message);
  • 使用 ‘$’,这个表示 stream 中最大的 id,XREADGROUP 如果使用这个就表示历史条目我都不关心了,我只关心从此刻之后的新信息。注意他和‘>’的区别

第二点很有意思,你指定的 id 是具体数值,那么只会返回历史挂起消息,而不是你指定 id 的那条消息。如果一个消费者因某种原因 G 了,那么当他重启的时候,可以先使用数值 id 来读取历史挂起消息,读完历史挂起消息之后,再使用 ‘>’ 消费组中的新消息。

# mygroup 中的消费者 consumer1 请求消费名为 mystream 的 stream, 由于 id 为具体数值,因此返回该消费者的历史挂起消息
XREADGROUP GROUP mygroup consumer1 STREAMS mystream 0
# 由于 id 为 >,因此返回2条尚未递送到其他消费者的新消息
XREADGROUP GROUP mygroup consumer1 COUNT 2 STREAMS mystream >

这里有几点需要值得注意:

  • 消费者不需要明确创建,使用 XREADGROUP 读消息的同时,消费者自动创建;
  • XREADGROUP 可以同时读取多个流,前提是你需要再每个流中都创建名称相同的消费者组;
  • XREADGROUP 命名看起是读,但是读的同时会更新组的 last_delivered_id。

3、 确认消息处理完毕

# 确认消息的命令格式
 XACK streamName groupName MSG_ID
# 来个例子
XACK mystream mygroup 1526569495631-0

永久故障恢复

当由于某种原因,一个消费者永远的 GG 了,那么该怎么处理该消费者的历史挂起消息呢?因此 redis stream 提供了故障恢复的功能,具体来说分为以下两步:

  • 使用 XPENDING 命令查询某个具体消费者或者全部的历史挂起消息;
  • 使用 XCLAIM 转移消息的所有者,即将某个消费者的挂起消息分配给其他消费者;

查询挂起消息的格式如下,值得注意的 min-idle-time 参数,用于查询挂起时长大于指定时间的消息,个人认为这个参数很有用,例如挂起时长大于1小时的消息很可能是由于消费者挂了或则和消息本身有问题无法消费;

# xpending 格式:
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
# xpending 返回的格式是数组,每个元素仍然是一个数组,此数组中元素的含义分别是:消息id、消费者的名字、挂起时长和分发次数
> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "SomeConsumer1"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 2

分发次数是用来记录该消息被分发的次数,显然如果一个消息被分发了 N 多次,仍然处于挂起状态,显然该消息本身可能有问题,有可能该消息格式有错误,导致该消息永远也不能处理成功。此类消息类似于 mq 中的“死信”,我们可能需要将他们转移到一个新的队列中,并发消息通知系统管理员做特殊处理。

分发次数在以下两种情况下会增加:

  • 调用 xclaim 成功后,分发次数加 1;
  • 调用 XREADGROUP 读取历史挂起消息。

xclaim 用于分配挂起消息给新的消费者,格式如下:

XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>

如果两个客户端同时调用 xclaim 分配同一条消息给同一个消费者,只会有一个成功,举例如下:

Client 1: XCLAIM mystream mygroup consumer1 3600000 1526569498055-0
Client 2: XCLAIM mystream mygroup consumer2 3600000 1526569498055-0

client1 调用 xclaim 分配了 id 为 1526569498055-0 的消息之后,这条消息的空闲时间会被置为 0, client2 在分配的时候由于指定了空闲时间为 3600000,所以自然不会成功。

xclaim 如果重新分配成功,返回结果如下:

> XCLAIM mystream mygroup consumer 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

自动分配

通过 xpending 和 xclaim 自然可以完成挂起消息的重新分配,但是有点繁琐,因此 stream 提供了自动分配的命令 xautoclaim。

这里翻译为自动分配(原文为 auto claiming),是站在命令调用者的角度来看的,我们调用自动 XAUTOCLAIM,其实就是将挂起消息分配给其他的消费者。

#命令格式,start为起始 id,如果指定了 JUSTID,那么命令将只返回挂起消息的id,而不返回完整的消息内容。
XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]
> XAUTOCLAIM mystream mygroup consumer1 3600000 0-0 COUNT 1
1) 1526569498055-0
2) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

stream 内省

居然翻译的这么专业,我他喵的自己都惊了。其实就是查看 stream 中的各种内部信息,这显然很有必要,让我们能够了解 stream 的运行状态,例如:stream 中的消费者组的情况、每个组中的消费者情况等等,这些都归结到一个命令下 XINFO:

XINFO <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
#如果不记得可以使用 xinfo help 命令查看 xinfo 的用法
XINFO help
# 例如查看消费者组的情况
XINFO GROUPS mystream
# 进一步查看某个组中的消费者情况
 XINFO CONSUMERS mystream mygroup

和 kafka 中基于分区的消费群组的区别

stream 中的“分区”只是逻辑的,实际上所有的消息都是放到同一个 key 下面,哪个消费者请求消息就分发消息给哪个消费者(所以消息的消费也不是均衡的,取决于消费者消费消息速度的快慢),然而 kafka 中的基于分区的消费者群组则不同,分区和消费者是绑定的,消费者只会消费他绑定的分区的消息。一个群组中有有多个消费者,那么消费的消费是无序的。

限制 stream 的容量

由于 redis 是内存数据库,通常情况下,我们不可能让 stream 中的数据无限增加,我们得控制 stream 中数据的容量,redis 提供了以下集中方法。

1、 插入时指定最大容量,超过容量的条目会被自动移除,然而这种方法并不推荐,因为效率不可控;

XADD mystream MAXLEN 2 * value 1

2、 同样是插入时指定一个最小容量,例如下面命令中的 MAXLEN ~ 1000 表示至少保留 1000 个条目,但是不必是精确的 1000,可能是 1010 or 1030 等等,因此不是每次插入都会执行 trim,只有整个节点可以删除的时候,才会移除条目,这种效率就会高很多。

`XADD mystream MAXLEN ~ 1000 * ... entry fields here ...`

3、 利用 xtrim 来裁剪 stream

xtrim 支持两种模式,MAXLEN 和 minid

# 精确裁剪,只保留10个条目
XTRIM mystream MAXLEN 10
# 近似裁剪,至少保留10个条目
XTRIM mystream MAXLEN ~ 10
# id 小于 649085820 将会被移除
XTRIM mystream MINID 649085820

持久化、复制和消息安全

stream 和其他数据结构一样,异步复制到副本,并且会通过 aof 或者 rdb 方式持久化。值得一提的是 stream 的消费者组也会被复制到副本,并且持久化到文件。

关于消息安全你需要知道:

  • 如果持久化对于你的应用很重要,aof 的 fsync 选项需要打开;
  • 默认情况下异步复制不能保证 xadd 命令 和 消费群组的状态会被复制,当 failover 的时候,这些状态是否能成功复制,取决于副本能否从 master 接收数据;
  • 可使用 wait 命令强制变化同步到一组副本,这种情况,数据丢失的可能性极低,但是 sentinel 和 redis cluster 在 failover 的时候只是尽最大努力来选择一个最新的副本,但是者依然不能百分百保证副本不会丢失数据;

从 stream 中删除条目

> XDEL mystream 1526654999635-0
(integer) 1

长度为 0 的 stream

stream 的长度可以为 0, 无论是使用 maxlen、xtrim 还是 xdel 使得 stream 长度为0, 都不会删除 stream 本身,因为 stream 可能还有相关的消费组的状态,即便长度为0,我们也不想丢失这些状态。这点不同于其他数据结构,例如 zset,当我们将 zset 中的数据删完的时候,zset 本身也会随之删除。

阻塞消费的消费延迟

一句话就是延迟很低,可以简单认为消息几乎是实时“推送”的。

有问题吗?点此反馈!

温馨提示:反馈需要登录