在上一节中,我们详细分析了 IO 线程的一些内容以及 readQueryFromClient() 函数的逻辑,这些都是我们理解 Redis 多线程模式下读取客户端请求核心所在。

在 readQueryFromClient() 函数中读取到 client->querybuf 缓冲区的都是一个个的字节,Redis Server 接下来要做的就是,把这个 byte 数组中的内容,按照一定的规则,解析成 Redis Server 能够理解的命令。这部分逻辑就是在 readQueryFromClient() 函数最后调用的 processInputBuffer() 函数中完成的。

RESP 协议基础知识

不过,在开始 processInputBuffer() 函数的介绍之前,我们需要先说一些 Redis 命令解析的基础知识。

第一个基础知识点是 Redis 客户端的请求类型,对应的是 client->reqtype 字段,它有两个可选值 PROTO_REQ_INLINE、PROTO_REQ_MULTIBULK。其中,INLINE 是内联请求类型,一般是 Telnet 这种客户端发出来的请求,会使用 INLINE 类型的请求;MULTIBULK 是协议请求类型,我们用的 redis-cli 客户端、Lettuce 客户端发送的都是 MULTIBULK 类型的请求。

第二个知识点是 RESP 协议。RESP 协议是客户端与 Redis Server 进行交互的基础协议,小伙伴们可以把它理解成客户端和 Redis Server 沟通的一种语言,比如汉语、英语,只有两边都说同一种语言,才能正常交互,RESP 协议现在有 v2 和 v3 两个版本。这两个版本的 RESP 协议的完整描述参考下面这两个链接。

对大多数小伙伴们来说,通读这两个版本的 RESP 协议,可能是一件非常枯燥、无趣且耗时的事情。为了减轻小伙伴们的痛苦呢,下面我们就结合几个示例,一起来分析一下 RESP 2 和 RESP 3 里面常见的一些内容。

无论是 RESP 2 还是 RESP 3 里面,客户端都是以字符串数组的形式把命令以及命令参数等信息发到 Redis Server,大概的格式如下:

1
2
3
4
5
6
*<number of arguments> \r\n
$<number of bytes of argument 1> \r\n
<argument data> \r\n
...
$<number of bytes of argument N> \r\n
<argument data> \r\n

在 Redis 客户端发送“SET testKey testValue” 这条请求的时候,实际上发送的是:*3\r\n$3\r\nSET\r\n$7\r\ntestKey\r\n$9\r\ntestValue\r\n

其中,* 表示一个 Array(数组)的开头,Array 是 RESP 中定义的一种类型,* 后面需要紧跟数组的长度,然后后面再跟数组的具体元素,每个元素都可以是下面四种类型的一种。

  • Simple String 表示的是一个非二进制安全字符串,里面不能携带 \r\n 这些字符,所以说是非二进制安全的。Simple String 使用 “+” 开头,后面紧跟具体的字符串内容。

  • Error 表示的是一个非二进制安全的错误信息。其实,Error 和 Simple String 差不多,唯一的区别就是:Error 是以 “-” 这个字符开头的。

  • Integer 表示的是一个整数,它的第一个字符是“:”,后面紧跟具体的整数值。

  • Bulk String 表示的是一个二进制安全的字符串,它由两行构成,第一行以“$”字符开头,后面紧跟字符串长度,然后 \r\n 结束;第二行就是具体的字符串内容,然后也是以 \r\n 结束。

经过上面的介绍,我们就大概知道“SET testKey testValue”这条请求的换分方式了,如下图所示:

image.png

上面这些数据类型,在 RESP 2 中其实就已经支持了,在 RESP 3 中也是兼容的。在 Redis 6 中为了支持客户端缓存,也为了让 RESP 的语义更加丰富,引入了 RESP 3 协议。在 Redis 6 中,redis-cli 客户端默认还是 RESP 2 协议,我们可以使用 HELLO 命令查看当前客户端使用的 RESP 协议版本:

1
2
3
4
5
6
7
8
127.0.0.1:6379> hello
1) "server"
2) "redis"
3) "version"
4) "7.0.0"
5) "proto"
6) (integer) 2 // 这里就是当前使用的RESP版本
... // 省略后续输出

我们可以执行 HELLO 3 命令将当前客户端切换到 RESP 3 协议,如下,不仅返回的 proto 值变了,整个输出格式也都变了:

1
2
3
4
5
127.0.0.1:6379> hello 3
1# "server" => "redis"
2# "version" => "7.0.0"
3# "proto" => (integer) 3 // 这里就是当前使用的RESP版本
... // 省略后续输出

RESP 3 协议不仅兼容了 RESP 2 中的数据类型,还新增了十多种的数据类型。这里结合几个例子来介绍一下 RESP 3 中的新类型,比如,在 RESP 3 中引入了 Map 这种新类型,它的格式如下:

1
2
3
4
5
*<number of key-value> \r\n
<key-type><key> \r\n
<value-type><value> \r\n
<key-type><key> \r\n
<value-type><value> \r\n

假设我们在 Redis 里面存储了一个叫 testMap 的哈希表结构,用 JSON 表示其具体内容的话,是下面这样一段 JSON:

1
2
3
4
{
"name":"kouzhao",
"age":2
}

我们用 HGETALL 命令查询 testMap 中全部键值对的时候,会返回下图展示的结构。其中,“%” 表示一个 Map 类型的结构,后面紧跟键值对的个数,然后依次是各组键值对,每个 Key 和 Value 都是 Bulk String 类型的值。

image.png

再比如说,我们在 Redis 里面存储了一个叫 testZset 的有序集合,用 JSON 表示其具体内容的话,是下面这段 JSON:

1
2
3
4
5
6
7
8
9
10
[
[
"first" ,
1
],
[
"second" ,
2
]
]

我们用 ZRANGE testZSet 0 -1 WITHSCORES 命令查询 testZset 中全部元素以及 score 值的时候,会返回下图展示的结构。其中需要注意的是,Double 类型使用 “,” 开头。

image.png

RESP 3 中除了引入 Map、Double 这两个新类型之外,还引入了 Set、Attribute、Push、NULL、Stream String,等等。这里我们就不再一一展开介绍了,想要深入了解 RESP 3 协议中所有新类型的小伙伴,可以参考这篇文档

命令解析

介绍完 Redis 命令解析的前置基础之后,我们就可以开始详细讲解命令解析的逻辑了。

正如前文所述,processInputBuffer() 函数是命令解析和命令执行的入口,其中会通过一个 while 循环不停地解析命令,直到把 client->querybuf 缓冲区中所有的命令处理完,下面是其核心流程图:

image.png

下面是 processInputBuffer() 函数的核心代码和注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void processInputBuffer(client *c) {
// qb_pos字段用来记录querybuf的读取位置
while(c->qb_pos < sdslen(c->querybuf)) {
... // 忽略其他异常处理逻辑
if (!c->reqtype) {
// reqtype字段指定了该客户端发出的请求协议类型
if (c->querybuf[c->qb_pos] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
// 不同协议类型走不同的命令解析函数
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
... // 省略非核心逻辑
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else { // 未知请求类型,输出日志并结束进程 }

// 在IO线程读取请求的时候,io_threads_op这个全局变量的值是IO_THREADS_OP_READ,
// 所以正常解析请求的时候,一定会走进下面的分支。里面会给client添加
// CLIENT_PENDING_COMMAND标志位,表示该client中有一条待执行的命令,
// 同时,里面的break会结束当前这条命令的解析过程
if (io_threads_op != IO_THREADS_OP_IDLE) {
serverAssert(io_threads_op == IO_THREADS_OP_READ);
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
// 因为上面CLIENT_PENDING_READ标记为的处理,IO线程中不会执行到这里
if (processCommandAndResetClient(c) == C_ERR) { return; }
}
... // 省略了针对主从复制对querybuf复用的一些优化逻辑,这个优化点在主从复制的小节里面再说
if (c->qb_pos) { // 将已经解析的命令从querybuf缓冲区中删除
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0; // 重置qb_pos
}
}

我们在实际生产中,使用最多的还是 redis-cli 以及 Lettuce 这类客户端,所以这里我们重点关注 MULTIBULK 请求的解析流程,也就是 processMultibulkBuffer() 函数

首先,processMultibulkBuffer() 会读取请求中第一行,确定数组中有多少个,相应的代码片段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// multibulklen字段用来记录此次multibulk请求中剩余要读取的参数个数,
// 此时是0,表示还未初始化,我们要读取第一行数据
if (c->multibulklen == 0) {
// 在querybuf缓冲区中搜索'\r'这个分隔字符
newline = strchr(c->querybuf+c->qb_pos,'\r');
... // 省略异常处理
// 将第一行数据转换成整数,并记录到multibulklen这个字段中
ok = string2ll(c->querybuf+1+c->qb_pos,
newline-(c->querybuf+1+c->qb_pos),&ll);
c->multibulklen = ll;
c->qb_pos = (newline-c->querybuf)+2; // 后移qb_pos值
// argv字段用来记录解析后的参数
c->argv_len = min(c->multibulklen, 1024);
c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
c->argv_len_sum = 0; // argv_len_sum字段用来记录请求参数解析后的总长度
}

确定元素个数之后, processMultibulkBuffer() 会开始逐个解析数组中的元素。根据 RESP 协议,请求中每个数组元素都是 Bulk String 类型,这里会一个个数组元素进行解析。我们以第一个元素的解析为例:

  • 首先是读取第一个元素的第一行,确定它是以 “$” 字符开头的,然后通过这行的数字,也就确定了这个字符串的具体长度,该长度值会记录到 client->bulklen 字段中;

  • 然后,根据字符串长度,读取第二行,拿到字符串的具体内容,并对请求进行解析。

下面是 processMultibulkBuffer() 函数解析请求的核心代码片段,其中删除了很多不重要的分支,只保留了最关键的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
while(c->multibulklen) {
// bulklen字段记录当前bulk的长度,为-1时表示未初始化,
// 需要我们读取当前bulk的第一行进行初始化
if (c->bulklen == -1) {
// 从qb_pos位置开始,查找querybuf缓冲区中的第一个'\r'分隔符。
newline = strchr(c->querybuf+c->qb_pos,'\r');
if (c->querybuf[c->qb_pos] != '$') {...} // 如果不是以"$"开头,直接抛异常
// 读取这一行中的数字,也就是该元素的字符串的长度
ok = string2ll(c->querybuf+c->qb_pos+1,
newline-(c->querybuf+c->qb_pos+1),&ll);
c->bulklen = ll; // 字符串长度记录到client->bulklen字段中
}

// 下面开始读取字符串的具体内容
if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
// querybuf缓冲区中数据不足以构造当前元素,
// 则停止读取,等待连接下次可读事件
break;
} else {
// 解析字符串的具体内容,得到对应的robj对象,并记录到argv数组中。
// 这里使用的argc字段用来记录请求中元素个数
c->argv[c->argc++] =
createStringObject(c->querybuf+c->qb_pos,c->bulklen);
c->argv_len_sum += c->bulklen; // 参数长度增加
c->qb_pos += c->bulklen+2; // 后移qb_pos
c->bulklen = -1; // 当前字符串读取完毕,重置bulklen
c->multibulklen--; // 读完一个元素,multibulklen值递减1
}
}

分析完 processMultibulkBuffer() 解析命令的逻辑之后,我们回到 processInputBuffer() 函数主流程继续往下看,这里循环调用 processMultibulkBuffer() 函数的 while 循环末尾,会有这么一段代码:

1
2
3
4
5
6
7
if (io_threads_op != IO_THREADS_OP_IDLE) {
serverAssert(io_threads_op == IO_THREADS_OP_READ);
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
// 因为上面CLIENT_PENDING_READ标记为的处理,IO线程中不会执行到这里
if (processCommandAndResetClient(c) == C_ERR) { return; }

在 IO 线程读取请求的时候,io_threads_op 这个全局变量被设置成了 IO_THREADS_OP_READ,所以 IO 线程能够正常解析请求、不抛异常的时候,一定会走进这个 if 分支里面,这里面就是给 client 添加 CLIENT_PENDING_COMMAND 标志位,它是用来说明这个 client 实例里面已经有解析好的命令,等待主线程进行处理。关键就在这个 break,会直接跳出当前的这个 while 循环,结束当前这条命令的解析过程。

小伙伴们可以 Debug 一下代码,会发现随着这个 while 循环的退出,此次 processInputBuffer()、readQueryFromClient() 函数调用也都会结束,其实这也就是结束了 IO 线程对当前这个 client 上可读事件的处理。

命令执行

分析完命令解析的核心逻辑之后,我们回到 handleClientsWithPendingReadsUsingThreads() 函数,随着 IO 线程以及主线程处理完所有可读的 client 之后,主线程就不再阻塞等待,继续执行下面的逻辑来执行命令。

主线程从阻塞中恢复的第一件事情,就是把 io_threads_op 这个全局变量改成 IO_THREADS_OP_IDLE 状态,表示 IO 线程全部空闲了。然后,主线程开始进入一个 while 循环,从 server.clients_pending_read 队列的队头开始弹出 client 实例,每弹出一个 client 元素,就调用一次 processPendingCommandAndInputBuffer() 函数,执行这个 client 里面解析好的命令。

processCommandAndResetClient() 函数底层调用了 processCommand() 和 commandProcessed() 函数,调用栈如下图所示,其中 processCommand() 函数是命令执行的核心,commandProcessed() 函数是命令执行后的善后处理

先来看 processCommand() 函数的核心逻辑。

  1. client->argv[0] 中维护了当前命令的名称,所以我们要做的第一件事就是确定当前处理的是哪条命令。这里会通过 lookupCommand() 函数进行查找,它底层会查找 server.commands 这个命令字典,获取对应的 redisCommand 实例。client->cmd 字段会记录当前正在执行这个 redisCommand 实例。
  1. 接下来,对 client->cmd 进行多项检查检查,如下。

    • 检查 client->cmd 字段是否为空。
    • 检查命令与命令参数是否一致。
    • 检查客户端权限。
    • 检查当前的 Redis Server 是否达到内存上限,达到了之后,就不能继续写入数据了。
    • 如果是 Cluster 模式下运行,会检查命令操作的 key 是否位于当前 Redis 实例上,如果不是,会返回给 Redis 客户端重定向的响应。
    • 如果是在主从模式下运行,还会检查主从复制状态是否正常,如果不正常,就无法写入数据。
    • 还有很多检查,这些检查各有各的目的,这里就不一一列举了。总之,检查不通过时,直接通过 rejectCommandFormat() 函数给客户端返回错误信息。
  2. 通过上述检查之后,我们就可以开始执行命令了,这里分为两个分支。

    • 如果客户端在一个事务上下文中,那么当前命令(特殊命令除外)会入队等待,直至后续有 EXEC 命令到达时,才会将整个队列中的命令一起执行。
    • 要是不在一个事务上下文里面,就会直接调用 call() 函数执行命令。如果当前命令操作了某个客户端阻塞等待的 key,该 key 会添加到 server.ready_keys 列表中,这里会对 ready_keys 进行检查,并调用 handleClientsBlockedOnKeys() 函数唤醒阻塞的客户端。关于阻塞命令的逻辑,我们后面会专门介绍。

下面是 processCommand() 函数触发命令执行的核心代码片段:

1
2
3
4
5
6
7
8
9
10
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand ... // 省略其他不能入队等待执行的命令 ) {
queueMultiCommand(c); // 将当前命令入队,等待后续执行
addReply(c, shared.queued); // 给客户端返回"+QUEUED"字符串
} else {
call(c, CMD_CALL_FULL); // 调用call()函数执行命令
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys(); // 唤醒阻塞的客户端
}

在 call() 函数中最核心的逻辑就是调用 client->cmd->proc() 函数,来真正执行命令的处理逻辑,具体执行什么逻辑,就要看具体执行的命令是什么了。比如我们执行 SET 命令,对应的 proc 函数指针指向的就是 setCommand() 函数,其实,我们在 Redis 源码里面看到很多“命令名称 + Command” 结尾的函数,这些都是相应命令的处理逻辑。

除了调用命令的处理逻辑之外,call() 函数中还有很多辅助逻辑和统计操作。

  • 统计命令执行的时间,如果超过 server.slowlog_log_slower_than 指定的慢查询阈值,会被记录到慢查询日志中。

  • 更新命令对应 redisCommand 实例的各个统计信息。比如,我们执行一条 SET 命令,我们就将所有 SET 命令执行的总耗时(microseconds)、执行的次数(calls)等信息进行累加。同时还会更新 server 相关的统计信息。

  • 根据当前 Redis 实例、命令以及各个客户端的状态,做一些额外的操作。例如,如果当前 Redis 实例是主从模式中的主库或是需要写入 AOF 日志,就需要将带有修改属性的命令传播到从节点或是写入 AOF 文件。

这些额外的操作和统计这里就不再一一展开分析了,在后面介绍别的主题的时候,还会展开介绍。小伙伴在这里只要知道 call() 函数是真正调用 client->cmd-proc() 函数执行命令的地方即可。

总结

在这一节中,我们首先介绍了 RESP 2 和 RESP 3 的基础知识,然后介绍了 IO 多线程模式下,Redis 命令解析和命令执行的核心流程。

通过上一节以及本节的学习,你应该可以大致梳理出 Redis 从接收到客户端请求一直到命令执行的整个流程:主线程将发生可读事件的 client 按照 Round Robbin 的方式,分配给多个 IO 线程进行请求读取和命令解析,解析好的命令会放到 client->argv 这个数组中;等到全部命令解析完之后,主线程才会执行命令,执行命令的核心就是查找 redisCommand 字典,然后调用 client->cmd->proc() 函数执行相应的命令逻辑。

在下一节中,我们将开始讲解 Redis 命令执行完成之后的响应是如何返回的。至于 Redis 核心命令的实现逻辑,将在后面的“模块四:命令解析篇” 中进行介绍。