无题
在上一节中,我们详细介绍了 Redis 在 IO 多线程模式下,命令解析和命令执行的核心逻辑。小伙伴们可能会产生这样一个疑问:我们调用的 redisCommand->proc() 函数的时候,是没有返回值的,那命令执行产生的返回值是怎么返回给客户端的呢?
下面我们就来详细分析下这个问题。
数据返回
通过前面的介绍我们知道,Redis 在 IO 多线程模型下,命令产生的返回值是通过 IO 线程写回给客户端的,那既然 redisCommand->proc() 函数没有返回值,我们就会猜测 proc() 函数里面会把返回值写入到某个指定的地方,然后 IO 线程会去这个地方取该结果值,然后返回给客户端。
这里我们以 GET 命令为例进行分析,GET 命令对应的 proc 处理函数是 getGenericCommand() 函数,其核心逻辑如下:
1 | int getGenericCommand(client *c) { |
从名字就可以看出,addReplyBulk() 函数是向某个地方添加返回值,返回值的格式还是 Bulk String 的格式。按照这个思路,我们可以在 networking.c 文件中看到很多 addReply*() 函数,也都是写入返回值的入口。
这些 addReply*() 函数基本都是写入比较简单的、基础的数据,通过这些简单数据的组合,我们就可以写入一个复杂类型。
下面我们依旧以 GET 命令使用的 addReplyBulk() 函数为例,其底层就是一个 addReplyBulkLen() 函数和两个 addReply() 函数,调用栈如下图所示:
addReplyBulkLen() 函数的实现如下:
1 | void addReplyBulkLen(client *c, robj *obj) { |
在 addReplyBulkLen() 函数中,会编码 Bulk String 结构的头部:首先计算字符串的长度,然后用 addReplyLongLongWithPrefix() 函数,在这个长度值前面添加 “$” 前缀,后面添加 “\r\n” 后缀,这就拼接好了 Bluk String 的第一行,最后就是调用 addReplyProto() 函数,把拼好的 Bulk String的第一行写入到 client->buf 缓存中,等待 IO 线程进行发送。
addReply() 函数也是类似的逻辑,它会将查询到的 robj 转换成字符串,然后追加到 client->buf 缓冲区中。
前面一直说 Redis 主线程会把命令的返回值写入到一个指定的位置,这个指定位置就是这里所说的 client 的这两个字段。
一个是
buf 字段。它作为缓冲区可以暂存返回给对应客户端的数据,使用 bufpos 字段记录 buf 缓冲区中最后一个有效字节的位置。另一个是
reply 字段。它是一个 adlist 链表。buf 缓冲区的长度是固定的 16 K,当返回结果超过 16K 时,Redis 开始向 reply 列表写入。reply 列表中每个节点都是一个 clientReplyBlock 实例(其中封装了 16 K 的缓冲区),只有在填满一个 clientReplyBlock 缓冲区之后,才会向 reply 列表中追加新的 clientReplyBlock 实例。
写入 buf 缓冲区以及 reply 列表的入口是 _addReplyToBufferOrList() 函数,它的调用栈如下:
它里面调用的 _addReplyToBuffer() 函数是向 buf 缓冲区里面写数据,_addReplyProtoToList() 函数则是往 reply 链表中写入数据。_addReplyToBufferOrList() 函数的核心片段如下:
1 | void _addReplyToBufferOrList(client *c, const char *s, size_t len) { |
_addReplyToBuffer 和 _addReplyProtoToList 两个函数的细节就不再展示了,感兴趣的小伙伴可以参考源码进行分析。
分配写回任务
弄清楚命令返回值写入到哪里以及如何写入之后,我们还有一点需要弄清楚:Redis 主线程是如何与 IO 线程交互,通知 IO 线程这个 client 有数据需要返回的呢?
这就涉及到 addReply*() 方法底层调用到的另一个函数 —— prepareClientToWrite(),调用 prepareClientToWrite() 函数的方法如下图所示:
prepareClientToWrite() 里面先是检查一下 client 里面标志位,确保这个 client 表示客户端能够接收返回值,比如,这个客户端马上就要关闭了,就无法向其返回数据了。
接下来就是 prepareClientToWrite() 函数的核心,这里通过 clientHasPendingReplies() 这个 if 判断,这是第一次往 buf 缓冲区里面写数据,此时才会走 putClientInPendingWriteQueue() 函数,把 client 添加到 server.clients_pending_write 队列中,后续就会在这些 client 的连接上监听可写事件。
1 | int prepareClientToWrite(client *c) { |
另外,putClientInPendingWriteQueue() 函数还会在 client->flags 中设置 CLIENT_PENDING_WRITE 这个标志位,防止一个 client 被多次添加到 server.clients_pending_write 队列中。
弄明白 client->buf 缓冲区的填充逻辑以及主线程与 IO 线程的交互逻辑之后,我们继续主线程的执行逻辑。当主线程执行到 beforeSleep 函数时,里面会调用 handleClientsWithPendingWritesUsingThreads() 函数,将 server.clients_pending_write 队列中的 client,分配需要写回的 client 给 IO 线程进行处理。
我们来看一下 handleClientsWithPendingWritesUsingThreads() 函数的核心逻辑:
1 | handleClientsWithPendingWritesUsingThreads(void) { |
首先是检查 server.clients_pending_write 队列长度,要是这个队列是空,就没必要执行后续写回响应的流程了。
然后是检查 server.io_threads_num 以及 server.io_threads_active 字段,如果开启了 IO 多线程的模式,但是 IO 线程还未激活,就在此处释放对应的锁,激活 IO 线程,这个前面提到过,不再多说。如果是没有开启 IO 多线程的模式,就会走 handleClientsWithPendingWrites() 这个分支,这里面会由主线程自己完成全部的写回响应的任务。
下面正式进入写回任务的分配。首先就是一个 while 循环 ,使用 Round-Robin 算法将 clients_pending_write 队列中待处理的 client,分配给每个 IO 线程,也就是分配到 io_threads_list 队列中。
接下来,设置 io_threads_op 全局标识为 IO_THREADS_OP_WRITE,用来告诉 IO 线程此次处理的是可写事件。还会设置每个 IO 线程要处理的 client 数量,也就是设置 io_threads_pending 数组对应的元素值,这就会让 IO 线程退出自旋等待,开始执行 writeToClient() 函数将 client->buf 以及 reply 队列中的响应发送给客户端。
IO 线程并发处理自己负责的 io_threads_list 队列的时候,主线程也不会闲着,它会处理 io_threads_list[0] 这个队列中的 client,这里也就是通过 writeToClient() 函数发送数据。在主线程完成 io_threads_list[0] 列表的处理之后,会阻塞等待全部 IO 线程完成自己负责的写入任务。
等到主线程以及全部 IO 线程都处理完了自己的任务,主线程的阻塞结束。主线程会先将 io_threads_op 全局变量设置成 IO_THREADS_OP_IDLE,表示 IO 线程空闲了。
最后,再重新逐个检查一下 server.clients_pending_write 队列中的 client,是不是还有数据要返回客户端。如果有的话,会调用 CT_Socket.set_write_handler 函数将 sendReplyToClient() 函数设置为 connection-> write_handler 回调函数。
这样的话,当相应连接之后变的可写的时候,主线程会直接调用 sendReplyToClient() 函数,它里面会调用 writeToClient() 函数,把 client->buf 以及 reply 队列中的剩余数据返回给客户端。
writeToClient() 函数
在完成写回任务的分配之后,无论是主线程还是各个 IO 线程,都会调用 writeToClient() 函数处理每个 client 中要返回给客户端的数据。下面我们就看看 writeToClient() 函数实现的细节。
writeToClient() 函数的核心是一个 while 循环,其中会不断调用 _writeToClient() 函数,往底层的 Scoket 连接里面写数据。小伙伴们可以先大致看一眼 writeToClient() 函数的核心代码框架:
1 | int writeToClient(client *c, int handler_installed) { |
我们先来展开看看 _writeToClient() 函数的实现,它里面根据 reply 队列的情况分成了两个分支。
第一个分支是 client->reply 队列里面有数据的时候才会走到,其中使用 writev 系统调用,把多个 buf 缓冲区以及多个 reply 节点的数据,通过一次 writev 系统调用写入到底层的 Socket 连接里面。在一次 writev 调用完成之后,_writevToClient() 函数把已经发送出的数据清理掉。例如,下面这张图的状态,图中发送之前,buf 缓冲区已经被填满了,reply 队列里面还有三个节点,writev 调用一次就把 buf 缓冲区以及 reply 队列中前两节点的数据都发送出去了,此时就会清空 buf 缓冲区,同时删除 reply 队列中的前两个节点。
如果小伙伴们去看 _writevToClient() 函数具体实现,里面会涉及到 client->sentlen 字段,这里说一下它的作用,它是用来记录当前缓冲区已经发送了多少个字节。比如说,下面这张图展示的两种情况,在发送 buf 缓冲区数据的时候,我们用 bufpos - sentlen 就可以得到 buf 缓冲区中剩余未发送的字节数;如果是发送 reply 节点的数据,我们使用 clientReplyBlock->used - sentlen就可以得到当前这个节点中剩余未发送的字节数了。
_writeToClient() 函数的第二个分支就比较简单了,这个分支是只有 buf 缓冲区中有数据的时候,才会走到,其中会调用 write 这个系统调用,把 buf 缓冲区的数据写入到底层的 Socket 连接里面。
最后,我们简单看一下 _writeToClient() 函数这两个关键分支的实现:
1 | int _writeToClient(client *c, ssize_t *nwritten) { |
说完 writeToClient() 函数核心的实现之后,最后来关注一种异常情况,这种异常情况是:如果分配给某个 IO 线程的某个连接上,要返回 1 MB 的数据,但是只写入了 500 KB 之后,就不可写了,那会发生什么呢?我们要关注两点:
一个是第 30 讲《内核解析篇:Redis 读取与请求核心》中介绍的 IO 线程核心逻辑——IOThreadMain() 函数,它其中只会尝试写入数据,写入失败了就算了,不会做任何其他处理;
二是主线程会在 installClientWriteHandler() 函数中,注册该连接可写事件回调函数 —— sendReplyToClient(),当该连接发生可写事件的时候,sendReplyToClient() 就会调用 writeToClient() 函数继续向客户端写回数据。
这会涉及到 writeToClient() 函数的第二个参数 —— handler_installed。在当前 client 返回的数据全部都发送完之后,会通过这个参数控制,决定是否删除对这个 client 可写事件的监听。writeToClient() 函数中与 handler_installed 相关的代码片段如下:
1 | if (!clientHasPendingReplies(c)) { |
也就是说,只有主线程在 client 还有未返回数据的时候,才会注册可写事件的监听,也只有主线程才会清空该可写事件的监听,这样就形成闭环了。
从如下图的调用栈也可以看出,在 IO 线程中对 writeToClient() 的调用,handler_installed 参数始终为 0,也就不会删除任何 client 可写事件的监听,只负责写回数据。
之所以这么做,是防止多线程同时去操作事件监听而出现并发问题,Redis 让 IO 线程只做 IO 的操作,事件监听的注册和清理全部由主线程处理。
总结
在这一节中,我们重点介绍了 Redis 向客户端写回数据的核心逻辑。
首先,我们介绍了 addReply*() 函数的核心逻辑,介绍了 client 结构体中 buf 以及
reply字段的含义。然后,分析了主线程与 IO 线程在写回数据的时候如何进行通信,以及主线程给 IO 线程分配写回任务的核心流程。
最后,深入讲解了 writeToClient() 函数正常向客户端写回数据的逻辑,以及在连接阻塞不可写的异常场景下,Redis 是如何运行的。
下一节,我们将来介绍一下 Redis 中时间事件的相关内容。
