在我们常用的关系型数据库中,事务指的是一组 SQL 语句,这一组 SQL 要么全部执行成功,要么全部执行失败,这一组 SQL 语句是一个不可分割的单位。关系型数据库中的事务需要满足原子性一致性隔离性持久性四个特性,也就是常说的 ACID 特性

但是,Redis 是一个 KV 类型的 NoSQL 数据库,并不是一个关系型数据库,而且 Redis 并没有完整支持 ACID 特性,所以关系型事务特性这里不做过多讨论,我们来专注于 Redis 中的事务实现。

Redis 中与事务相关的命令有 MULTI 、 DISCARD 、 EXEC 和 WATCH 四条命令。

  • 首先,我们使用 MULTI 命令用来开启一个事务,然后就可以开始往 Redis 发送命令,这些命令都属于一个事务。注意,这些 Redis 命令在到达 Redis Server 之后,并没有被立即执行,而写入到 client 实例中的一个缓冲队列里面暂存。

  • 在我们把这个事务里面全部的命令都发送到了 Redis Server 之后,就可以再发送一条 EXEC 命令来提交事务了。在 Redis Server 收到 EXEC 命令之后,就会把缓冲队列中的全部命令一起执行掉,整个事务也就提交了。

  • 我们除了发送 EXEC 命令来提交事务之外,还可以发送 DISCARD 命令来放弃当前的事务,也就是放弃缓冲队列中的全部命令。

  • Redis 中还有一个与事务紧密相关的命令 —— WATCH 命令。它的功能是在事务开始之前,也就是调用 MULTI 命令之前,监听一个或多个 Key。当我们在提交事务的时候,如果发现我们监听的任意一个 Key 被其他 Redis 客户端修改了, 那么整个事务将直接被放弃。

事务实现

了解了 Redis 中事务相关的命令,我们一起来看一下 Redis 事务的相关实现。

首先来看 MULTI 命令,Redis 处理 MULTI 命令的逻辑是 multiCommand() 函数,它会在对应 client 的 flags 字段中设置 CLIENT_MULTI 标志位,这表示该 client 进入了事务模式

一旦 client 进入事务模式之后,这个客户端发送的所有命令都会进入缓冲队列,这部分逻辑位于 processCommand() 函数中,相关的代码片段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int processCommand(client *c)

... // 省略查找命令

if (c->flags & CLIENT_MULTI &&

c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&

c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&

c->cmd->proc != resetCommand){

// 当前client处于事务模式,且执行的命令不是EXEC、DISCARD等结束事务的命令,

// 则会将命令写入到缓冲队列中

queueMultiCommand(c);

// 返回QUEUED表示命令已经进入缓冲队列

addReply(c, shared.queued);

} else {

call(c, CMD_CALL_FULL); // 立即执行命令

...

}

return C_OK;

}

queueMultiCommand() 函数写入的缓冲队列其实是 client->mstate 字段,这个字段是 multiState 类型,它里面维护了一个 multiCmd 数组以及数组长度,如下:

1
2
3
4
5
6
7
8
9
typedef struct multiState {

multiCmd *commands; // multiCmd指针,指向一个multiCmd数组

int count; // 记录了multiCmd数组的长度

... // 省略其他字段

} multiState;

在每个 multiCmd 实例中,都维护了一条写入缓冲队列的命令,其中包含了命令的参数、参数个数以及处理命令的 redisCommand 实例,如下所示:

1
2
3
4
5
6
7
8
9
typedef struct multiCmd {

robj **argv; // 命令的参数

int argc; // 命令参数个数

struct redisCommand *cmd; // 命令对应的redisCommand实例

} multiCmd;

在完成命令入队之后,client->argv 和 argc 等字段会立刻被清空,就和执行完命令一样,为下次命令执行做准备。

等到一个事务中的全部命令都提交到缓冲队列之后,我们就可以执行 EXEC 命令了。**EXEC 命令对应的处理函数是 execCommand()** ,其关键实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
void execCommand(client *c) {

int j;

robj **orig_argv;

int orig_argc, orig_argv_len;

struct redisCommand *orig_cmd;



... // 省略非关键代码



// 1、检查一下被 WATCH 命令监听的 Key,要保证这些 Key 未过期,也没有被其他客户端修改过,才能正常提交事务

if (isWatchedKeyExpired(c)) {

c->flags |= (CLIENT_DIRTY_CAS);

}



// CLIENT_DIRTY_CAS用来标识Key被修改改过,

// CLIENT_DIRTY_EXEC用来标识命令入队失败。

// 2、检查事务所有命令入队的过程中,有没有发生过异常,只有全部命令都没有发生异常,才能正常提交事务

if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {

if (c->flags & CLIENT_DIRTY_EXEC) {

addReplyErrorObject(c, shared.execaborterr);

} else {

addReply(c, shared.nullarray[c->resp]);

}

// 3、上面两方面的检查,有任意一项没通过,都会调用 discardTransaction()函数回滚当前事

discardTransaction(c);

return;

}

uint64_t old_flags = c->flags;

c->flags |= CLIENT_DENY_BLOCKING;

unwatchAllKeys(c); // 取消对Key的监听



server.in_exec = 1;



orig_argv = c->argv;

orig_argv_len = c->argv_len;

orig_argc = c->argc;

orig_cmd = c->cmd;

addReplyArrayLen(c,c->mstate.count);

// 4、通过上述所有的检查之后,开始正式执行事务

for (j = 0; j < c->mstate.count; j++) {

... // 具体逻辑在下面的分析中展示

}



... //省略事务执行完成之后的善后逻辑

}
  1. 首先,检查一下被 WATCH 命令监听的 Key,要保证这些 Key 未过期,也没有被其他客户端修改过,才能正常提交事务。

  2. 然后检查事务所有命令入队的过程中,有没有发生过异常,只有全部命令都没有发生异常,才能正常提交事务。

  3. 如果上面两方面的检查,有任意一项没通过,都会调用 discardTransaction() 函数回滚当前事务。在 discardTransaction() 函数中,主要完成了以下三件事情。

    • 释放 client->mstate 队列中缓存的命令,具体实现位于 freeClientMultiState() 函数中,具体实现也很简单,就是迭代 client->mstate 队列,释放其中每个 multiCmd 实例的内存空间。
    • 清理 client->flags 字段中与事务相关的命令。
    • 最后,不再监听事务开始前的 Key,具体实现位于 unwatchAllKeys() 函数中。unwatchAllKeys() 函数的逻辑其实就是 WATCH 命令的逆操作,后面介绍完 WATCH 命令的实现,unwatchAllKeys() 的逻辑小伙伴们自然也就懂了。
  4. 通过上述所有的检查之后,开始正式执行事务。

    • 首先会执行在 client->flags 中设置 CLIENT_DENY_BLOCKING 标记,防止在事务中出现阻塞命令,例如 BRPOP 命令。
    • 然后,调用 unwatchAllKeys() 函数释放对 Key 的监听。
    • 接着,循环 client->mstate 队列中的命令,并逐个调用 call() 函数执行,核心代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
for (j = 0; j < c->mstate.count; j++) {

c->argc = c->mstate.commands[j].argc; // 更新当前client执行的命令信息

c->argv = c->mstate.commands[j].argv;

c->cmd = c->mstate.commands[j].cmd;



int acl_retval = ACLCheckAllPerm(c,&acl_errpos); // 进行ACL检查

if (acl_retval != ACL_OK) {

... // 如果ACL检查没过,会执行该分支,返回给客户端对应的提示信息

} else {

// 执行命令

if (c->id == CLIENT_ID_AOF)

call(c,CMD_CALL_NONE);

else

call(c,CMD_CALL_FULL);

}

}



// 命令可能修改命令参数,这里会写回到mstate队列中

c->mstate.commands[j].argc = c->argc;

c->mstate.commands[j].argv = c->argv;

c->mstate.commands[j].cmd = c->cmd;

}

我们可以看到,在循环执行命令的过程中,即使命令执行失败了,也不会中断或是回滚,也就是说,主线程会一直执行该事务,直到全部命令执行完毕

这里需要注意一下,在执行 execCommand() 的时候,已经处于一个 call() 函数的调用中了,然后其中又会循环调用 call() 函数,执行事务中的各个命令,也就形成了嵌套调用。此时 redisServer.in_nested_call 字段会从 0 变为 1,该字段在后面介绍 AOF 持久化的时候会用到,这里小伙伴先知道字段的含义即可。

命令执行完成之后,会清理 CLIENT_DENY_BLOCKING 命令,还会调用 discardTransaction() 释放事务相关的状态和缓冲区,这里不再重复。

到此为止,Redis 执行一个事务的核心流程就介绍完了。

WATCH 命令实现

在前面介绍 EXEC 命令的时候我们看到,execCommand() 函数在开始提交事务的第一步,就是检查 WATCH 命令监听的 Key 有没有发生变更,下面我们就来看看 WATCH 命令的相关内容。

首先我们要来了解一下与 WATCH 命令相关的结构体。在 client 中维护了一个 watched_keys 字段,它指向了一个 adlist 列表,其中每个元素都是 watchedKey 实例,通过每个 watchedKey 实例,我们都可以关联到一个 Key,这个 Key 就是当前客户端通过 WATCH 命令监听的一个 Key,整个 watched_keys 列表就是这个客户端当前监听的 Key 的集合。

下面是 watchedKey 结构体的定义,其中的 key 字段就指向了被监听的 Key

1
2
3
4
5
6
7
typedef struct watchedKey {

robj *key; // WATCH命令监听的Key

redisDb *db; // 被监听的Key所在的DB

} watchedKey;

另外,在 redisDb 中也维护了一个 watched_keys 字段,它是指向了一个 dict 实例,其中的 Key 是被监听的 Key,Value 是一个 adlist 集合,其中的元素都是监听了该 Key 的 client 实例,结构如下图所示:

image.png

很明显,通过 client->watched_keys 我们可以查到一个 client 监听的 Key,通过 redisDb->watched_keys 则可以查到一个 Key 被哪些 client 监听了,client 和 Key 之间的正向关联和反向关联都建立起来了。

WATCH 命令对应的处理逻辑位于 watchCommand() 函数,其核心逻辑位于 watchForKey() 函数中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
void watchForKey(client *c, robj *key) {

list *clients = NULL;

listIter li;

listNode *ln;

watchedKey *wk;

// 创建迭代当前 client 的 watched_keys 列表,查找此次要监听的目标 Key 是否已存在

listRewind(c->watched_keys,&li);

while((ln = listNext(&li))) {

wk = listNodeValue(ln);

if (wk->db == c->db && equalStringObjects(key,wk->key))

return; // 如果存在,证明已经监听了目标 Key,直接返回

}

// 确认监听的目标 Key 不存在时,会从 redisDb 中的 watched_keys 集合中查找目标 Key 对应的 client 列表,然后把当前 client 添加到该列表尾部,这就建立了被监听 Key 与 client 之间的关系

clients = dictFetchValue(c->db->watched_keys,key);

if (!clients) {

clients = listCreate();

dictAdd(c->db->watched_keys,key,clients);

incrRefCount(key);

}

// watchForKey() 函数会创建一个 watchKey 实例,

// 并将其添加到 client->watched_keys 列表尾部,

// 也就是建立了 client 到被监听 Key 之间的关系

wk = zmalloc(sizeof(*wk));

wk->key = key;

wk->client = c;

wk->db = c->db;

wk->expired = keyIsExpired(c->db, key);

incrRefCount(key);

listAddNodeTail(c->watched_keys,wk);

listAddNodeTail(clients,wk);

}

watchForKey() 里面在监听一个 Key 之前,会先创建迭代当前 client 的 watched_keys 列表,查找此次要监听的目标 Key 是否已存在,如果存在,证明已经监听了目标 Key,直接返回。确认监听的目标 Key 不存在时,会从 redisDb 中的 watched_keys 集合中查找目标 Key 对应的 client 列表,然后把当前 client 添加到该列表尾部,这就建立了被监听 Key 与 client 之间的关系。最后,watchForKey() 函数会创建一个 watchKey 实例,并将其添加到 client->watched_keys 列表尾部,也就是建立了 client 到被监听 Key 之间的关系。

在我们的客户端通过 WATCH 命令监听了一个 Key 之后,如果这个 Key 被修改了,我们的 EXEC 命令又是怎么感知到这个 Key 被修改过呢?那是因为 Redis 中执行任意一条修改数据的 Redis 命令,底层都会调用 touchWatchedKey() 函数,从下图的调用栈中就可以看出这一点。

touchWatchedKey() 函数的核心逻辑是查找 redisDb 中的 watched_keys 这个集合,得到正在监听当前修改 Key 的 client 列表,然后在这个列表中所有 client 的 flags 字段里面,添加一个 CLIENT_DIRTY_CAS 标记,表示这个 client 监听的 Key 已经被修改过了。

前面说过,在执行 EXEC 命令时的第一步,就是检查被监听的 Key 是否被修改过,相关的代码片段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {

addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :

shared.nullarray[c->resp]);

// 如果被设置了CLIENT_DIRTY_CAS标记,就表示监听的Key被修改过

discardTransaction(c);

return;

}

这里我们看到,client->flags 字段中还可以设置一个 CLIENT_DIRTY_EXEC 标记,该标记表示的是客户端在一个事务中入队命令的时候出现了异常,此时客户端再发起 EXEC 命令时,也是无法提交事务的,而是直接通过 discardTransaction() 函数释放事务。设置 CLIENT_DIRTY_EXEC 标记的地方位于 flagTransaction() 函数,从下图的调用栈可以看出,flagTransaction() 函数的调用集中在 processCommand() 中

在 processCommand() 调用 call() 函数执行命令之前,会经过一系列检查,如果检查失败就会调用 rejectCommand*() 函数拒绝命令,此时就是调用 flagTransaction() 函数设置 CLIENT_DIRTY_EXEC 函数的时机。例如,前面第 35 讲《内核解析篇:Redis 内存淘汰机制》介绍的内存淘汰失败的场景,processCommand() 中的相关代码片段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
if (server.maxmemory && !server.lua_timedout) { // Redis指定了最大内存

// 执行内存淘汰,out_of_memory为1,就是内存淘汰失败了

int out_of_memory = (performEvictions() == EVICT_FAIL);

if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand &&

c->cmd->proc != discardCommand && c->cmd->proc != resetCommand) {

reject_cmd_on_oom = 1; // 该命令在内存淘汰失败的场景中无法执行

}



if (out_of_memory && reject_cmd_on_oom) { // 内存淘汰失败,且命令不能在内存淘汰失败的场景下被调用

rejectCommand(c, shared.oomerr); // 就会设置CLIENT_DIRTY_EXEC标记

return C_OK;

}

}

总结

这一节中,我们重点介绍了 Redis 中事务命令的基本使用,以及核心实现。我们首先简单介绍了使用 Redis 事务基本方式。然后按照这个基本使用方式的顺序,分析了 MULTIEXEC等命令的实现。最后着重介绍了 WATCH 监听 Key 变更的原理以及它影响 Redis 事务执行的方式。

到此为止,Redis 核心命令的实现原理就介绍完了。在下一模块中,我们将开始介绍 Redis 持久化的实现原理。