源码阅读-Redis集群: 集群(2)

Posted by keys961 on November 28, 2019

1. Overview

Redis Cluster支持集群的伸缩,即扩容和收缩。而这部分功能,可以手动完成,也可以用redis-trib工具完成。本文主要讲这部分的原理。

2. 集群扩容

集群扩容主要分为3步:

  • 准备新节点
  • 加入集群
  • 迁移槽和数据

前2步之前的文章都说明了,可参考:

因此下面主要讨论的是槽和数据的迁移

2.0. Overview: 槽和数据的迁移

这里对单个槽的迁移(从source迁到target)步骤进行总结:

  • 对节点target发送CLUSTER SETSLOT <slot> IMPORTING <source>,标识节点target需要导入节点source的对应槽数据
  • 对节点source发送CLUSTER SETSLOT <slot> MIGRATING <target>,标识节点source需要导出对应槽数据导出到节点source
  • 对节点source发生发送CLUSTER GETKEYSINSLOT <slot> <count>,获取节点source上对应槽的键
  • 根据第3步的结果,向节点source发送MIGRATE命令,将这些键发送给节点target,若该槽的数据很多,则第3步和第4步会重复执行,直到迁移完成
  • 向集群任意节点发送CLUSTER SETSLOT <slot> NODE <target>,指派该槽给节点target,这个信息会被传播到整个集群

下面对每一步进行说明。

2.1. target:设置导入状态

这里,客户端对target节点发送下面的命令,给节点target设置槽slot设置为导入状态,导入的数据从source获取:

CLUSTER SETSLOT <slot> IMPORTING <source>

这里依旧进入clusterCommand函数,这里截取关键分支,这里主要把clusterState->importing_slots_from[slot]设为source

void clusterCommand(client *c) {
    // ...
	else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
        int slot;
        clusterNode *n;
		// 自己不能是从节点
        if (nodeIsSlave(myself)) {
            addReplyError(c,"Please use SETSLOT only with masters.");
            return;
        }
        // 检查槽是否在0~0x3FFF内
        if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
        // ...
        else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
            // 该槽不能是自己的,因为是从外面导入的
            if (server.cluster->slots[slot] == myself) {
                addReplyErrorFormat(c,
                    "I'm already the owner of hash slot %u",slot);
                return;
            }
            // 查找source节点是否存在,若不存在说明本节点不知道source,返回错误
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            // 把clusterState->importing_slots_from[slot]设为source
            server.cluster->importing_slots_from[slot] = n;
        } // ...
        // ...
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
        addReply(c,shared.ok);
    } 
    // ...
}

2.2. source:设置导出状态

第二步,客户端要向源节点source发送下面的命令,给节点source设置槽slot设置为导出状态,导出数据到target

CLUSTER SETSLOT <slot> MIGRATING <target>

这里还是clusterCommand函数的分支,主要是把clusterState->migrating_slots_to[slot]设为target

void clusterCommand(client *c) {
    // ...
	else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
        // ...
        if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
            // 自己(source)的数据导出到target节点,需要自己原先负责该槽
            if (server.cluster->slots[slot] != myself) {
                addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
                return;
            }
            // 若自己(source)不知道target节点,返回错误
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            // 把clusterState->migrating_slots_to[slot]设为target
            server.cluster->migrating_slots_to[slot] = n;
        } // ...
        // ...
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
        addReply(c,shared.ok);
    } 
    // ...
}

2.3. source:获取该槽需要迁移的键

接着要获取需要迁移的数据的键,这里调用下面的命令:

CLUSTER GETKEYSINSLOT <slot> <count>

下面是处理这个命令的代码,依旧是clusterCommand函数,这里主要做的是遍历并返回对应槽的键:

void clusterCommand(client *c) {
	// ...
    else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
        long long maxkeys, slot;
        unsigned int numkeys, j;
        robj **keys;
		// 获取并检查参数
        if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
            return;
        if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
            != C_OK)
            return;
        if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
            addReplyError(c,"Invalid slot or number of keys");
            return;
        }
        
        unsigned int keys_in_slot = countKeysInSlot(slot);
        if (maxkeys > keys_in_slot) maxkeys = keys_in_slot;

        keys = zmalloc(sizeof(robj*)*maxkeys);
        // 获取maxkeys个数的键,填充到keys数组中
        numkeys = getKeysInSlot(slot, keys, maxkeys);
        // 响应结果: 键个数和所有键
        addReplyMultiBulkLen(c,numkeys);
        for (j = 0; j < numkeys; j++) {
            addReplyBulk(c,keys[j]);
            decrRefCount(keys[j]);
        }
        zfree(keys);
    } 
    // ...
}

而获取某个槽的键,调用函数getKeysInSlot,它会遍历clusterState->keys_to_slots这棵radix tree,并获取位于对应槽的键,而关于它的存储结构,可以参考前面的文章 源码阅读-Redis集群-集群(1)

unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
    raxIterator iter;
    int j = 0;
    unsigned char indexed[2];
	// 遍历slots_to_keys树
    // 而这颗树存储的是slot + key字符串,因此定位某个槽的键很快
    indexed[0] = (hashslot >> 8) & 0xff;
    indexed[1] = hashslot & 0xff;
    raxStart(&iter,server.cluster->slots_to_keys);
    raxSeek(&iter,">=",indexed,2);
    while(count-- && raxNext(&iter)) {
        if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
        keys[j++] = createStringObject((char*)iter.key+2,iter.key_len-2);
    }
    raxStop(&iter);
    return j;
}

2.4. 迁移槽中的数据

接下来就要准备数据迁移了。这里客户端需要向源节点source发送MIGRATE命令,根据2.3.的结果,将数据迁移到目标节点target,也即下面的命令:

MIGRATE host port "" dbid timeout [COPY | REPLACE] KEYS key1 key2 ... keyN # 批量
MIGRATE host port key dbid timeout [COPY | REPLACE] # 非批量

这里

  • 若标识COPY,表示不删除源节点source的数据;

  • 若标识REPLACE,表示会替换目标节点target已有的数据。

迁移数据主要分为5步:

  • source:解析和校验参数
  • source:发送迁移数据
  • target:接收数据并回复
  • source:处理目标节点的响应
  • 错误处理

a) source:解析和校验参数

这里主要在migrateCommand函数的开头处理,主要做:

  • 抽取参数,如目标节点地址、COPYREPLACE选项、要迁移的键集合等
  • 校验参数
  • 从键中获取值,准备迁移
void migrateCommand(client *c) {
    migrateCachedSocket *cs;
    int copy = 0, replace = 0, j;
    char *password = NULL;
    long timeout;
    long dbid;
    robj **ov = NULL; // 要迁移的记录值
    robj **kv = NULL; // 要迁移的记录键
    robj **newargv = NULL;
    rio cmd, payload;
    int may_retry = 1;
    int write_error = 0;
    int argv_rewritten = 0;

    int first_key = 3;
    int num_keys = 1;
    
	// 解析参数
    for (j = 6; j < c->argc; j++) {
        int moreargs = j < c->argc-1;
        if (!strcasecmp(c->argv[j]->ptr,"copy")) {
            // 获取COPY选项
            copy = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
            // 获取REPLACE选项
            replace = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
            // 获取AUTH选项
            if (!moreargs) {
                addReply(c,shared.syntaxerr);
                return;
            }
            j++;
            password = c->argv[j]->ptr;
        } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
            // 获取要迁移的键(批量)
            if (sdslen(c->argv[3]->ptr) != 0) {
                addReplyError(c,
                    "When using MIGRATE KEYS option, the key argument"
                    " must be set to the empty string");
                return;
            }
            first_key = j+1; // 第一个键的参数为止
            num_keys = c->argc - j - 1; // 键的个数
            break;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    // 参数校验(timeout和dbid)
    if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
        getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK) {
        return;
    }
    if (timeout <= 0) timeout = 1000;

    ov = zrealloc(ov,sizeof(robj*)*num_keys);
    kv = zrealloc(kv,sizeof(robj*)*num_keys);
    int oi = 0;
	// 根据参数获取要迁移的数据
    for (j = 0; j < num_keys; j++) {
        if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
            kv[oi] = c->argv[first_key+j];
            oi++;
        }
    }
    num_keys = oi;
    if (num_keys == 0) {
        // 没有要迁移的,则返回+NOKEY响应
        zfree(ov); zfree(kv);
        addReplySds(c,sdsnew("+NOKEY\r\n"));
        return;
    }
    // ...
}

b) source:发送迁移数据

a)中,要传输的数据被记录到ovkv中,并获取到了目标节点地址,就可以准备数据并发送迁移的数据,这部分代码如下,它主要做:

  • 建立和节点target的连接,并创建输出缓冲区
  • 必要时写入AUTH命令
  • 必要时写入SELECT命令
  • 每个键写入一条RESTORE/RESTORE-ASKING指令
    • 过期数据会被跳过
    • 若是集群模式,写入RESTORE-ASKING命令;否则是RESTORE命令
    • 将键值序列化后,写入缓冲区,其格式和RDB文件很相似
    • 若配置了REPLACE,会将其追加到命令最后
  • 同步将缓冲区发送给目标节点target,每次发送64KB
void migrateCommand(client *c) {
    // ...
    try_again:
    write_error = 0;
    // 1. 创建和目标节点target的连接
    cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
    if (cs == NULL) {
        // 连接失败则返回
        zfree(ov); zfree(kv);
        return;
    }
	// 2. 创建cmd命令缓冲区
    rioInitWithBuffer(&cmd,sdsempty());
    // 3. 若需要发送AUTH命令,先将其写入cmd缓冲
    if (password) {
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
            sdslen(password)));
    }
    // 4. 判断是否需要发送SELECT命令,若需要则将SELECT命令写入cmd缓冲
    int select = cs->last_dbid != dbid;
    if (select) {
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
        serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
    }

    int non_expired = 0;

    // 5. 写入RESTORE命令(每个键1个),里面包含待迁移的数据
    for (j = 0; j < num_keys; j++) {
        long long ttl = 0;
        long long expireat = getExpire(c->db,kv[j]);
		// 判断键是否过期,若过期则直接跳过
        if (expireat != -1) {
            ttl = expireat-mstime();
            if (ttl < 0) {
                continue;
            }
            if (ttl < 1) ttl = 1;
        }
		// 这里键对应的数据没过期
        // 记录没过期的键到kv数组
        kv[non_expired++] = kv[j];
		// 写入RESTORE(普通)/RESTORE-ASKING(集群)写入cmd缓冲
        serverAssertWithInfo(c,NULL,
            rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); // 写入*开头
        if (server.cluster_enabled) // 写入RESTORE/RESTORE-ASKING
            serverAssertWithInfo(c,NULL,
                rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
        else
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
        serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j])); // 处理键
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
                sdslen(kv[j]->ptr))); // 写入键
        serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); // 写入TTL
        createDumpPayload(&payload,ov[j],kv[j]); // 序列化迁移的数据
        serverAssertWithInfo(c,NULL,
            rioWriteBulkString(&cmd,payload.io.buffer.ptr,
                               sdslen(payload.io.buffer.ptr))); // 写入序列化的数据
        sdsfree(payload.io.buffer.ptr);
        if (replace) // 若配置了REPLACE,追加REPLACE到cmd缓冲
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
    }
    num_keys = non_expired;
	// 6. 将cmd缓冲的数据同步发给target节点,一次发64KB
    errno = 0;
    {
        sds buf = cmd.io.buffer.ptr;
        size_t pos = 0, towrite;
        int nwritten = 0;

        while ((towrite = sdslen(buf)-pos) > 0) {
            towrite = (towrite > (64*1024) ? (64*1024) : towrite);
            nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout);
            if (nwritten != (signed)towrite) {
                write_error = 1;
                goto socket_err; // 发送错误由socket_err例程处理,后面会说明
            }
            pos += nwritten;
        }
    }
    // ..
}

注意这里有2点要说明的。

首先是连接创建,这里对连接做了缓存,保存在server.migrate_cached_sockets字典中。创建连接时首先从这个字典中查找可用的连接,若没有才会去创建:

typedef struct migrateCachedSocket {
    int fd; // socket文件描述符
    long last_dbid; // 上一次交互的dbid
    time_t last_use_time; // 上一次使用的时间
} migrateCachedSocket;

另外是RESTORERESTORE-ASKING命令的区别。从上面可知,集群模式下会选择后者。而从命令表上,这两个命令实际处理上并没有太大差别,不过后者命令标识多了一个k,它会隐式执行一次ASKING命令:

{"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0},
{"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0},
{"asking",askingCommand,1,"F",0,NULL,0,0,0,0,0},
void askingCommand(client *c) {
    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }
    // 集群模式下标记CLLIENT_ASKING,接收命令时,该槽被标记为importing才会被接收
    c->flags |= CLIENT_ASKING; 
    addReply(c,shared.ok);
}

c) target:接收迁移的数据

这步主要还是对于RESTORE/RESTORE-ASKING的处理,对应处理的函数是restoreCommand,也比较简单。它主要做:

  • 获取选项,如REPLACE
  • 获取并检查TTL
  • 检查发送过来的数据(类似RDB模式)
  • 将数据反序列化获得键和值
  • 写入数据库,必要时设置过期,若配置REPLACE则需要先删除旧数据
  • 响应+OK\r\n
void restoreCommand(client *c) {
    long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
    rio payload;
    int j, type, replace = 0, absttl = 0;
    robj *obj;

    // 解析选项,比如REPLACE
    for (j = 4; j < c->argc; j++) {
        int additional = c->argc-j-1;
        if (!strcasecmp(c->argv[j]->ptr,"replace")) {
            replace = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
            absttl = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
                   lfu_freq == -1) {
            if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
                    != C_OK) return;
            if (lru_idle < 0) {
                addReplyError(c,"Invalid IDLETIME value, must be >= 0");
                return;
            }
            lru_clock = LRU_CLOCK();
            j++;
        } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
                   lru_idle == -1) {
            if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
                    != C_OK) return;
            if (lfu_freq < 0 || lfu_freq > 255) {
                addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
                return;
            }
            j++; /* Consume additional arg. */
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }
    // 若没配置REPLACE,但是本机上该键存在,则需要返回错误
    if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
        addReply(c,shared.busykeyerr);
        return;
    }
    // 获取并检查TTL是否小于0,若小于0则返回错误
    if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
        return;
    } else if (ttl < 0) {
        addReplyError(c,"Invalid TTL value, must be >= 0");
        return;
    }
	// 校验数据(版本和校验和),失败则返回错误
    if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR) {
        addReplyError(c,"DUMP payload version or checksum are wrong");
        return;
    }
	// 初始化缓冲区payload
    rioInitWithBuffer(&payload,c->argv[3]->ptr);
    // 将数据写入payload,并反序列化,若出错则返回错误
    if (((type = rdbLoadObjectType(&payload)) == -1) ||
        ((obj = rdbLoadObject(type,&payload,c->argv[1])) == NULL)) {
        addReplyError(c,"Bad data format");
        return;
    }
    // 若配置REPLACE,先删除原有数据
    if (replace) dbDelete(c->db,c->argv[1]);
    // 将迁移的数据写入db,必要时设置expire
    dbAdd(c->db,c->argv[1],obj);
    if (ttl) {
        if (!absttl) ttl+=mstime();
        setExpire(c,c->db,c->argv[1],ttl);
    }
    objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
    signalModifiedKey(c->db,c->argv[1]);
    // 返回+OK
    addReply(c,shared.ok);
    server.dirty++;
}

d) source:处理目标节点的响应

这里回到migrateCommand。发送好RESTORE/RESTORE-ASKING后,准备读取目标节点的响应,这一步是同步的。这里主要:

  • 同步读取响应
  • 若没配置COPY,则删除本节点对应的数据
void migrateCommand(client *c) {
    // ...
    char buf0[1024]; /* Auth reply. */
    char buf1[1024]; /* Select reply. */
    char buf2[1024]; /* Restore reply. */

    // 接收AUTH响应,若超时则跳转socket_err处理
    if (password && syncReadLine(cs->fd, buf0, sizeof(buf0), timeout) <= 0)
        goto socket_err;
    // 接收SELECT响应,若超时则跳转socket_err处理
    if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
        goto socket_err;
    
    int error_from_target = 0; // 记录RESTORE响应内容是否是一个错误内容
    int socket_error = 0; // 记录读取RESTORE响应是否失败
    int del_idx = 1;
    // 当没配置COPY时,创建newargs数组,用于记录本节点被删除的键
    // 而数组第一项是DEL,因为没配置COPY下会删除本节点的键值对
    // 因此newargs可以作为一条命令,当删除完成后,将客户端的当前命令设置成newargv
    // 这样就可以将该命令传播到AOF和从节点上
    if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
	// 读取RESTORE响应,响应有num_keys个
    for (j = 0; j < num_keys; j++) {
        // 读取每个RESTORE响应
        if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
            // 若读取错误,标记socket_error
            socket_error = 1;
            break;
        }
        if ((password && buf0[0] == '-') ||
            (select && buf1[0] == '-') ||
            buf2[0] == '-') {
            // 这里,target响应了一个错误的内容,标记error_from_target
            if (!error_from_target) {
                cs->last_dbid = -1;
                char *errbuf;
                if (password && buf0[0] == '-') errbuf = buf0;
                else if (select && buf1[0] == '-') errbuf = buf1;
                else errbuf = buf2;

                error_from_target = 1;
                addReplyErrorFormat(c,"Target instance replied with error: %s",
                    errbuf+1);
            }
        } else {
            // 这里target响应了正确内容
            if (!copy) {
                // 没配置COPY,删除对应键值对
                dbDelete(c->db,kv[j]);
                signalModifiedKey(c->db,kv[j]);
                server.dirty++;
                // 将已删除的键记录到newargv中
                newargv[del_idx++] = kv[j];
                incrRefCount(kv[j]);
            }
        }
    }
    // ...
}

注意这里有一个newargv数组:

  • 当没配置COPY时,创建newargs数组,用于记录本节点被删除的键
  • 而数组第一项是DEL,因为没配置COPY下会删除本节点的键值对,因此newargs可以作为一条命令,当删除完成后,将客户端的当前执行命令替换成newargv,这样就可以将该命令传播到AOF和从节点上

e) source:错误处理

最后就是收尾的工作,这里主要做的是:

  • 错误处理:有的直接返回(比如无法挽回的错误),有的会重试
  • 若没配置COPY,且删除了键,将newargv替换成客户端的当前命令,用于传播到AOF和从节点
  • 若正常,响应+OK\r\n
void migrateCommand(client *c) {
    // ...
    // 套接字错误,且第一个键就错了,may_retry=1(没重试过),进入socket_err准备重试
    if (!error_from_target && socket_error && j == 0 && may_retry &&
        errno != ETIMEDOUT) {
        goto socket_err;
    }

    // 上面的条件不符合情况下(如中间过程中套接字出错,或已重试),则先关闭连接
    if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
	// 当没配置COPY下
    // 若删除了键,将newargv的DEL命令替换成客户端当前命令,用于AOF和主从复制
    if (!copy) {
        if (del_idx > 1) {
            newargv[0] = createStringObject("DEL",3);
            replaceClientCommandVector(c,del_idx,newargv);
            argv_rewritten = 1;
        } else {
            zfree(newargv);
        }
        newargv = NULL;
    }

    // 执行到这里,如果响应没出错,套接字出错,进入socket_err例程并不重试
    // 这里就是上述的如中间过程套接字出错,或者已重试的情况基础上,没有响应内容错误
    // 到这一步,肯定已经关闭了和target的连接
    if (!error_from_target && socket_error) {
        may_retry = 0;
        goto socket_err;
    }
	// 到这一步,只可能是响应内容错,而套接字没出错
    if (!error_from_target) {
        // 这里响应正确,则返回客户端+OK,表示迁移成功
        cs->last_dbid = dbid;
        addReply(c,shared.ok);
    } else {
        // 这里,仅响应内容错误,这部分已经在d)中处理了(34-44行)
        // 不需要做任何东西
    }
	// 释放空间并返回
    sdsfree(cmd.io.buffer.ptr);
    zfree(ov); zfree(kv); zfree(newargv);
    return;

// socket_err套接字错误处理例程
socket_err:
    // 清空输出缓冲,可能用于重试
    sdsfree(cmd.io.buffer.ptr);
    // 如果没有重写client参数列表,关闭连接,因为要保持一致性
    if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
    // 释放newargv
    zfree(newargv);
    newargv = NULL; 
    
    // 若套接字是连接超时,则不会重试
    // 否则当may_retry开启时,重试MIGRATE,并消除重试标记
    if (errno != ETIMEDOUT && may_retry) {
        may_retry = 0;
        goto try_again;
    }
    // 到这里就是不能重试的I/O套接字错误,返回-IOERR错误给客户端
    zfree(ov); zfree(kv);
    addReplySds(c,
        sdscatprintf(sdsempty(),
            "-IOERR error or timeout %s to target instance\r\n",
            write_error ? "writing" : "reading"));
    return;
    // ...
}

这里,错误有2个:

  • 套接字错误,由socket_error标识
  • 响应内容错误,由error_from_target标识

因此总结一下错误的处理:

  • 若套接字错误
    • 若是第一次遇到该错误,错误不是连接超时,且没出现响应错误,且没有迁移任何的数据,则重试
    • 不符合上面条件的:
      • 先关闭与target的连接
      • 若没有响应内容错误,则返回-IOERR,不重试
  • 若响应内容错误,无论如何都不会重试,而会返回Target instance replied with error错误
  • 若没有错误,返回+OK\r\n

  • 不论是否发生错误,只要在没配置COPY并且删除了数据,则用newargv设置为客户端当前命令,以写AOF和主从复制

2.5. 槽位迁移

到这一步,数据已经迁移完成,现在需要分配槽位给新节点,从旧节点移除槽位。这里客户端会发送下面的命令:

CLUSTER SETSLOT <slot> NODE <node_name>

虽然这个命令可以发到任何一个节点,因为通过消息传播可以将上面的更改同步到其它节点,但是redis-trib工具会把该命令发送给集群的每一个主节点,以加快更改的同步进度。

这里对于该命令的处理还是在clusterCommand里,下面是代码片段,主要用于重分配槽的所属节点,并必要时清除migratingimporting标记:

void clusterCommand(client *c) {
	// ...
    else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
    	// ...
         else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
            // 根据节点名,查找节点
            clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
			// 若节点未知则返回错误
            if (!n) {
                addReplyErrorFormat(c,"Unknown node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            // 不允许槽属于自己,且槽有数据的情况下,将其移交给别的节点
            if (server.cluster->slots[slot] == myself && n != myself) {
                if (countKeysInSlot(slot) != 0) {
                    addReplyErrorFormat(c,
                        "Can't assign hashslot %d to a different node "
                        "while I still hold keys for this hash slot.", slot);
                    return;
                }
            }
            // 这里针对source
            // 该槽为空,且有标记为migrating,消除标记
            if (countKeysInSlot(slot) == 0 &&
                server.cluster->migrating_slots_to[slot])
                server.cluster->migrating_slots_to[slot] = NULL;

            // 这里针对target
            // 这里请求中的节点是自己 ,且槽标记了importing
            // 则消除标记,且将configEpoch增加,以解决configEpoch冲突的问题
            if (n == myself &&
                server.cluster->importing_slots_from[slot])
            {
                if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
                    serverLog(LL_WARNING,
                        "configEpoch updated after importing slot %d", slot);
                }
                server.cluster->importing_slots_from[slot] = NULL;
            }
            // 清除该槽的所有者
            clusterDelSlot(slot);
            // 将该槽分配给请求中的节点
            clusterAddSlot(n,slot);
        // ...
    }
    // ...
}

这里更新完后,返回+OK,然后变更会传播和同步到所有节点。至此,集群的扩容就完成了。

3. 集群收缩

集群缩容主要步骤是:

  • 将下线节点node的数据和槽导入到其它节点
  • 向其他所有节点发送CLUSTER FORGET <node>,让其它节点直到节点node的下线
  • 关闭节点node

第一步的原理(即数据迁移),在第2节已经说明了;第三步很简单,不需说明。因此这里只说明第二步的原理,即命令CLUSTER FORGET <node>

这里依旧是clusterCommand的分支片段,截取如下,主要是这些:

  • 检查请求的节点,不能移除下面几类节点
    • 未知节点
    • 自己
    • 自己的主节点(若自己是从节点)
  • 将节点加入黑名单
  • 将节点从clusterState移除
void clusterCommand(client *c) {
	// ...
    else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
		// 1. 寻找请求中的节点实例
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
        if (!n) {
            // 若找不到则返回错误
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        } else if (n == myself) {
            // 不能forget自己
            addReplyError(c,"I tried hard but I can't forget myself...");
            return;
        } else if (nodeIsSlave(myself) && myself->slaveof == n) {
            // 若自己是从节点,则不能forget自己的主节点
            addReplyError(c,"Can't forget my master!");
            return;
        }
        // 2. 将待忘记的节点加入黑名单
        clusterBlacklistAddNode(n);
        // 3. 从自己的clusterState移除该节点
        clusterDelNode(n);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } // ...
    // ...
}

这里关键的是加入黑名单这一步。加入黑名单的原因防止被遗忘的节点再次加入集群,导致重新上线

考虑下面的例子:现有节点A, B, C,现要忘掉C

  • 先给A发送CLUSTER FORGET C移除节点C
  • 此时B的成员表中还有C,因此节点间通信时,Gossip中依旧会有C的信息
  • A收到B的Gossip,里面有C,若不过滤黑名单,那么C依旧会被加入到A的成员表中,导致遗忘失败

上面的例子就是节点要加入黑名单的原因。而黑名单的过滤也在下面代码中有所体现:

void clusterProcessGossipSection {
    // ...
    if(node) {
        // ...
    } else {
        // ...
        // 当Gossip内容部分中的node本节点不知道
        // 则需要对其握手,以将其加入集群(本节点视角下)
        if (sender &&
                !(flags & CLUSTER_NODE_NOADDR) &&
            	// 这里过滤了黑名单,因此杜绝了上面问题的出现
                !clusterBlacklistExists(g->nodename)) {
         	clusterStartHandshake(g->ip,ntohs(g->port),ntohs(g->cport));
    	}
        // ...
    }
    // ...
}

黑名单的有效期只有60s,也就是说,让其他节点忘记节点的有效期只有60s。

#define CLUSTER_BLACKLIST_TTL 60      /* 1 minute. */

另外CLUSTER FORGET <node>命令需要发给其它所有节点,因为该命令只会移除本节点视角下的节点node,这个消息并不会通过消息传播到其它节点,即其它节点(没收到该命令)视角下依旧会有节点node,即使节点node实际上不可用了(这种情况下,节点node会被标记FAIL,并传播给其它节点,包括遗忘的节点,导致遗忘失败)。