在上一节中,我们详细介绍了 Redis 在 IO 多线程模式下,命令解析和命令执行的核心逻辑。小伙伴们可能会产生这样一个疑问:我们调用的 redisCommand->proc() 函数的时候,是没有返回值的,那命令执行产生的返回值是怎么返回给客户端的呢?

下面我们就来详细分析下这个问题。

数据返回

通过前面的介绍我们知道,Redis 在 IO 多线程模型下,命令产生的返回值是通过 IO 线程写回给客户端的,那既然 redisCommand->proc() 函数没有返回值,我们就会猜测 proc() 函数里面会把返回值写入到某个指定的地方,然后 IO 线程会去这个地方取该结果值,然后返回给客户端。

这里我们以 GET 命令为例进行分析,GET 命令对应的 proc 处理函数是 getGenericCommand() 函数,其核心逻辑如下:

1
2
3
4
5
6
7
8
9
int getGenericCommand(client *c) {
robj *o;
// 从Redis DB里面中查找value值
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))
== NULL) return C_OK;
... // 省略类型检查
addReplyBulk(c,o); // 对返回值o进行编码并返回
return C_OK;
}

从名字就可以看出,addReplyBulk() 函数是向某个地方添加返回值,返回值的格式还是 Bulk String 的格式。按照这个思路,我们可以在 networking.c 文件中看到很多 addReply*() 函数,也都是写入返回值的入口。

这些 addReply*() 函数基本都是写入比较简单的、基础的数据,通过这些简单数据的组合,我们就可以写入一个复杂类型。

下面我们依旧以 GET 命令使用的 addReplyBulk() 函数为例,其底层就是一个 addReplyBulkLen() 函数和两个 addReply() 函数,调用栈如下图所示:

addReplyBulkLen() 函数的实现如下:

1
2
3
4
void addReplyBulkLen(client *c, robj *obj) {
size_t len = stringObjectLen(obj); // 计算字符串长度
addReplyLongLongWithPrefix(c,len,'$'); // 添加$前缀以及\r\n结尾
}

在 addReplyBulkLen() 函数中,会编码 Bulk String 结构的头部:首先计算字符串的长度,然后用 addReplyLongLongWithPrefix() 函数,在这个长度值前面添加 “$” 前缀,后面添加 “\r\n” 后缀,这就拼接好了 Bluk String 的第一行,最后就是调用 addReplyProto() 函数,把拼好的 Bulk String的第一行写入到 client->buf 缓存中,等待 IO 线程进行发送。

addReply() 函数也是类似的逻辑,它会将查询到的 robj 转换成字符串,然后追加到 client->buf 缓冲区中。

前面一直说 Redis 主线程会把命令的返回值写入到一个指定的位置,这个指定位置就是这里所说的 client 的这两个字段

  • 一个是 buf 字段。它作为缓冲区可以暂存返回给对应客户端的数据,使用 bufpos 字段记录 buf 缓冲区中最后一个有效字节的位置。

  • 另一个是 reply 字段。它是一个 adlist 链表。buf 缓冲区的长度是固定的 16 K,当返回结果超过 16K 时,Redis 开始向 reply 列表写入。reply 列表中每个节点都是一个 clientReplyBlock 实例(其中封装了 16 K 的缓冲区),只有在填满一个 clientReplyBlock 缓冲区之后,才会向 reply 列表中追加新的 clientReplyBlock 实例。

写入 buf 缓冲区以及 reply 列表的入口是 _addReplyToBufferOrList() 函数,它的调用栈如下:

它里面调用的 _addReplyToBuffer() 函数是向 buf 缓冲区里面写数据,_addReplyProtoToList() 函数则是往 reply 链表中写入数据。_addReplyToBufferOrList() 函数的核心片段如下:

1
2
3
4
5
6
7
void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
... // 省略其他逻辑
// _addReplyToBuffer()函数向buf缓冲区写入数据
size_t reply_len = _addReplyToBuffer(c,s,len);
// 如果buf缓冲区填满了,会通过_addReplyProtoToList()函数向reply链表写入
if (len > reply_len) _addReplyProtoToList(c,s+reply_len,len-reply_len);
}

_addReplyToBuffer 和 _addReplyProtoToList 两个函数的细节就不再展示了,感兴趣的小伙伴可以参考源码进行分析。

分配写回任务

弄清楚命令返回值写入到哪里以及如何写入之后,我们还有一点需要弄清楚:Redis 主线程是如何与 IO 线程交互,通知 IO 线程这个 client 有数据需要返回的呢?

这就涉及到 addReply*() 方法底层调用到的另一个函数 —— prepareClientToWrite(),调用 prepareClientToWrite() 函数的方法如下图所示:

prepareClientToWrite() 里面先是检查一下 client 里面标志位,确保这个 client 表示客户端能够接收返回值,比如,这个客户端马上就要关闭了,就无法向其返回数据了。

接下来就是 prepareClientToWrite() 函数的核心,这里通过 clientHasPendingReplies() 这个 if 判断,这是第一次往 buf 缓冲区里面写数据,此时才会走 putClientInPendingWriteQueue() 函数,把 client 添加到 server.clients_pending_write 队列中,后续就会在这些 client 的连接上监听可写事件。

1
2
3
4
5
6
int prepareClientToWrite(client *c) {
... // 省略对client的检查
// clientHasPendingReplies()是检查buf和reply是否已经数据,如果有,就返回true
if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE)
putClientInPendingWriteQueue(c);
}

另外,putClientInPendingWriteQueue() 函数还会在 client->flags 中设置 CLIENT_PENDING_WRITE 这个标志位,防止一个 client 被多次添加到 server.clients_pending_write 队列中。

弄明白 client->buf 缓冲区的填充逻辑以及主线程与 IO 线程的交互逻辑之后,我们继续主线程的执行逻辑。当主线程执行到 beforeSleep 函数时,里面会调用 handleClientsWithPendingWritesUsingThreads() 函数,将 server.clients_pending_write 队列中的 client,分配需要写回的 client 给 IO 线程进行处理。

我们来看一下 handleClientsWithPendingWritesUsingThreads() 函数的核心逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
handleClientsWithPendingWritesUsingThreads(void) {
// 步骤1、server.clients_pending_write 队列长度,要是这个队列是空,就没必要执行后续写回响应的流程了
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0;

// 步骤2、检查server.io_threads_num以及server.io_threads_active字段,并激活IO线程
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
if (!server.io_threads_active) startThreadedIO();

listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) { // 步骤3、分配写回任务
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
if (c->flags & CLIENT_CLOSE_ASAP) {
listDelNode(server.clients_pending_write, ln);
continue;
}
if (getClientType(c) == CLIENT_TYPE_SLAVE) {
listAddNodeTail(io_threads_list[0],c);
continue;
}
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}

// 步骤4、设置io_threads_op全局标识为 IO_THREADS_OP_WRITE
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}

// 步骤5、主线程处理io_threads_list[0]这个队列中的client
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);

// 步骤6、等到主线程以及全部IO线程都处理完了自己的任务
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}

io_threads_op = IO_THREADS_OP_IDLE;

// 步骤7、重新逐个检查一下server.clients_pending_write队列中的client,
// 是不是还有数据要返回客户端。如果有的话,会调用 CT_Socket.set_write_handler 函数
// 将 sendReplyToClient() 函数设置为 connection-> write_handler 回调函数
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
...
if (clientHasPendingReplies(c)) {
installClientWriteHandler(c);
}
}
listEmpty(server.clients_pending_write); // 清空server.clients_pending_write队列
server.stat_io_writes_processed += processed;

return processed;
}
  1. 首先是检查 server.clients_pending_write 队列长度,要是这个队列是空,就没必要执行后续写回响应的流程了。

  2. 然后是检查 server.io_threads_num 以及 server.io_threads_active 字段,如果开启了 IO 多线程的模式,但是 IO 线程还未激活,就在此处释放对应的锁,激活 IO 线程,这个前面提到过,不再多说。如果是没有开启 IO 多线程的模式,就会走 handleClientsWithPendingWrites() 这个分支,这里面会由主线程自己完成全部的写回响应的任务。

  3. 下面正式进入写回任务的分配。首先就是一个 while 循环 ,使用 Round-Robin 算法将 clients_pending_write 队列中待处理的 client,分配给每个 IO 线程,也就是分配到 io_threads_list 队列中。

  4. 接下来,设置 io_threads_op 全局标识为 IO_THREADS_OP_WRITE,用来告诉 IO 线程此次处理的是可写事件。还会设置每个 IO 线程要处理的 client 数量,也就是设置 io_threads_pending 数组对应的元素值,这就会让 IO 线程退出自旋等待,开始执行 writeToClient() 函数将 client->buf 以及 reply 队列中的响应发送给客户端。

  5. IO 线程并发处理自己负责的 io_threads_list 队列的时候,主线程也不会闲着,它会处理 io_threads_list[0] 这个队列中的 client,这里也就是通过 writeToClient() 函数发送数据。在主线程完成 io_threads_list[0] 列表的处理之后,会阻塞等待全部 IO 线程完成自己负责的写入任务。

  6. 等到主线程以及全部 IO 线程都处理完了自己的任务,主线程的阻塞结束。主线程会先将 io_threads_op 全局变量设置成 IO_THREADS_OP_IDLE,表示 IO 线程空闲了。

  7. 最后,再重新逐个检查一下 server.clients_pending_write 队列中的 client,是不是还有数据要返回客户端。如果有的话,会调用 CT_Socket.set_write_handler 函数将 sendReplyToClient() 函数设置为 connection-> write_handler 回调函数。

    这样的话,当相应连接之后变的可写的时候,主线程会直接调用 sendReplyToClient() 函数,它里面会调用 writeToClient() 函数,把 client->buf 以及 reply 队列中的剩余数据返回给客户端。

writeToClient() 函数

在完成写回任务的分配之后,无论是主线程还是各个 IO 线程,都会调用 writeToClient() 函数处理每个 client 中要返回给客户端的数据。下面我们就看看 writeToClient() 函数实现的细节。

writeToClient() 函数的核心是一个 while 循环,其中会不断调用 _writeToClient() 函数,往底层的 Scoket 连接里面写数据。小伙伴们可以先大致看一眼 writeToClient() 函数的核心代码框架:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int writeToClient(client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0;
while(clientHasPendingReplies(c)) { // 当前client是否有要返回的数据
int ret = _writeToClient(c, &nwritten); // 向底层Socket连接写入数据
if (ret == C_ERR) break;
totwritten += nwritten;
// 一次最多返回64k的数据,超过64k直接结束
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}

if (!clientHasPendingReplies(c)) {
// 当前client所有要返回的数据,都已经写入底层的Socket连接之后,会走入这个分支
c->sentlen = 0;
if (handler_installed) {
connSetWriteHandler(c->conn, NULL);
}
... // 省略其他逻辑
}
... // 省略一些统计操作和异常检查
return C_OK;
}

我们先来展开看看 _writeToClient() 函数的实现,它里面根据 reply 队列的情况分成了两个分支

第一个分支是 client->reply 队列里面有数据的时候才会走到,其中使用 writev 系统调用,把多个 buf 缓冲区以及多个 reply 节点的数据,通过一次 writev 系统调用写入到底层的 Socket 连接里面。在一次 writev 调用完成之后,_writevToClient() 函数把已经发送出的数据清理掉。例如,下面这张图的状态,图中发送之前,buf 缓冲区已经被填满了,reply 队列里面还有三个节点,writev 调用一次就把 buf 缓冲区以及 reply 队列中前两节点的数据都发送出去了,此时就会清空 buf 缓冲区,同时删除 reply 队列中的前两个节点。

image.png

如果小伙伴们去看 _writevToClient() 函数具体实现,里面会涉及到 client->sentlen 字段,这里说一下它的作用,它是用来记录当前缓冲区已经发送了多少个字节。比如说,下面这张图展示的两种情况,在发送 buf 缓冲区数据的时候,我们用 bufpos - sentlen 就可以得到 buf 缓冲区中剩余未发送的字节数;如果是发送 reply 节点的数据,我们使用 clientReplyBlock->used - sentlen就可以得到当前这个节点中剩余未发送的字节数了。

image.png

_writeToClient() 函数的第二个分支就比较简单了,这个分支是只有 buf 缓冲区中有数据的时候,才会走到,其中会调用 write 这个系统调用,把 buf 缓冲区的数据写入到底层的 Socket 连接里面。

最后,我们简单看一下 _writeToClient() 函数这两个关键分支的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int _writeToClient(client *c, ssize_t *nwritten) {
*nwritten = 0;
if (listLength(c->reply) > 0) { // reply队列中有数据,走writev系统调用
int ret = _writevToClient(c, nwritten);
if (ret != C_OK) return ret;
...
} else if (c->bufpos > 0) { // buf缓冲区中有数据,走write系统调用
*nwritten = connWrite(c->conn, c->buf + c->sentlen, c->bufpos - c->sentlen);
if (*nwritten <= 0) return C_ERR;
c->sentlen += *nwritten;
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
}

return C_OK;
}

说完 writeToClient() 函数核心的实现之后,最后来关注一种异常情况,这种异常情况是:如果分配给某个 IO 线程的某个连接上,要返回 1 MB 的数据,但是只写入了 500 KB 之后,就不可写了,那会发生什么呢?我们要关注两点:

  • 一个是第 30 讲《内核解析篇:Redis 读取与请求核心》中介绍的 IO 线程核心逻辑——IOThreadMain() 函数,它其中只会尝试写入数据,写入失败了就算了,不会做任何其他处理;

  • 二是主线程会在 installClientWriteHandler() 函数中,注册该连接可写事件回调函数 —— sendReplyToClient(),当该连接发生可写事件的时候,sendReplyToClient() 就会调用 writeToClient() 函数继续向客户端写回数据。

这会涉及到 writeToClient() 函数的第二个参数 —— handler_installed。在当前 client 返回的数据全部都发送完之后,会通过这个参数控制,决定是否删除对这个 client 可写事件的监听。writeToClient() 函数中与 handler_installed 相关的代码片段如下:

1
2
3
4
5
6
7
8
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
if (handler_installed) { // 该参数为1的时候,才会清空
serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
connSetWriteHandler(c->conn, NULL);
}
...
}

也就是说,只有主线程在 client 还有未返回数据的时候,才会注册可写事件的监听,也只有主线程才会清空该可写事件的监听,这样就形成闭环了。

从如下图的调用栈也可以看出,在 IO 线程中对 writeToClient() 的调用,handler_installed 参数始终为 0,也就不会删除任何 client 可写事件的监听,只负责写回数据。

之所以这么做,是防止多线程同时去操作事件监听而出现并发问题,Redis 让 IO 线程只做 IO 的操作,事件监听的注册和清理全部由主线程处理。

总结

在这一节中,我们重点介绍了 Redis 向客户端写回数据的核心逻辑。

  • 首先,我们介绍了 addReply*() 函数的核心逻辑,介绍了 client 结构体中 buf 以及 reply 字段的含义。

  • 然后,分析了主线程与 IO 线程在写回数据的时候如何进行通信,以及主线程给 IO 线程分配写回任务的核心流程。

  • 最后,深入讲解了 writeToClient() 函数正常向客户端写回数据的逻辑,以及在连接阻塞不可写的异常场景下,Redis 是如何运行的。

下一节,我们将来介绍一下 Redis 中时间事件的相关内容。