源码阅读-Redis独立功能: 事务

Posted by keys961 on December 9, 2019

1. Overview

Redis以下面5个命令,以提供事务的功能:

  • MULTI:开始事务
  • DISCARD:终止事务
  • EXEC:提交事务
  • WATCH:监视键,若某个键在事务提交前被修改,则终止该事务
  • UNWATCH:取消键的监视

2. 事务实现

事务执行主要分为下面几个阶段:

  • 开始事务
  • 命令入队
  • 执行并提交事务/终止事务

2.1. 开始事务

Redis以一个MULTI命令开启事务,处理该命令的是multiCommand函数,它把客户端标记为CLIENT_MULTI,表示其正在执行事务。此外,若该客户端有事务没提交,是不能再次启动另一个事务(即不支持嵌套事务):

void multiCommand(client *c) {
    // 不支持嵌套事务
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
    // 标记客户端的事务正在执行
    c->flags |= CLIENT_MULTI;
    addReply(c,shared.ok);
}

2.2. 命令入队

开启事务后,为了保证其原子性,需要把事务中的命令缓存起来,待客户端提交后,将缓冲的命令一次性全部执行。

这里的涉及到事务状态的变量,声明和定义如下代码所述,这里核心就是multiState

typedef struct client {
    // ...
    multiState mstate;
    // ...
}

typedef struct multiState {
    multiCmd *commands;  // 事务中缓冲的命令数组
    int count; // 命令个数 
    int cmd_flags; // 所有命令的标识,即命令标识的累计OR
    int minreplicas;  // 同步复制的标识
    time_t minreplicas_timeout;  // 同步复制的超时时间
} multiState;

typedef struct multiCmd {
    robj **argv; // 参数
    int argc; // 参数个数
    struct redisCommand *cmd; // 命令
} multiCmd;

之前所述,服务端通过读回调函数读取客户端发送过来的命令,调用链为readQueryFromClient => processInputBufferAndReplicate => processInputBuffer => processCommand,这里主要看最后的一个函数。在事务状态下,命令会入队,并返回QUEUED给客户端。这里截取入队的代码:

int processCommand(client *c) {
	// ...
    
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) {
        // 事务状态下,若不是EXEC, DISCARD, MULTI, WATCH命令,则入队
        queueMultiCommand(c);
        // 返回QUEUED给客户端
        addReply(c,shared.queued);
    } else {
        // 其它情况,直接执行命令,包括普通命令和事务状态下EXEC, DISCARD, MULTI, WATCH命令
        call(c,CMD_CALL_FULL);
        // ...
    }
    return C_OK;
}

2.3. 事务提交

EXEC命令可用于提交事务。这里直接看该命令的处理函数execCommand

void execCommand(client *c) {
    int j;
    robj **orig_argv;
    int orig_argc;
    struct redisCommand *orig_cmd;
    int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
    int was_master = server.masterhost == NULL;
	// 若不在事务开启状态,直接返回错误
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }
    // 检查:
    // 1. 是否有被WATCH的键被修改
    // 2. 是否在入队过程中(即一个命令入队前)出错
    // 若出现二者之一,直接终止
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
        addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                  shared.nullmultibulk);
        discardTransaction(c);
        goto handle_monitor;
    }
    
    // 若事务命令中包含写命令,但是自己有一个只读从节点,则也终止事务
    if (!server.loading && server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
    {
        addReplyError(c,
            "Transaction contains write commands but instance "
            "is now a read-only slave. EXEC aborted.");
        discardTransaction(c);
        goto handle_monitor;
    }

    // 开始执行队列中所有的命令
    // a) 取消WATCH的键,因为单线程模型,且通过WATCH检查,因此取消它们WATCH是安全的
    unwatchAllKeys(c);
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyMultiBulkLen(c,c->mstate.count);
    // b) 执行队列中的每一条命令
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;
        // 若执行到第一条写命令时,首先给所有的从节点和AOF传播MULTI命令
        // 因为之前执行MULTI命令后,dirty不变大,因此该命令之前没有传播给从节点和AOF
        // 所以传播写命令之前,需要传播MULTI给从节点和AOF
        if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
            execCommandPropagateMulti(c);
            must_propagate = 1;
        }
		// 执行命令,这里会把写命令传播给从节点和AOF
        call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);

        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    // 取消事务状态,表示本节点的事务已经完成
    discardTransaction(c); 
    // 事务执行完后,增加server.dirty,之后EXEC会由于该数值增加,传播到从节点和AOF上
    if (must_propagate) {
        int is_master = server.masterhost == NULL;
        server.dirty++;
        // ...
    }
handle_monitor:
    // ... 把事务状态发给监视器 ...
}

可见,当事务要执行,则需要:

  • WATCH的键不能被修改(否则会有CLIENT_DIRTY_CAS标识)
  • 入队过程中不能出错(否则会有CLIENT_DIRTY_EXEC标识)
  • 没有只读的从节点

关于主从复制,回顾一下,一般情况下:

  • server.dirty增大,则命令会被传播到从节点和AOF中(具体调用函数链processCommand => call => propagate

    void call(client *c, int flags) {
        // ...
        c->cmd->proc(c);
        duration = ustime()-start;
        dirty = server.dirty-dirty;
        // ...
        // flags是CMD_CALL_FULL,所以这里会进去
        if (flags & CMD_CALL_PROPAGATE &&
            (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
        {
            int propagate_flags = PROPAGATE_NONE;
            // 若修改了,则标记传播AOF和从节点,
            if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
    		// ...
            if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
                // 由于修改了,所以有PROPAGATE_XX标记,所以命令会被传播
                propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
        }
        // ...
    }
    
  • MULTI不会导致server.dirty变大,因此执行完该命令时,不会传播到从节点和AOF中。所以在执行EXEC时,遇到第一个写命令,需要先传播MULTI

  • call函数在执行完命令后,根据server.dirty是否增加,而决定是否传播命令给从节点和AOF:

    • MULTI命令不会更改server.dirty,所以不会传播(留在EXEC执行时传播)
    • 在事务上下文下,命令入队是不会更改server.dirty,所以不会传播(留在EXEC执行时传播)
    • 写命令会更改server.dirty,所以会传播
    • EXEC会更改server.dirty,所以也会传播
  • execCommand可以看出,在这个函数里,会统一传播MULTI和之后的写命令;而该函数返回后,会传播EXEC

另外,在Redis Cluster模式下,也支持所谓事务,但是一旦操作的数据不在该节点上,只会返回MOVED错误,事务不会回滚,只会继续执行下面的命令。因此,Redis并不支持集群级别/跨节点的事务

2.4. 事务终止

事务终止使用的是DISCARD命令,这里的代码就很简单了,只是清除了事务的状态和资源。

由于在MULTIDISCARD之前,没有对server.dirty做任何修改,所以从节点和AOF不会收到任何关于该事务的数据,也即没有任何的更改。

void discardCommand(client *c) {
    // 不在事务上下文中,则返回错误
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"DISCARD without MULTI");
        return;
    }
    // 清除事务状态
    discardTransaction(c);
    addReply(c,shared.ok);
}

3. 事务的ACID

3.1. 原子性

Redis事务的特点是:

  • 事务中的命令延迟到EXEC执行
  • 命令要么全不执行,要么全部执行
  • 若中间命令执行错误,下面的命令也会执行,不会回滚

因此,可以认为Redis事务是能一次性完成,但若事务执行时出错,不支持回滚,因此我认为它没有传统意义的原子性

3.2. 一致性

这里指事务执行前后,数据符合定义和要求(满足一定的约束),没有非法和无效的数据。

Redis通过错误检测的设计,保证事务的一致性:

  • 入队错误:事务会被拒绝执行,所以是”一致的”
  • 命令执行错误:错误的命令不会被执行(修改数据),所以是”一致的“,前提是需要客户端检测出错的命令
  • 宕机问题:
    • 若没配置持久化,则恢复出来是空数据库,因此是”一致的“
    • 若配置RDB,则恢复出来的是上一个一致的状态,虽然数据是旧的,但也是”一致的“;
    • 若配置AOF,若事务中的命令没有落入AOF,则恢复出来的是”一致的“;但事务中的命令落入了AOF,则需要利用工具回滚

总之,Redis是满足一定的事务一致性约束的,但是具体还得和具体场景结合起来分析其”一致性“。个人认为它并不具备一些关系数据库级别的一致性,因为它不支持回滚等特性。

如A向B转账,B向C转账的事务,需要满足:事务前后钱总量相同。

但第1步失败了,第2步还是会执行,因此上述的约束不满足,也达不到一致性。

另外,关于主从节点数据的一致性,事务的传播和复制是异步的,即事务提交并返回后,从节点可能并没有提交事务,因此其一致性也是很弱的(最终一致)。

3.3. 隔离性

Redis是单线程模型,因此多事务执行完全是串行执行的,即达到”串行化“隔离级别(Serializable)。

3.4. 持久性

这部分取决于Redis持久化的配置:

  • appendfsyncalways,能保证持久性;
  • 其它情况下不能保证持久性。

此外,事务的最后加上一条SAVE命令,也能保证持久性,但是效率很低,不推荐使用。

3.5. 事务特性总结

根据上面的分析,Redis事务对ACID的支持是非常弱的(除了串行化隔离),所以个人认为Redis事务的实现用武之处不大,若需要ACID的支持,更推荐数据库产品(如支持事务的关系型数据库),而非Redis缓存。

4. WATCH:乐观锁

WATCH命令可以监视一个(或多个) 键 。

若在事务执行之前(即EXEC执行命令前),这些键被其他命令所改动,那么事务将被终止,拒绝执行。

被监视的键会保存在客户端的一条链表(存watchedKey)里,如下所示:

typedef struct client {
    // ...
    list *watched_keys; // 监控的键的链表
    // ...
}

typedef struct watchedKey {
    robj *key; // 键
    redisDb *db; // 对应的数据库
} client;

此外,每个数据库会保存被监视的键,以字典保存,键是被监视的键,值是监视的客户端列表:

typedef struct redisDb {
    // ...
    dict *watched_keys; // key -> list of client
    // ...
} redisDb;

4.1. 添加WATCH

这部分在watchCommand里,它只能在事务上下文中执行。实际上它调用的是watchForKey函数来给一个键添加WATCH。该函数主要更新客户端的watched_keys链表和数据库的watched_keys字典:

void watchForKey(client *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;
    listRewind(c->watched_keys,&li);
    // 遍历客户端已监控的键,若出现重复,则直接返回
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    // 将客户端自己添加到对应数据库的watched_keys字典中
    clients = dictFetchValue(c->db->watched_keys,key);
    if (!clients) {
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    listAddNodeTail(clients,c);
    // 将待监控的键添加到客户端的watched_keys链表中
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
}

4.2. 监控键的修改

当某个键被修改时,会调用函数signalModifiedKey,实际上内部调用的是函数touchWatchedKey,它会给监视对应键的客户端添加CLIENT_DIRTY_cAS标识,当客户端要执行事务时,如2.3.所示,事务会被拒绝执行:

void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;
    if (dictSize(db->watched_keys) == 0) return;
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;
    // 给所有监视该键的客户端添加CLIENT_DIRTY_CAS标识
    // 当执行事务命令前,若有该标识,事务会拒绝执行
    listRewind(clients,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags |= CLIENT_DIRTY_CAS;
    }
}

4.3. 乐观锁

乐观锁:假设各事务之间不会产生竞争,冲突的操作可以执行,当等待事务提交前,检查冲突是否存在;

悲观锁:假设各各事务之间会产生冲突,事务操作数据时,预先会为其加锁,防止其它事务修改,只有当锁释放,其它事务才能执行冲突的操作。

Redis的WATCH是基于乐观锁的实现,它不通过检查版本号检测冲突,而是通过检查监视的键是否被修改来检测冲突。若冲突,Redis会拒绝事务的执行。

其它数据库,如MySQL,会提供基于版本号的乐观控制,如MVCC,若出现了冲突/违反数据完整性,事务会被回滚。