1. Overview
Redis Cluster支持集群的伸缩,即扩容和收缩。而这部分功能,可以手动完成,也可以用redis-trib
工具完成。本文主要讲这部分的原理。
2. 集群扩容
集群扩容主要分为3步:
- 准备新节点
- 加入集群
- 迁移槽和数据
前2步之前的文章都说明了,可参考:
因此下面主要讨论的是槽和数据的迁移。
2.0. Overview: 槽和数据的迁移
这里对单个槽的迁移(从source
迁到target
)步骤进行总结:
- 对节点
target
发送CLUSTER SETSLOT <slot> IMPORTING <source>
,标识节点target
需要导入节点source
的对应槽数据 - 对节点
source
发送CLUSTER SETSLOT <slot> MIGRATING <target>
,标识节点source
需要导出对应槽数据导出到节点source
- 对节点
source
发生发送CLUSTER GETKEYSINSLOT <slot> <count>
,获取节点source
上对应槽的键 - 根据第3步的结果,向节点
source
发送MIGRATE
命令,将这些键发送给节点target
,若该槽的数据很多,则第3步和第4步会重复执行,直到迁移完成 - 向集群任意节点发送
CLUSTER SETSLOT <slot> NODE <target>
,指派该槽给节点target
,这个信息会被传播到整个集群
下面对每一步进行说明。
2.1. target
:设置导入状态
这里,客户端对target
节点发送下面的命令,给节点target
设置槽slot
设置为导入状态,导入的数据从source
获取:
CLUSTER SETSLOT <slot> IMPORTING <source>
这里依旧进入clusterCommand
函数,这里截取关键分支,这里主要把clusterState->importing_slots_from[slot]
设为source
:
void clusterCommand(client *c) {
// ...
else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
int slot;
clusterNode *n;
// 自己不能是从节点
if (nodeIsSlave(myself)) {
addReplyError(c,"Please use SETSLOT only with masters.");
return;
}
// 检查槽是否在0~0x3FFF内
if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
// ...
else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
// 该槽不能是自己的,因为是从外面导入的
if (server.cluster->slots[slot] == myself) {
addReplyErrorFormat(c,
"I'm already the owner of hash slot %u",slot);
return;
}
// 查找source节点是否存在,若不存在说明本节点不知道source,返回错误
if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[4]->ptr);
return;
}
// 把clusterState->importing_slots_from[slot]设为source
server.cluster->importing_slots_from[slot] = n;
} // ...
// ...
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
addReply(c,shared.ok);
}
// ...
}
2.2. source
:设置导出状态
第二步,客户端要向源节点source
发送下面的命令,给节点source
设置槽slot
设置为导出状态,导出数据到target
:
CLUSTER SETSLOT <slot> MIGRATING <target>
这里还是clusterCommand
函数的分支,主要是把clusterState->migrating_slots_to[slot]
设为target
:
void clusterCommand(client *c) {
// ...
else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
// ...
if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
// 自己(source)的数据导出到target节点,需要自己原先负责该槽
if (server.cluster->slots[slot] != myself) {
addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
return;
}
// 若自己(source)不知道target节点,返回错误
if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[4]->ptr);
return;
}
// 把clusterState->migrating_slots_to[slot]设为target
server.cluster->migrating_slots_to[slot] = n;
} // ...
// ...
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
addReply(c,shared.ok);
}
// ...
}
2.3. source
:获取该槽需要迁移的键
接着要获取需要迁移的数据的键,这里调用下面的命令:
CLUSTER GETKEYSINSLOT <slot> <count>
下面是处理这个命令的代码,依旧是clusterCommand
函数,这里主要做的是遍历并返回对应槽的键:
void clusterCommand(client *c) {
// ...
else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
long long maxkeys, slot;
unsigned int numkeys, j;
robj **keys;
// 获取并检查参数
if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
return;
if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
!= C_OK)
return;
if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
addReplyError(c,"Invalid slot or number of keys");
return;
}
unsigned int keys_in_slot = countKeysInSlot(slot);
if (maxkeys > keys_in_slot) maxkeys = keys_in_slot;
keys = zmalloc(sizeof(robj*)*maxkeys);
// 获取maxkeys个数的键,填充到keys数组中
numkeys = getKeysInSlot(slot, keys, maxkeys);
// 响应结果: 键个数和所有键
addReplyMultiBulkLen(c,numkeys);
for (j = 0; j < numkeys; j++) {
addReplyBulk(c,keys[j]);
decrRefCount(keys[j]);
}
zfree(keys);
}
// ...
}
而获取某个槽的键,调用函数getKeysInSlot
,它会遍历clusterState->keys_to_slots
这棵radix tree,并获取位于对应槽的键,而关于它的存储结构,可以参考前面的文章 源码阅读-Redis集群-集群(1):
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
raxIterator iter;
int j = 0;
unsigned char indexed[2];
// 遍历slots_to_keys树
// 而这颗树存储的是slot + key字符串,因此定位某个槽的键很快
indexed[0] = (hashslot >> 8) & 0xff;
indexed[1] = hashslot & 0xff;
raxStart(&iter,server.cluster->slots_to_keys);
raxSeek(&iter,">=",indexed,2);
while(count-- && raxNext(&iter)) {
if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
keys[j++] = createStringObject((char*)iter.key+2,iter.key_len-2);
}
raxStop(&iter);
return j;
}
2.4. 迁移槽中的数据
接下来就要准备数据迁移了。这里客户端需要向源节点source
发送MIGRATE
命令,根据2.3.的结果,将数据迁移到目标节点target
,也即下面的命令:
MIGRATE host port "" dbid timeout [COPY | REPLACE] KEYS key1 key2 ... keyN # 批量
MIGRATE host port key dbid timeout [COPY | REPLACE] # 非批量
这里
若标识
COPY
,表示不删除源节点source
的数据;若标识
REPLACE
,表示会替换目标节点target
已有的数据。
迁移数据主要分为5步:
source
:解析和校验参数source
:发送迁移数据target
:接收数据并回复source
:处理目标节点的响应- 错误处理
a) source
:解析和校验参数
这里主要在migrateCommand
函数的开头处理,主要做:
- 抽取参数,如目标节点地址、
COPY
和REPLACE
选项、要迁移的键集合等 - 校验参数
- 从键中获取值,准备迁移
void migrateCommand(client *c) {
migrateCachedSocket *cs;
int copy = 0, replace = 0, j;
char *password = NULL;
long timeout;
long dbid;
robj **ov = NULL; // 要迁移的记录值
robj **kv = NULL; // 要迁移的记录键
robj **newargv = NULL;
rio cmd, payload;
int may_retry = 1;
int write_error = 0;
int argv_rewritten = 0;
int first_key = 3;
int num_keys = 1;
// 解析参数
for (j = 6; j < c->argc; j++) {
int moreargs = j < c->argc-1;
if (!strcasecmp(c->argv[j]->ptr,"copy")) {
// 获取COPY选项
copy = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
// 获取REPLACE选项
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
// 获取AUTH选项
if (!moreargs) {
addReply(c,shared.syntaxerr);
return;
}
j++;
password = c->argv[j]->ptr;
} else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
// 获取要迁移的键(批量)
if (sdslen(c->argv[3]->ptr) != 0) {
addReplyError(c,
"When using MIGRATE KEYS option, the key argument"
" must be set to the empty string");
return;
}
first_key = j+1; // 第一个键的参数为止
num_keys = c->argc - j - 1; // 键的个数
break;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
// 参数校验(timeout和dbid)
if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK) {
return;
}
if (timeout <= 0) timeout = 1000;
ov = zrealloc(ov,sizeof(robj*)*num_keys);
kv = zrealloc(kv,sizeof(robj*)*num_keys);
int oi = 0;
// 根据参数获取要迁移的数据
for (j = 0; j < num_keys; j++) {
if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
kv[oi] = c->argv[first_key+j];
oi++;
}
}
num_keys = oi;
if (num_keys == 0) {
// 没有要迁移的,则返回+NOKEY响应
zfree(ov); zfree(kv);
addReplySds(c,sdsnew("+NOKEY\r\n"));
return;
}
// ...
}
b) source
:发送迁移数据
a)中,要传输的数据被记录到ov
和kv
中,并获取到了目标节点地址,就可以准备数据并发送迁移的数据,这部分代码如下,它主要做:
- 建立和节点
target
的连接,并创建输出缓冲区 - 必要时写入
AUTH
命令 - 必要时写入
SELECT
命令 - 每个键写入一条
RESTORE/RESTORE-ASKING
指令- 过期数据会被跳过
- 若是集群模式,写入
RESTORE-ASKING
命令;否则是RESTORE
命令 - 将键值序列化后,写入缓冲区,其格式和RDB文件很相似
- 若配置了
REPLACE
,会将其追加到命令最后
- 同步将缓冲区发送给目标节点
target
,每次发送64KB
void migrateCommand(client *c) {
// ...
try_again:
write_error = 0;
// 1. 创建和目标节点target的连接
cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
if (cs == NULL) {
// 连接失败则返回
zfree(ov); zfree(kv);
return;
}
// 2. 创建cmd命令缓冲区
rioInitWithBuffer(&cmd,sdsempty());
// 3. 若需要发送AUTH命令,先将其写入cmd缓冲
if (password) {
serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
sdslen(password)));
}
// 4. 判断是否需要发送SELECT命令,若需要则将SELECT命令写入cmd缓冲
int select = cs->last_dbid != dbid;
if (select) {
serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
}
int non_expired = 0;
// 5. 写入RESTORE命令(每个键1个),里面包含待迁移的数据
for (j = 0; j < num_keys; j++) {
long long ttl = 0;
long long expireat = getExpire(c->db,kv[j]);
// 判断键是否过期,若过期则直接跳过
if (expireat != -1) {
ttl = expireat-mstime();
if (ttl < 0) {
continue;
}
if (ttl < 1) ttl = 1;
}
// 这里键对应的数据没过期
// 记录没过期的键到kv数组
kv[non_expired++] = kv[j];
// 写入RESTORE(普通)/RESTORE-ASKING(集群)写入cmd缓冲
serverAssertWithInfo(c,NULL,
rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); // 写入*开头
if (server.cluster_enabled) // 写入RESTORE/RESTORE-ASKING
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
else
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j])); // 处理键
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
sdslen(kv[j]->ptr))); // 写入键
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); // 写入TTL
createDumpPayload(&payload,ov[j],kv[j]); // 序列化迁移的数据
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr))); // 写入序列化的数据
sdsfree(payload.io.buffer.ptr);
if (replace) // 若配置了REPLACE,追加REPLACE到cmd缓冲
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
}
num_keys = non_expired;
// 6. 将cmd缓冲的数据同步发给target节点,一次发64KB
errno = 0;
{
sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite;
int nwritten = 0;
while ((towrite = sdslen(buf)-pos) > 0) {
towrite = (towrite > (64*1024) ? (64*1024) : towrite);
nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout);
if (nwritten != (signed)towrite) {
write_error = 1;
goto socket_err; // 发送错误由socket_err例程处理,后面会说明
}
pos += nwritten;
}
}
// ..
}
注意这里有2点要说明的。
首先是连接创建,这里对连接做了缓存,保存在server.migrate_cached_sockets
字典中。创建连接时首先从这个字典中查找可用的连接,若没有才会去创建:
typedef struct migrateCachedSocket {
int fd; // socket文件描述符
long last_dbid; // 上一次交互的dbid
time_t last_use_time; // 上一次使用的时间
} migrateCachedSocket;
另外是RESTORE
和RESTORE-ASKING
命令的区别。从上面可知,集群模式下会选择后者。而从命令表上,这两个命令实际处理上并没有太大差别,不过后者命令标识多了一个k
,它会隐式执行一次ASKING
命令:
{"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0},
{"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0},
{"asking",askingCommand,1,"F",0,NULL,0,0,0,0,0},
void askingCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}
// 集群模式下标记CLLIENT_ASKING,接收命令时,该槽被标记为importing才会被接收
c->flags |= CLIENT_ASKING;
addReply(c,shared.ok);
}
c) target
:接收迁移的数据
这步主要还是对于RESTORE/RESTORE-ASKING
的处理,对应处理的函数是restoreCommand
,也比较简单。它主要做:
- 获取选项,如
REPLACE
- 获取并检查TTL
- 检查发送过来的数据(类似RDB模式)
- 将数据反序列化获得键和值
- 写入数据库,必要时设置过期,若配置
REPLACE
则需要先删除旧数据 - 响应
+OK\r\n
void restoreCommand(client *c) {
long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
rio payload;
int j, type, replace = 0, absttl = 0;
robj *obj;
// 解析选项,比如REPLACE
for (j = 4; j < c->argc; j++) {
int additional = c->argc-j-1;
if (!strcasecmp(c->argv[j]->ptr,"replace")) {
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
absttl = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
lfu_freq == -1) {
if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
!= C_OK) return;
if (lru_idle < 0) {
addReplyError(c,"Invalid IDLETIME value, must be >= 0");
return;
}
lru_clock = LRU_CLOCK();
j++;
} else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
lru_idle == -1) {
if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
!= C_OK) return;
if (lfu_freq < 0 || lfu_freq > 255) {
addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
return;
}
j++; /* Consume additional arg. */
} else {
addReply(c,shared.syntaxerr);
return;
}
}
// 若没配置REPLACE,但是本机上该键存在,则需要返回错误
if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
addReply(c,shared.busykeyerr);
return;
}
// 获取并检查TTL是否小于0,若小于0则返回错误
if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
return;
} else if (ttl < 0) {
addReplyError(c,"Invalid TTL value, must be >= 0");
return;
}
// 校验数据(版本和校验和),失败则返回错误
if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR) {
addReplyError(c,"DUMP payload version or checksum are wrong");
return;
}
// 初始化缓冲区payload
rioInitWithBuffer(&payload,c->argv[3]->ptr);
// 将数据写入payload,并反序列化,若出错则返回错误
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload,c->argv[1])) == NULL)) {
addReplyError(c,"Bad data format");
return;
}
// 若配置REPLACE,先删除原有数据
if (replace) dbDelete(c->db,c->argv[1]);
// 将迁移的数据写入db,必要时设置expire
dbAdd(c->db,c->argv[1],obj);
if (ttl) {
if (!absttl) ttl+=mstime();
setExpire(c,c->db,c->argv[1],ttl);
}
objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
signalModifiedKey(c->db,c->argv[1]);
// 返回+OK
addReply(c,shared.ok);
server.dirty++;
}
d) source
:处理目标节点的响应
这里回到migrateCommand
。发送好RESTORE/RESTORE-ASKING
后,准备读取目标节点的响应,这一步是同步的。这里主要:
- 同步读取响应
- 若没配置
COPY
,则删除本节点对应的数据
void migrateCommand(client *c) {
// ...
char buf0[1024]; /* Auth reply. */
char buf1[1024]; /* Select reply. */
char buf2[1024]; /* Restore reply. */
// 接收AUTH响应,若超时则跳转socket_err处理
if (password && syncReadLine(cs->fd, buf0, sizeof(buf0), timeout) <= 0)
goto socket_err;
// 接收SELECT响应,若超时则跳转socket_err处理
if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
goto socket_err;
int error_from_target = 0; // 记录RESTORE响应内容是否是一个错误内容
int socket_error = 0; // 记录读取RESTORE响应是否失败
int del_idx = 1;
// 当没配置COPY时,创建newargs数组,用于记录本节点被删除的键
// 而数组第一项是DEL,因为没配置COPY下会删除本节点的键值对
// 因此newargs可以作为一条命令,当删除完成后,将客户端的当前命令设置成newargv
// 这样就可以将该命令传播到AOF和从节点上
if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
// 读取RESTORE响应,响应有num_keys个
for (j = 0; j < num_keys; j++) {
// 读取每个RESTORE响应
if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
// 若读取错误,标记socket_error
socket_error = 1;
break;
}
if ((password && buf0[0] == '-') ||
(select && buf1[0] == '-') ||
buf2[0] == '-') {
// 这里,target响应了一个错误的内容,标记error_from_target
if (!error_from_target) {
cs->last_dbid = -1;
char *errbuf;
if (password && buf0[0] == '-') errbuf = buf0;
else if (select && buf1[0] == '-') errbuf = buf1;
else errbuf = buf2;
error_from_target = 1;
addReplyErrorFormat(c,"Target instance replied with error: %s",
errbuf+1);
}
} else {
// 这里target响应了正确内容
if (!copy) {
// 没配置COPY,删除对应键值对
dbDelete(c->db,kv[j]);
signalModifiedKey(c->db,kv[j]);
server.dirty++;
// 将已删除的键记录到newargv中
newargv[del_idx++] = kv[j];
incrRefCount(kv[j]);
}
}
}
// ...
}
注意这里有一个
newargv
数组:
- 当没配置
COPY
时,创建newargs
数组,用于记录本节点被删除的键- 而数组第一项是
DEL
,因为没配置COPY
下会删除本节点的键值对,因此newargs
可以作为一条命令,当删除完成后,将客户端的当前执行命令替换成newargv
,这样就可以将该命令传播到AOF和从节点上
e) source
:错误处理
最后就是收尾的工作,这里主要做的是:
- 错误处理:有的直接返回(比如无法挽回的错误),有的会重试
- 若没配置
COPY
,且删除了键,将newargv
替换成客户端的当前命令,用于传播到AOF和从节点 - 若正常,响应
+OK\r\n
void migrateCommand(client *c) {
// ...
// 套接字错误,且第一个键就错了,may_retry=1(没重试过),进入socket_err准备重试
if (!error_from_target && socket_error && j == 0 && may_retry &&
errno != ETIMEDOUT) {
goto socket_err;
}
// 上面的条件不符合情况下(如中间过程中套接字出错,或已重试),则先关闭连接
if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
// 当没配置COPY下
// 若删除了键,将newargv的DEL命令替换成客户端当前命令,用于AOF和主从复制
if (!copy) {
if (del_idx > 1) {
newargv[0] = createStringObject("DEL",3);
replaceClientCommandVector(c,del_idx,newargv);
argv_rewritten = 1;
} else {
zfree(newargv);
}
newargv = NULL;
}
// 执行到这里,如果响应没出错,套接字出错,进入socket_err例程并不重试
// 这里就是上述的如中间过程套接字出错,或者已重试的情况基础上,没有响应内容错误
// 到这一步,肯定已经关闭了和target的连接
if (!error_from_target && socket_error) {
may_retry = 0;
goto socket_err;
}
// 到这一步,只可能是响应内容错,而套接字没出错
if (!error_from_target) {
// 这里响应正确,则返回客户端+OK,表示迁移成功
cs->last_dbid = dbid;
addReply(c,shared.ok);
} else {
// 这里,仅响应内容错误,这部分已经在d)中处理了(34-44行)
// 不需要做任何东西
}
// 释放空间并返回
sdsfree(cmd.io.buffer.ptr);
zfree(ov); zfree(kv); zfree(newargv);
return;
// socket_err套接字错误处理例程
socket_err:
// 清空输出缓冲,可能用于重试
sdsfree(cmd.io.buffer.ptr);
// 如果没有重写client参数列表,关闭连接,因为要保持一致性
if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
// 释放newargv
zfree(newargv);
newargv = NULL;
// 若套接字是连接超时,则不会重试
// 否则当may_retry开启时,重试MIGRATE,并消除重试标记
if (errno != ETIMEDOUT && may_retry) {
may_retry = 0;
goto try_again;
}
// 到这里就是不能重试的I/O套接字错误,返回-IOERR错误给客户端
zfree(ov); zfree(kv);
addReplySds(c,
sdscatprintf(sdsempty(),
"-IOERR error or timeout %s to target instance\r\n",
write_error ? "writing" : "reading"));
return;
// ...
}
这里,错误有2个:
- 套接字错误,由
socket_error
标识 - 响应内容错误,由
error_from_target
标识
因此总结一下错误的处理:
- 若套接字错误
- 若是第一次遇到该错误,错误不是连接超时,且没出现响应错误,且没有迁移任何的数据,则重试
- 不符合上面条件的:
- 先关闭与
target
的连接 - 若没有响应内容错误,则返回
-IOERR
,不重试
- 先关闭与
- 若响应内容错误,无论如何都不会重试,而会返回
Target instance replied with error
错误 -
若没有错误,返回
+OK\r\n
- 不论是否发生错误,只要在没配置
COPY
并且删除了数据,则用newargv
设置为客户端当前命令,以写AOF和主从复制
2.5. 槽位迁移
到这一步,数据已经迁移完成,现在需要分配槽位给新节点,从旧节点移除槽位。这里客户端会发送下面的命令:
CLUSTER SETSLOT <slot> NODE <node_name>
虽然这个命令可以发到任何一个节点,因为通过消息传播可以将上面的更改同步到其它节点,但是redis-trib
工具会把该命令发送给集群的每一个主节点,以加快更改的同步进度。
这里对于该命令的处理还是在clusterCommand
里,下面是代码片段,主要用于重分配槽的所属节点,并必要时清除migrating
和importing
标记:
void clusterCommand(client *c) {
// ...
else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
// ...
else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
// 根据节点名,查找节点
clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
// 若节点未知则返回错误
if (!n) {
addReplyErrorFormat(c,"Unknown node %s",
(char*)c->argv[4]->ptr);
return;
}
// 不允许槽属于自己,且槽有数据的情况下,将其移交给别的节点
if (server.cluster->slots[slot] == myself && n != myself) {
if (countKeysInSlot(slot) != 0) {
addReplyErrorFormat(c,
"Can't assign hashslot %d to a different node "
"while I still hold keys for this hash slot.", slot);
return;
}
}
// 这里针对source
// 该槽为空,且有标记为migrating,消除标记
if (countKeysInSlot(slot) == 0 &&
server.cluster->migrating_slots_to[slot])
server.cluster->migrating_slots_to[slot] = NULL;
// 这里针对target
// 这里请求中的节点是自己 ,且槽标记了importing
// 则消除标记,且将configEpoch增加,以解决configEpoch冲突的问题
if (n == myself &&
server.cluster->importing_slots_from[slot])
{
if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
serverLog(LL_WARNING,
"configEpoch updated after importing slot %d", slot);
}
server.cluster->importing_slots_from[slot] = NULL;
}
// 清除该槽的所有者
clusterDelSlot(slot);
// 将该槽分配给请求中的节点
clusterAddSlot(n,slot);
// ...
}
// ...
}
这里更新完后,返回+OK
,然后变更会传播和同步到所有节点。至此,集群的扩容就完成了。
3. 集群收缩
集群缩容主要步骤是:
- 将下线节点
node
的数据和槽导入到其它节点 - 向其他所有节点发送
CLUSTER FORGET <node>
,让其它节点直到节点node
的下线 - 关闭节点
node
第一步的原理(即数据迁移),在第2节已经说明了;第三步很简单,不需说明。因此这里只说明第二步的原理,即命令CLUSTER FORGET <node>
。
这里依旧是clusterCommand
的分支片段,截取如下,主要是这些:
- 检查请求的节点,不能移除下面几类节点
- 未知节点
- 自己
- 自己的主节点(若自己是从节点)
- 将节点加入黑名单
- 将节点从
clusterState
移除
void clusterCommand(client *c) {
// ...
else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
// 1. 寻找请求中的节点实例
clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
if (!n) {
// 若找不到则返回错误
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
return;
} else if (n == myself) {
// 不能forget自己
addReplyError(c,"I tried hard but I can't forget myself...");
return;
} else if (nodeIsSlave(myself) && myself->slaveof == n) {
// 若自己是从节点,则不能forget自己的主节点
addReplyError(c,"Can't forget my master!");
return;
}
// 2. 将待忘记的节点加入黑名单
clusterBlacklistAddNode(n);
// 3. 从自己的clusterState移除该节点
clusterDelNode(n);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok);
} // ...
// ...
}
这里关键的是加入黑名单这一步。加入黑名单的原因防止被遗忘的节点再次加入集群,导致重新上线。
考虑下面的例子:现有节点A, B, C,现要忘掉C
- 先给A发送
CLUSTER FORGET C
移除节点C - 此时B的成员表中还有C,因此节点间通信时,Gossip中依旧会有C的信息
- A收到B的Gossip,里面有C,若不过滤黑名单,那么C依旧会被加入到A的成员表中,导致遗忘失败
上面的例子就是节点要加入黑名单的原因。而黑名单的过滤也在下面代码中有所体现:
void clusterProcessGossipSection {
// ...
if(node) {
// ...
} else {
// ...
// 当Gossip内容部分中的node本节点不知道
// 则需要对其握手,以将其加入集群(本节点视角下)
if (sender &&
!(flags & CLUSTER_NODE_NOADDR) &&
// 这里过滤了黑名单,因此杜绝了上面问题的出现
!clusterBlacklistExists(g->nodename)) {
clusterStartHandshake(g->ip,ntohs(g->port),ntohs(g->cport));
}
// ...
}
// ...
}
黑名单的有效期只有60s,也就是说,让其他节点忘记节点的有效期只有60s。
#define CLUSTER_BLACKLIST_TTL 60 /* 1 minute. */
另外CLUSTER FORGET <node>
命令需要发给其它所有节点,因为该命令只会移除本节点视角下的节点node
,这个消息并不会通过消息传播到其它节点,即其它节点(没收到该命令)视角下依旧会有节点node
,即使节点node
实际上不可用了(这种情况下,节点node
会被标记FAIL
,并传播给其它节点,包括遗忘的节点,导致遗忘失败)。