在前面两节中,我们详细分析了全量同步和部分同步过程中,主库完成了哪些关键操作,以及 Redis 在不同版本中的各项优化。在这一节中,我们继续在主库视角下,分析一下客户端命令是如何从主库传播到从库的。

命令传播

无论经过部分同步还是全量同步之后,主从的数据基本上是一致了,但是从库这个时候还是略微落后于主库,从库可以通过同步 backlog 里面的数据,进一步追平主库。这部分实现与主库正常执行一条命令并传播给从库的逻辑基本一致,所以我们将这两部分内容合并到这一节一起介绍。

写入共享缓冲区

在前文介绍 AOF 持久化的时候提到,call() 函数不仅会执行客户端发来的命令,还会调用 alsoPropagate() 函数将命令写入 redisOpArray 队列中暂存,然后在 propagateNow() 中去读取 redisOpArray 队列,并写入 AOF 缓冲区,等待后续写入到 AOF 文件中。

如上图所示,propagateNow() 函数中还有另一个分支就是 replicationFeedSlaves() 函数,它是命令发送到从库的入口。

下面我们来看 replicationFeedSlaves() 函数的核心逻辑,它首先会进行一系列前置检查。

  1. 检查当前的 Redis 实例是否为主库,只有主库才能通过 replicationFeedSlaves() 函数向从库同步命令。如果是从库向 Sub-Slave 发送命令,会通过前面介绍的 replicationFeedSlavesFromMasterStream() 函数实现同步的。

  2. 检查当前主库是否开启了 backlog 缓冲区,还会检测当前主库下面是否有从库存在,如果这两个条件都不满足,也就无需执行后续传输了,立刻返回。

  3. 检查是否需要切换 redisDb,如果需要,这里会向 repl_buffer_blocks 这个全局共享缓冲区追加一条 SELECT 语句。

完成上述检查之后,replicationFeedSlaves() 函数会将命令按照 RESP3 中 Bulk String 的格式转成字节数组,然后写入到 redisServer.repl_buffer_blocks 这个共享缓冲区中,具体写入逻辑位于 feedReplicationBuffer() 函数中。

feedReplicationBuffer() 函数首先会尝试在共享缓冲区的最后一个 replBufBlock 块中进行追加,如果最后一个 replBufBlock 块写满了,就写入到一个新的 replBufBlock 块中,并把这个新 replBufBlock 块追加到共享缓冲区的队尾。

写入完成之后,feedReplicationBuffer() 函数会遍历一次 redisServer.slaves 列表,为从库初始化命令同步的起始位置,也就是从库 client 中 ref_repl_buf_node 和 ref_block_pos 字段。初始化这两个字段背后的逻辑有点绕,我们通过两个例子来说明一下这个细节。

来看第一个例子,假设我们现在有一个从库 A 进行了一次全量同步,在从库 A 的 client 变为 WAIT_BGSAVE_END 状态的时候,无论是无磁盘同步还是基于磁盘的同步,这个状态表示子进程已经 fork 出来了,这个时候,如果主库执行了一条修改命令,就会走到 feedReplicationBuffer() 函数中,将从库 A 的 ref_repl_buf_node 以及 ref_block_pos 字段,指向刚刚写入到这个共享缓冲区的这条命令。

image.png

第二个例子是在上面这个例子基础上展开的,主要是对应上一讲 “RDB 生成的场景分类”小节中,分支 4 复用同一个 RDB 文件的处理,也就是说从库 A 正在进行磁盘同步,此时从库 B 也触发了磁盘同步,它就会复用为 A 生成的 RDB 文件,那么在全量同步执行完之后,从库 B 需要和从库 A 的同步起始位置一致,如下图所示:

image.png

为了实现这个效果,syncCommand() 函数处理分支 4 的逻辑中,会调用 copyReplicaOutputBuffer() 函数,将从库 A 的 ref_repl_buf_node 和 ref_block_pos 值拷贝给从库 B,这样两者的同步起始位置就一致了。

说完从库同步起始的设置之后,feedReplicationBuffer() 还会对 backlog 缓冲区进行一些修改。比如,如果 backlog 缓冲区还没有初始化,就用其 ref_repl_buf_node 字段记录指向共享缓冲区的第一个节点,完成初始化。如果此次写入导致新增 replBufBlock 块,就需要检查一下是不是要为其建立稀疏索引,相应的实现位于 createReplicationBacklogIndex() 函数中。

另外,还需要检查共享缓冲区是否已经超过了 repl-backlog-size 配置指定上限值(默认 1MB),如果超过了,就会从共享缓冲区的头部开始销毁历史节点。当然,如果有其他从库 client 正在使用这些节点,它们自然是不能被销毁的。另外,在销毁节点的时候,除了删除 repl_buffer_blocks 列表中的节点,还需要将稀疏索引中的相应节点也删掉。这个截断共享缓冲区历史节点的逻辑位于 incrementalTrimReplicationBacklog() 函数中,感兴趣的小伙伴可以查看源码进行学习。

发送数据

在为从库确定了同步的起始位置之后,IO 线程后续就会从这个位置开始,向从库发送命令。通过前面对 Redis IO 线程的介绍我们知道,IO 线程向客户端写回数据的时候,调用的是 writeToClient() 函数,其中最核心的逻辑如下。小伙伴们可以回顾第 32 讲《内核解析篇:命令响应的核心原理解析》查看 writeToClient() 函数更详细的解析。

1
2
3
4
5
6
7
8
int writeToClient(client *c, int handler_installed) {
... // 省略其他逻辑
while(clientHasPendingReplies(c)) { // 当前client是否有要返回的数据
int ret = _writeToClient(c, &nwritten); // 向底层Socket连接写入数据
... // 省略其他逻辑
}
... // 省略其他逻辑
}

这里使用的 clientHasPendingReplies() 和 _writeToClient() 函数内部,会针对从库 client 做特殊处理。clientHasPendingReplies() 函数中会根据从库 client 的同步位置,来判断是否还有数据要发给这个从库,相关代码片段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int clientHasPendingReplies(client *c) {
if (getClientType(c) == CLIENT_TYPE_SLAVE) {
if (c->ref_repl_buf_node == NULL) return 0; // 是否初始化
listNode *ln = listLast(server.repl_buffer_blocks); // 获取共享缓冲区最后一个节点
replBufBlock *tail = listNodeValue(ln);
// 如果ref_repl_buf_node指向了最后一个节点,ref_block_pos指向了最后一个节点的最后一个字节,
// 那就表示这个从库已经同步了全部的数据,没有其他数据要发送了
if (ln == c->ref_repl_buf_node &&
c->ref_block_pos == tail->used) return 0;
return 1;
} else { // 针对普通client的处理,检查的bufpos和reply列表
return c->bufpos || listLength(c->reply);
}
}

_writeToClient() 函数针对从库的处理也是类似的,先通过 CLIENT_TYPE_SLAVE 标志位识别从库 client,然后从 ref_repl_buf_node 和 ref_block_pos 确定的同步位置开始发送数据,当前 replBufBlock 块中的数据发送完了,ref_repl_buf_node 会继续执行指向共享缓冲区的下一个数据块,继续发送。每次发送完一个数据块的时候,_writeToClient() 还会调用 incrementalTrimReplicationBacklog() 函数,尝试去释放共享缓冲区中的历史数据块。

下面展示了 _writeToClient() 函数处理从库 client 的关键逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int _writeToClient(client *c, ssize_t *nwritten) {
*nwritten = 0;
if (getClientType(c) == CLIENT_TYPE_SLAVE) { // 针对
replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
if (o->used > c->ref_block_pos) { // 发送ref_repl_buf_node指向的数据块
*nwritten = connWrite(c->conn, o->buf+c->ref_block_pos,
o->used-c->ref_block_pos);
if (*nwritten <= 0) return C_ERR;
c->ref_block_pos += *nwritten;
}
// 发送完当前的replBufBlock数据块之后,ref_repl_buf_node会指向下一个数据块
listNode *next = listNextNode(c->ref_repl_buf_node);
if (next && c->ref_block_pos == o->used) {
o->refcount--;
((replBufBlock *)(listNodeValue(next)))->refcount++;
c->ref_repl_buf_node = next;
c->ref_block_pos = 0;
// 尝试释放历史数据块
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
return C_OK;
}
... // 省略对普通client的处理
}

命令执行的限制

在前面介绍从库功能的时候,分析了从库在命令执行方面的一些限制。类似的,主库在执行命令的时候,也会有一些额外的限制。

首先要明确一件事情,主从复制架构的核心目的之一就是实现高可用。当主库宕机的时候,从库可以切换为主库继续对外提供服务,但如果全部从库都已经宕机,或是从库与主库延迟很大的时候,主库发生宕机,我们将从库切换为主库,就会导致数据丢失。

为了尽可能地避免这种情况,Redis 提供了 min-replicas-to-writemin-replicas-max-lag 两个配置。前者指定至少有多少个正常的从库,才能继续执行修改命令;后者设置了从库与主库延迟超过多少秒,就会因延迟过大而被判定为异常。

应用上述两个配置的地方位于 checkGoodReplicasStatus() 函数中,它由 processCommand() 函数调用,相关代码片段如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int processCommand(client *c) {
... // 省略其他逻辑
if (is_write_command && !checkGoodReplicasStatus()) {
rejectCommand(c, shared.noreplicaserr);
return C_OK;
}
... // 省略其他逻辑
}

int checkGoodReplicasStatus(void) {
return server.masterhost || // 当前Redis实例是否主库
!server.repl_min_slaves_max_lag ||// 对应min-replicas-max-lag配置
!server.repl_min_slaves_to_write ||// 对应min-replicas-to-write配置
// 检查符合min-replicas-max-lag配置的在线从库个数是否达标
server.repl_good_slaves_count >= server.repl_min_slaves_to_write;
}

这里的 redisServer.repl_good_slaves_count 字段,记录了当前有多少个正常的从库,相应的更新逻辑位于 refreshGoodSlavesCount() 函数中,它会根据从库的最近响应时间来计算该从库延迟了多少秒,这样,就可以结合 min-replicas-max-lag 配置,确定有多少个正常的从库。

refreshGoodSlavesCount() 函数在从库上线、下线的时候都有个调用,在下面即将介绍的主库定时任务中也会有调用,相应的调用栈下图所示:

主从复制中的定时任务

在上一讲的分析中,我们简单提到过 replicationCron(),它用来执行一些主从复制相关的定时逻辑,这里依旧分为主从两侧进行分析。

主库中的定时任务

我们先来看 replicationCron() 函数在主库侧的核心逻辑:

image.png

通过上图我们看到,主库会周期性地做很多事情,这里针对每件事都进行一下说明。

  1. 主库会在 replicationCron() 函数中周期性(默认 10 s)地向所有从库发送 PING 命令作为心跳消息,防止从库认为主从之间发生故障,主动断开主从连接。

  2. 如果有从库正在进行基于磁盘全量同步,这些从库会阻塞等待 RDB 数据的传输,无法及时处理 PING 命令。这个时候,主库会每秒向这些从库发送换行符,防止出现超时的情况。

  3. 除了主库定时发送心跳,在线状态的从库也会定时向主库发送 REPLCONF ACK { Replication Offset}请求作为心跳请求,如果长时间未收到心跳,主库就会认为其出现故障,断开与其连接。

  4. 在进行无磁盘同步的时候,主库会不断向从库发送 RDB 数据,从库会进行阻塞读取,如果主从连接长时间不可写,主库也会认为从库出现故障,断开与其连接。

  5. 在从库个数降为 0 时,主库会使用 repl_no_slaves_since 字段记录当前时间戳,之后主库会定时检查从库的个数,当主库发现长时间无从库连接时,为了节省空间,就会释放共享缓冲区中的全部数据,并新生成 Replication ID,之后连接上来的从库需要进行全量同步。

  6. 主库还会定时检查子进程的状态以及从库状态,如果有从库处于 WAIT_BGSAVE_START 状态等待 RDB 生成,且没有子进程在运行,这里就可以启动 RDB 子进程。这部分实现位于 shouldStartChildReplication() 函数中,其中最核心的代码片段如下:

1
2
3
4
5
6
7
8
9
10
11
12
if (slaves_waiting && // 是否有从库等待RDB进行全量同步
(!server.repl_diskless_sync || // 使用磁盘同步
(server.repl_diskless_sync_max_replicas > 0 && // 配置了最大等待的从库数量
slaves_waiting >= server.repl_diskless_sync_max_replicas) || // 当前等待的从库已经达到上限值
max_idle >= server.repl_diskless_sync_delay)) // 等待时间足够长
{
if (mincapa_out)
*mincapa_out = mincapa;
if (req_out)
*req_out = req;
return 1; // 返回1就会启动子进程生成RDB
}
  1. 主库接下来会检查根据自身的持久化策略以及从库全量同步的状态,如果主库没有开启任务持久化策略,并且从库都已经全部完成了全量同步,则主库会删除全量同步时产生的 RDB 文件。这部分逻辑位于 removeRDBUsedToSyncReplicas() 函数中。
  2. 最后,主库执行 refreshGoodSlavesCount() 函数,统计在线且延迟较低的从库,并记录到前文提到的 repl_good_slaves_count 字段,从而确定后续命令是否能正常写入。

从库的定时任务

说完主库侧的定时任务之后,我们再来看看 replicationCron() 函数中从库侧的定时任务。从库在 replicationCron()函数中主要是在不同 repl_state 状态下,检查主从复制情况,相关的逻辑如下所示:

image.png

从上面这张流程图我们可以看出,从库在建连和握手的过程中,会定时检查建连和握手是否超时。在进行全量同步时,会定时检查是否长时间无数据传输,如果出现超时,就会重新建立主从连接。之后,从库会检查从库最后一次读取到主库命令的时间戳,正常情况下,主库会定时发送 PING 命令作为心跳,如果从库长时间未读取到主库发来的命令,就会认为主库宕机了,进而断开主从连接。

最后,从库会周期性向主库发送 REPLCONF ACK {Replication Offset} 命令作为心跳,主库会在对应 client 实例的 repl_ack_off 和 repl_ack_time 字段中记录从库的 Replication Offset 和 ACK 时间,前面主库就是通过 ACK 时间判断从库是否故障的,在后续介绍哨兵模式中的主库切换流程时,我们会看到,哨兵会使用 repl_ack_off 字段来判断各个从库的复制进度,从而决定哪个从库应该被提升为主库。

总结

在这一节中,我们站在主库的角度,详细分析了 Redis 主库执行的客户端命令,发送到从库的关键流程;然后,我们介绍了主从复制场景中,主库处理客户端命令时的一些限制;最后进行了简单的拾遗,分析了 Redis 与主从复制的定时任务。

到这里,有关 Redis 主从复制的核心原理和关键实现,我们就都介绍完了。在下一模块中,我们将开启对 Redis Sentinel 这一集群方案的分析。