1. Overview
Redis以下面5个命令,以提供事务的功能:
MULTI
:开始事务DISCARD
:终止事务EXEC
:提交事务WATCH
:监视键,若某个键在事务提交前被修改,则终止该事务UNWATCH
:取消键的监视
2. 事务实现
事务执行主要分为下面几个阶段:
- 开始事务
- 命令入队
- 执行并提交事务/终止事务
2.1. 开始事务
Redis以一个MULTI
命令开启事务,处理该命令的是multiCommand
函数,它把客户端标记为CLIENT_MULTI
,表示其正在执行事务。此外,若该客户端有事务没提交,是不能再次启动另一个事务(即不支持嵌套事务):
void multiCommand(client *c) {
// 不支持嵌套事务
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
return;
}
// 标记客户端的事务正在执行
c->flags |= CLIENT_MULTI;
addReply(c,shared.ok);
}
2.2. 命令入队
开启事务后,为了保证其原子性,需要把事务中的命令缓存起来,待客户端提交后,将缓冲的命令一次性全部执行。
这里的涉及到事务状态的变量,声明和定义如下代码所述,这里核心就是multiState
:
typedef struct client {
// ...
multiState mstate;
// ...
}
typedef struct multiState {
multiCmd *commands; // 事务中缓冲的命令数组
int count; // 命令个数
int cmd_flags; // 所有命令的标识,即命令标识的累计OR
int minreplicas; // 同步复制的标识
time_t minreplicas_timeout; // 同步复制的超时时间
} multiState;
typedef struct multiCmd {
robj **argv; // 参数
int argc; // 参数个数
struct redisCommand *cmd; // 命令
} multiCmd;
之前所述,服务端通过读回调函数读取客户端发送过来的命令,调用链为readQueryFromClient => processInputBufferAndReplicate => processInputBuffer => processCommand
,这里主要看最后的一个函数。在事务状态下,命令会入队,并返回QUEUED
给客户端。这里截取入队的代码:
int processCommand(client *c) {
// ...
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) {
// 事务状态下,若不是EXEC, DISCARD, MULTI, WATCH命令,则入队
queueMultiCommand(c);
// 返回QUEUED给客户端
addReply(c,shared.queued);
} else {
// 其它情况,直接执行命令,包括普通命令和事务状态下EXEC, DISCARD, MULTI, WATCH命令
call(c,CMD_CALL_FULL);
// ...
}
return C_OK;
}
2.3. 事务提交
EXEC
命令可用于提交事务。这里直接看该命令的处理函数execCommand
:
void execCommand(client *c) {
int j;
robj **orig_argv;
int orig_argc;
struct redisCommand *orig_cmd;
int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
int was_master = server.masterhost == NULL;
// 若不在事务开启状态,直接返回错误
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"EXEC without MULTI");
return;
}
// 检查:
// 1. 是否有被WATCH的键被修改
// 2. 是否在入队过程中(即一个命令入队前)出错
// 若出现二者之一,直接终止
if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
shared.nullmultibulk);
discardTransaction(c);
goto handle_monitor;
}
// 若事务命令中包含写命令,但是自己有一个只读从节点,则也终止事务
if (!server.loading && server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
{
addReplyError(c,
"Transaction contains write commands but instance "
"is now a read-only slave. EXEC aborted.");
discardTransaction(c);
goto handle_monitor;
}
// 开始执行队列中所有的命令
// a) 取消WATCH的键,因为单线程模型,且通过WATCH检查,因此取消它们WATCH是安全的
unwatchAllKeys(c);
orig_argv = c->argv;
orig_argc = c->argc;
orig_cmd = c->cmd;
addReplyMultiBulkLen(c,c->mstate.count);
// b) 执行队列中的每一条命令
for (j = 0; j < c->mstate.count; j++) {
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
c->cmd = c->mstate.commands[j].cmd;
// 若执行到第一条写命令时,首先给所有的从节点和AOF传播MULTI命令
// 因为之前执行MULTI命令后,dirty不变大,因此该命令之前没有传播给从节点和AOF
// 所以传播写命令之前,需要传播MULTI给从节点和AOF
if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
execCommandPropagateMulti(c);
must_propagate = 1;
}
// 执行命令,这里会把写命令传播给从节点和AOF
call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
c->mstate.commands[j].argc = c->argc;
c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].cmd = c->cmd;
}
c->argv = orig_argv;
c->argc = orig_argc;
c->cmd = orig_cmd;
// 取消事务状态,表示本节点的事务已经完成
discardTransaction(c);
// 事务执行完后,增加server.dirty,之后EXEC会由于该数值增加,传播到从节点和AOF上
if (must_propagate) {
int is_master = server.masterhost == NULL;
server.dirty++;
// ...
}
handle_monitor:
// ... 把事务状态发给监视器 ...
}
可见,当事务要执行,则需要:
- 被
WATCH
的键不能被修改(否则会有CLIENT_DIRTY_CAS
标识) - 入队过程中不能出错(否则会有
CLIENT_DIRTY_EXEC
标识) - 没有只读的从节点
关于主从复制,回顾一下,一般情况下:
若
server.dirty
增大,则命令会被传播到从节点和AOF中(具体调用函数链processCommand => call => propagate
;void call(client *c, int flags) { // ... c->cmd->proc(c); duration = ustime()-start; dirty = server.dirty-dirty; // ... // flags是CMD_CALL_FULL,所以这里会进去 if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) { int propagate_flags = PROPAGATE_NONE; // 若修改了,则标记传播AOF和从节点, if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL); // ... if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE)) // 由于修改了,所以有PROPAGATE_XX标记,所以命令会被传播 propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); } // ... }
MULTI
不会导致server.dirty
变大,因此执行完该命令时,不会传播到从节点和AOF中。所以在执行EXEC
时,遇到第一个写命令,需要先传播MULTI
;
call
函数在执行完命令后,根据server.dirty
是否增加,而决定是否传播命令给从节点和AOF:
MULTI
命令不会更改server.dirty
,所以不会传播(留在EXEC
执行时传播)- 在事务上下文下,命令入队是不会更改
server.dirty
,所以不会传播(留在EXEC
执行时传播)- 写命令会更改
server.dirty
,所以会传播EXEC
会更改server.dirty
,所以也会传播从
execCommand
可以看出,在这个函数里,会统一传播MULTI
和之后的写命令;而该函数返回后,会传播EXEC
。
另外,在Redis Cluster模式下,也支持所谓事务,但是一旦操作的数据不在该节点上,只会返回MOVED
错误,事务不会回滚,只会继续执行下面的命令。因此,Redis并不支持集群级别/跨节点的事务。
2.4. 事务终止
事务终止使用的是DISCARD
命令,这里的代码就很简单了,只是清除了事务的状态和资源。
由于在MULTI
和DISCARD
之前,没有对server.dirty
做任何修改,所以从节点和AOF不会收到任何关于该事务的数据,也即没有任何的更改。
void discardCommand(client *c) {
// 不在事务上下文中,则返回错误
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"DISCARD without MULTI");
return;
}
// 清除事务状态
discardTransaction(c);
addReply(c,shared.ok);
}
3. 事务的ACID
3.1. 原子性
Redis事务的特点是:
- 事务中的命令延迟到
EXEC
执行 - 命令要么全不执行,要么全部执行
- 若中间命令执行错误,下面的命令也会执行,不会回滚
因此,可以认为Redis事务是能一次性完成,但若事务执行时出错,不支持回滚,因此我认为它没有传统意义的原子性。
3.2. 一致性
这里指事务执行前后,数据符合定义和要求(满足一定的约束),没有非法和无效的数据。
Redis通过错误检测的设计,保证事务的一致性:
- 入队错误:事务会被拒绝执行,所以是”一致的”
- 命令执行错误:错误的命令不会被执行(修改数据),所以是”一致的“,前提是需要客户端检测出错的命令
- 宕机问题:
- 若没配置持久化,则恢复出来是空数据库,因此是”一致的“
- 若配置RDB,则恢复出来的是上一个一致的状态,虽然数据是旧的,但也是”一致的“;
- 若配置AOF,若事务中的命令没有落入AOF,则恢复出来的是”一致的“;但事务中的命令落入了AOF,则需要利用工具回滚
总之,Redis是满足一定的事务一致性约束的,但是具体还得和具体场景结合起来分析其”一致性“。个人认为它并不具备一些关系数据库级别的一致性,因为它不支持回滚等特性。
如A向B转账,B向C转账的事务,需要满足:事务前后钱总量相同。
但第1步失败了,第2步还是会执行,因此上述的约束不满足,也达不到一致性。
另外,关于主从节点数据的一致性,事务的传播和复制是异步的,即事务提交并返回后,从节点可能并没有提交事务,因此其一致性也是很弱的(最终一致)。
3.3. 隔离性
Redis是单线程模型,因此多事务执行完全是串行执行的,即达到”串行化“隔离级别(Serializable)。
3.4. 持久性
这部分取决于Redis持久化的配置:
- 当
appendfsync
为always
,能保证持久性; - 其它情况下不能保证持久性。
此外,事务的最后加上一条SAVE
命令,也能保证持久性,但是效率很低,不推荐使用。
3.5. 事务特性总结
根据上面的分析,Redis事务对ACID的支持是非常弱的(除了串行化隔离),所以个人认为Redis事务的实现用武之处不大,若需要ACID的支持,更推荐数据库产品(如支持事务的关系型数据库),而非Redis缓存。
4. WATCH
:乐观锁
WATCH
命令可以监视一个(或多个) 键 。
若在事务执行之前(即EXEC
执行命令前),这些键被其他命令所改动,那么事务将被终止,拒绝执行。
被监视的键会保存在客户端的一条链表(存watchedKey
)里,如下所示:
typedef struct client {
// ...
list *watched_keys; // 监控的键的链表
// ...
}
typedef struct watchedKey {
robj *key; // 键
redisDb *db; // 对应的数据库
} client;
此外,每个数据库会保存被监视的键,以字典保存,键是被监视的键,值是监视的客户端列表:
typedef struct redisDb {
// ...
dict *watched_keys; // key -> list of client
// ...
} redisDb;
4.1. 添加WATCH
这部分在watchCommand
里,它只能在事务上下文中执行。实际上它调用的是watchForKey
函数来给一个键添加WATCH
。该函数主要更新客户端的watched_keys
链表和数据库的watched_keys
字典:
void watchForKey(client *c, robj *key) {
list *clients = NULL;
listIter li;
listNode *ln;
watchedKey *wk;
listRewind(c->watched_keys,&li);
// 遍历客户端已监控的键,若出现重复,则直接返回
while((ln = listNext(&li))) {
wk = listNodeValue(ln);
if (wk->db == c->db && equalStringObjects(key,wk->key))
return; /* Key already watched */
}
// 将客户端自己添加到对应数据库的watched_keys字典中
clients = dictFetchValue(c->db->watched_keys,key);
if (!clients) {
clients = listCreate();
dictAdd(c->db->watched_keys,key,clients);
incrRefCount(key);
}
listAddNodeTail(clients,c);
// 将待监控的键添加到客户端的watched_keys链表中
wk = zmalloc(sizeof(*wk));
wk->key = key;
wk->db = c->db;
incrRefCount(key);
listAddNodeTail(c->watched_keys,wk);
}
4.2. 监控键的修改
当某个键被修改时,会调用函数signalModifiedKey
,实际上内部调用的是函数touchWatchedKey
,它会给监视对应键的客户端添加CLIENT_DIRTY_cAS
标识,当客户端要执行事务时,如2.3.所示,事务会被拒绝执行:
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
if (dictSize(db->watched_keys) == 0) return;
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;
// 给所有监视该键的客户端添加CLIENT_DIRTY_CAS标识
// 当执行事务命令前,若有该标识,事务会拒绝执行
listRewind(clients,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags |= CLIENT_DIRTY_CAS;
}
}
4.3. 乐观锁
乐观锁:假设各事务之间不会产生竞争,冲突的操作可以执行,当等待事务提交前,检查冲突是否存在;
悲观锁:假设各各事务之间会产生冲突,事务操作数据时,预先会为其加锁,防止其它事务修改,只有当锁释放,其它事务才能执行冲突的操作。
Redis的WATCH
是基于乐观锁的实现,它不通过检查版本号检测冲突,而是通过检查监视的键是否被修改来检测冲突。若冲突,Redis会拒绝事务的执行。
其它数据库,如MySQL,会提供基于版本号的乐观控制,如MVCC,若出现了冲突/违反数据完整性,事务会被回滚。