源码阅读-Redis集群: 集群(1)

Posted by keys961 on November 27, 2019

1. Overview

Redis Cluster可以解决单机瓶颈,把负载分摊,并提高并发度。本文和下面几篇会详细分析Redis Cluster的关键源码。

这一篇主要讲Redis Cluster的搭建流程,而集群搭建主要分为下面3步:

  • 节点准备
  • 节点握手
  • 分配槽位

2. 节点准备

启动集群,首先配置文件中cluster-enabled yes要打开。

不过启动节点的时候,和普通Redis节点类似,只是增加了集群配置的初始化:

  • 首先是loadServerConfig函数,载入配置文件的cluster-配置;
  • 然后是在initServer函数中,调用clusterInit初始化集群的状态。

其它部分和普通Redis节点基本一样。因此,主要看clusterInit函数。不过在之前,先说明一下关于集群相关的数据结构:clusterNodeclusterState

2.1. 集群数据结构

这里主要是clusterNodeclusterState

首先是clusterNode,它用来描述集群中的一个节点:

typedef struct clusterNode {
    mstime_t ctime; // 节点对象创建时间
    char name[CLUSTER_NAMELEN]; // 节点名称
    int flags; // 节点标识(CLUSTER_NODE_)
    uint64_t configEpoch; // 该节点观察到的上一次的config epoch
    unsigned char slots[CLUSTER_SLOTS/8]; // 本结点的槽位图
    int numslots; // 本节点的槽个数
    int numslaves; // 本节点下属的从节点个数(若自己是主节点)
    struct clusterNode **slaves; // 从节点数组
    struct clusterNode *slaveof; // 若节点是从节点,该指针指向对应的主节点
    mstime_t ping_sent; // 上一次发送ping的时间
    mstime_t pong_received; // 上一次收到pong的时间
    mstime_t fail_time; // 上一次设置FAIL下线的时间
    mstime_t voted_time; // 上一次为从节点投票的时间
    mstime_t repl_offset_time; // 上一次更新复制偏移量的时间
    mstime_t orphaned_time; // 孤立主节点的迁移时间
    long long repl_offset; // 本节点已知的复制偏移
    char ip[NET_IP_STR_LEN]; // 本节点IP
    int port;  // 本节点端口
    int cport; // 本节点的集群通信端口
    clusterLink *link; // 与本节点相关的连接对象
    list *fail_reports; // 下线报告链表
} clusterNode;

每个节点包含一个连接对象clusterLink,维护与该节点相关的连接状态:

typedef struct clusterLink {
    mstime_t ctime;  // 连接创建时间
    int fd; // 连接文件描述符
    sds sndbuf;  // 输出缓冲
    sds rcvbuf;  // 输入缓冲
    struct clusterNode *node; // 与该连接相关的节点
} clusterLink;

然后是clusterState,代表本节点视角下的集群的状态:

typedef struct clusterState {
    clusterNode *myself; // 本节点实例
    uint64_t currentEpoch; // 当前的epoch
    int state; // 本节点视角的集群状态
    int size;  // 集群的节点个数(节点已分配槽)
    dict *nodes; // name -> clusterNode的字典
    dict *nodes_black_list; // 防止重复添加节点的黑名单
    clusterNode *migrating_slots_to[CLUSTER_SLOTS]; // 导入槽数据到目标节点的数组
    clusterNode *importing_slots_from[CLUSTER_SLOTS]; // 从目标节点槽数据的数组
    clusterNode *slots[CLUSTER_SLOTS]; // 槽和负责该槽节点的映射
    uint64_t slots_keys_count[CLUSTER_SLOTS]; // 每个槽的键个数
    rax *slots_to_keys; // 槽映射到键的radix tree/trie
    
    // The following fields are used to take the slave state on elections.
    mstime_t failover_auth_time; // 之前/下一次选举的时间
    int failover_auth_count; // 节点获得的票数
    int failover_auth_sent; // 若为真,表示本节点向其它节点请求了投票
    int failover_auth_rank; // 该从节点在当前请求中的排名
    uint64_t failover_auth_epoch; // 当前选举的epoch
    int cant_failover_reason; // 不能执行故障转移的原因,见CLUSTER_CANT_FAILOVER
    
    // Manual failover state in common. 
    mstime_t mf_end; // 手动故障转移的时间限制,0代表没有进行手动故障转移
    
    // Manual failover state of master. 
    clusterNode *mf_slave; // 手动故障转移时,对应的从节点 
    
    // Manual failover state of slave.
    long long mf_master_offset; // 主节点的偏移量
    int mf_can_start; // 若非0,说明手动故障转移能开始
    
    // The followign fields are used by masters to take state on elections.
    uint64_t lastVoteEpoch; // 上一次给节点投票的epoch
    int todo_before_sleep; // 调用clusterBeforeSleep()所做的一些事
    
    // Messages received and sent by type.
    long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT]; //发送的字节数,根据不同种类区分
    long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT]; // 接收到的字节数,根据不同种类区分
    long long stats_pfail_nodes; // 被标记为PFAIL(潜在下线)的节点数量,不包括没有地址的节点
} clusterState;

2.2. clusterInit初始化

clusterInit初始化的时候:

  • 首先它会初始化clusterState数据结构

  • 若有集群配置文件,优先加载它(用于启动和恢复已有集群);否则创建一个myselfclusterNode实例,代表自己,并保存一个新的集群配置文件

    这个集群配置文件是通过Redis本身创建和更新的,不是用于我们手动修改的

  • 初始化并监听集群通信相关的文件描述符,并注册回调clusterAcceptHandler
  • 其它初始化操作

下面就是具体的代码:

void clusterInit(void) {
    int saveconf = 0;
    server.cluster = zmalloc(sizeof(clusterState));
    // ... 初始化clusterState字段...
    clusterCloseAllSlots(); // 清除所有的槽
    // 首先锁定cluster config文件
    if (clusterLockConfig(server.cluster_configfile) == C_ERR)
        exit(1);

    // 优先加载原先有的cluster config配置
    if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
        // 若没有,则创建一个自己节点的实例,添加到clusterState中
        myself = server.cluster->myself =
            createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
        serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
            myself->name);
        clusterAddNode(myself);
        saveconf = 1;
    }
    // 保存cluster config文件(如果之前不存在)
    if (saveconf) clusterSaveConfigOrDie(1);
   
    server.cfd_count = 0;
    // ...
    // 监听集群通信相关的fd,并注册回调clusterAcceptHandler
    if (listenToPort(server.port+CLUSTER_PORT_INCR,
        server.cfd,&server.cfd_count) == C_ERR) {
        exit(1);
    } else {
        int j;
        for (j = 0; j < server.cfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
                clusterAcceptHandler, NULL) == AE_ERR)
                    // ...
        }
    }
    // 初始化slots -> keys映射的radix tree
    server.cluster->slots_to_keys = raxNew();
    memset(server.cluster->slots_keys_count,0,
           sizeof(server.cluster->slots_keys_count));
    // 设置端口,默认cport = port+CLUSTER_PORT_INCR
    myself->port = server.port;
    myself->cport = server.port+CLUSTER_PORT_INCR;
    if (server.cluster_announce_port)
        myself->port = server.cluster_announce_port;
    if (server.cluster_announce_bus_port)
        myself->cport = server.cluster_announce_bus_port;
    server.cluster->mf_end = 0;
    resetManualFailover(); // 重置手动故障转移
    clusterUpdateMyselfFlags(); // 更新myself的标识,主要是CLUSTER_NODE_NOFAILOVER
}

3. 节点握手

创建好节点后,集群并没有真正建立,需要我们手动建立(比如通过客户端,或者redis-trib.rb)。而核心的命令就是CLUSTER MEET <ip> <port> [<cport>]

这个命令的处理函数为clusterCommand,这里取关键的分支,这里关键的就是和目标节点握手:

void clusterCommand(client *c) {
    // ...
    else if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
        long long port, cport;
        // 1. 获取IP, 端口,集群端口
        if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
           // ... error and return ...
        }
        if (c->argc == 5) {
            if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
                // ... error and return ...
            }
        } else {
            cport = port + CLUSTER_PORT_INCR; // 默认cport = port + CLUSTER_PORT_INCR
        }
		// 2. 和目标握手
        if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
            errno == EINVAL) {
            // ... error and return ...
        } else {
            addReply(c,shared.ok);
        }
    }
    // ...
}

可见,握手的关键函数就是clusterStartHandshake,下面就分析这个函数。

3.1. 节点:准备握手

节点握手的准备就是上述的函数clusterStartHandshake,它主要的工作就是:

  • 检查节点地址信息
  • 检查待握手的节点是否已在握手
  • 创建clusterNode节点实例,将其添加到clusterState中,准备握手

节点状态初始为CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_MEET

  • CLUSTER_NODE_HANDSHAKE:未完成第一次握手(第一次ping)
  • CLUSTER_NODE_MEET:需要给节点发送meet消息
int clusterStartHandshake(char *ip, int port, int cport) {
    clusterNode *n;
    // 1. 检查地址
    // ...
	// 2. 检查是否正在握手
    if (clusterHandshakeInProgress(norm_ip,port,cport)) {
        errno = EAGAIN;
        return 0;
    }
    // 3. 添加节点到clusterState中,即server.cluster->nodes字典
    // CLUSTER_NODE_HANDSHAKE: 未完成第一次握手(发送第一次ping)
    // CLUSTER_NODE_MEET: 需要给节点发送meet消息
    n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
    // ...
    clusterAddNode(n);
    return 1;
}

3.2. 节点:给目标节点发送MEET消息

这部分的流程是在clusterCron函数中的,它每秒执行10次。

下面截取发送MEET消息的代码,实际上,这块代码主要用于

  • 重新建立断连的节点连接
  • 发送MEET以和目标节点握手
void clusterCron(void) {
 	// ...
    handshake_timeout = server.cluster_node_timeout;
    if (handshake_timeout < 1000) handshake_timeout = 1000; // 超时最低1s
    clusterUpdateMyselfFlags(); // 更新自己节点myself的标识
    di = dictGetSafeIterator(server.cluster->nodes);
    server.cluster->stats_pfail_nodes = 0;
    // 遍历节点
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        // 不关注自己和没有地址的节点
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
		// 统计潜在下线的节点
        if (node->flags & CLUSTER_NODE_PFAIL)
            server.cluster->stats_pfail_nodes++;
        // 若节点正在握手,且超时,则将节点从clusterState删除
        // 即握手超时/失败,不将其加入集群
        if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
            clusterDelNode(node);
            continue;
        }
		// 若没创建连接,则创建连接
        if (node->link == NULL) {
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;
			// 创建socket连接,并构建对应的clusterLink
            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->cport, NET_FIRST_BIND_ADDR);
            if (fd == -1) {
                if (node->ping_sent == 0) node->ping_sent = mstime();
                serverLog(LL_DEBUG, "Unable to connect to "
                    "Cluster Node [%s]:%d -> %s", node->ip,
                    node->cport, server.neterr);
                continue;
            }
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            // 注册读回调clusterReadHandler,用于接收对方的响应
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                    clusterReadHandler,link);
            old_ping_sent = node->ping_sent;
            // 发送握手请求,初始一般是MEET消息
            // 不过没有CLUSTER_NODE_MEET标识,则发送PING请求
            // (例如A MEET B, B收到A并添加A,而B视角中还没和A连接,因此会落到这里)
            // 这个函数是Gossip协议的重要一环,后面会说明
            clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
                    CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
            if (old_ping_sent) {
                node->ping_sent = old_ping_sent;
            }
            // 到这步,就可以移除CLUSTER_NODE_MEET标识
            // 这步没有移除CLUSTER_NODE_HANDSHAKE,因为它还在握手之中
            node->flags &= ~CLUSTER_NODE_MEET;
            serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
                    node->name, node->ip, node->cport);
        }
    }
    dictReleaseIterator(di);
    // ...
}

3.3. 目标节点:回复MEET消息

从2.2.可知,集群中节点启动,会注册clusterAcceptHandler回调。当有集群通信连接建立,该回调会被调用,然后被注册一个新回调clusterReadHandler,用于接收对方发来的消息:

void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    // ...
    if (server.masterhost == NULL && server.loading) return;

    while(max--) {
        // 接收连接,获得fd
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            // ... error and return ...
        }
        // ...
        // 创建clusterLink对象
        link = createClusterLink(NULL);
        link->fd = cfd;
        // 注册clusterReadHandler回调,用于接受消息
        aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
    }
}

这里处理对方请求的函数就是clusterReadHandler,这里大体的步骤是:

  • 一直读取消息,将其追加到link->rcvbuf缓冲
  • 若消息头读取完整,则校验
  • 若消息整体读取完整,处理消息,并清空link->rcvbuf
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char buf[sizeof(clusterMsg)];
    ssize_t nread;
    clusterMsg *hdr;
    clusterLink *link = (clusterLink*) privdata;
    unsigned int readlen, rcvbuflen;
    UNUSED(el);
    UNUSED(mask);
	// 一直读取,直到没有为止
    while(1) { 
        rcvbuflen = sdslen(link->rcvbuf);
        // 检查消息头是否完整
        if (rcvbuflen < 8) {
            // 这里是不完整的,接下来读取8-rcvbuflen以使其完整
            readlen = 8 - rcvbuflen;
        } else {
            // 这里是完整的
            hdr = (clusterMsg*) link->rcvbuf;
            if (rcvbuflen == 8) {
                // 检查头,若不是RCmb,则报错
                if (memcmp(hdr->sig,"RCmb",4) != 0 ||
                    ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN) {
                    serverLog(LL_WARNING,
                        "Bad message length or signature received "
                        "from Cluster bus.");
                    handleLinkIOError(link);
                    return;
                }
            }
            // 获取读取整个完整消息还需要的长度
            readlen = ntohl(hdr->totlen) - rcvbuflen;
            if (readlen > sizeof(buf)) readlen = sizeof(buf);
        }
		// 读取请求,把整个消息全部读完
        nread = read(fd,buf,readlen);
        // 没有数据,返回
        if (nread == -1 && errno == EAGAIN) return;
        if (nread <= 0) {
            // 读取错误,处理错误
            serverLog(LL_DEBUG,"I/O error reading from node link: %s",
                (nread == 0) ? "connection closed" : strerror(errno));
            handleLinkIOError(link);
            return;
        } else {
            // 把数据添加到clusterLink的rcvbuf缓冲中
            link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
            hdr = (clusterMsg*) link->rcvbuf;
            rcvbuflen += nread;
        }

        // 若整个消息以读取完全,则处理消息
        if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
            // 这里clusterProcessPacket就是处理rcvbuf缓存的请求
            if (clusterProcessPacket(link)) {
                // 处理成功,则将缓冲清空
                sdsfree(link->rcvbuf);
                link->rcvbuf = sdsempty();
            } else {
                return; /* Link no longer valid. */
            }
        }
    }
}

而最终处理接收缓冲消息的函数,就是clusterProcessPacket了。这部分也是Gossip协议的重要部分,因此也非常长。这里截取相关的代码,这里主要是:

  • 必要时根据MEET更新自己节点的地址信息
  • 若发送方在自己视角下未知,且为MEET,则说明发送方是新加入集群的节点,则:
    • 将发送方添加到自己的clusterState中,状态为CLUSTER_NODE_HANDSHAKE,之后的clusterCron会创建与发送方的连接并与其握手
    • 将消息Gossip到集群
  • 响应发送方PONG
int clusterProcessPacket(clusterLink *link) {
	// ...
    // 获取发送方是否已知
    sender = clusterLookupNode(hdr->sender);
    // ...
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
        serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);

        // 若消息是MEET,且自己的地址没有设置,则根据MEET消息更新自己的地址
        if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
            server.cluster_announce_ip == NULL) {
            char ip[NET_IP_STR_LEN];
            if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
                strcmp(ip,myself->ip)) {
                memcpy(myself->ip,ip,NET_IP_STR_LEN);
                serverLog(LL_WARNING,"IP address for this node updated to %s",
                    myself->ip);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            }
        }
        // 若发送方未知,且类型是MEET
        // 则在自己的视角下,创建发送方节点实例,并添加到自己的clusterState中
        // 节点状态是CLUSTER_NODE_HANDSHAKE,因为它没有完成自己和发送方的握手
        // 而发送方节点的flags、slaveof等等都没有设置,因为可从其他节点接收到PONG时获取到这些信息
        if (!sender && type == CLUSTERMSG_TYPE_MEET) {
            clusterNode *node;
            node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
            nodeIp2String(node->ip,link,hdr->myip);
            node->port = ntohs(hdr->port);
            node->cport = ntohs(hdr->cport);
            clusterAddNode(node);
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }
        if (!sender && type == CLUSTERMSG_TYPE_MEET)
            // 把MEET消息Gossip到其它节点上
            clusterProcessGossipSection(hdr,link);
        // 响应对方PONG,以完成发送方和自己的握手(反过来说是不对的)
        clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
    }
	// ...
}

3.4. 节点:处理目标节点的PONG

节点收到目标节点的PONG,也是根据clusterReadHandler这个回调处理,最终也会落入clusterProcessPacket函数(参考3.2.节),注意本节点已经知道了目标节点。

这里也截取关键的代码,它主要工作是:

  • 更新本节点视角下,目标节点的信息(如名字等)
  • 清除CLUSTER_NODE_HANDSHAKE标识
  • 清除故障转移的标识
  • 清除潜在下线的标识,必要时清除下线标识(可能下线后重新上线)
int clusterProcessPacket(clusterLink *link) {
    // ...
    // 获取发送方是否存在
    // 这里会找不到
    // 因为目标节点的响应PONG携带的节点名是目标节点随机生成的,因此名字对不上,从而就找不到了
    sender = clusterLookupNode(hdr->sender); // Is null when
    // ...
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET) {
        // ...
        // 若与该连接的节点存在(这里会进入)
        if (link->node) {
            // 若节点正在握手
            if (nodeInHandshake(link->node)) {
                // 若发送方已知,则更新发送方信息,并清除该连接相关的节点(这里不会进入)
                if (sender) {
                    serverLog(LL_VERBOSE,
                        "Handshake: we already know node %.40s, "
                        "updating the address if needed.", sender->name);
                    if (nodeUpdateAddressIfNeeded(sender,link,hdr)) {
                        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                             CLUSTER_TODO_UPDATE_STATE);
                    }
                    clusterDelNode(link->node);
                    return 0;
                }
                // 这里更新目标节点在本节点视角下的名字
                clusterRenameNode(link->node, hdr->sender);
                serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
                    link->node->name);
                // 清除CLUSTER_NODE_HANDSHAKE,表示本节点视角下,与目标节点握手完成
                link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
                link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            } else if (memcmp(link->node->name,hdr->sender,
                        CLUSTER_NAMELEN) != 0) {
                // 若握手已完成,但是名字对不上
                // 则本视角下,置该节点为CLUSTER_NODE_NOADDR,并清除地址和关联的连接对象
                // ...
                link->node->flags |= CLUSTER_NODE_NOADDR;
                link->node->ip[0] = '\0';
                link->node->port = 0;
                link->node->cport = 0;
                freeClusterLink(link);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
                return 0;
            }
        }

        // Set nofailover (clear failover) ...
        // Update sender's info if PING and not handshaking ...
        // 这里是PONG的处理
        if (link->node && type == CLUSTERMSG_TYPE_PONG) {
            link->node->pong_received = mstime(); // 更新pong时间戳,用其判断是否下线
            link->node->ping_sent = 0; // 清除ping时间戳
            // 清除PFAIL潜在下线标识
            if (nodeTimedOut(link->node)) {
                link->node->flags &= ~CLUSTER_NODE_PFAIL;
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE);
            } else if (nodeFailed(link->node)) {
                // 必要时清除下线标识
                clearNodeFailureIfNeeded(link->node);
            }
        }
    // ...
}

这步过后,本节点和目标节点的握手就完成了。

而目标节点和本节点的握手并没有完成,因为在目标节点视角下,本节点设置了CLUSTER_NODE_HANDSHAKE标识,但是没有设置CLUSTER_NODE_MEET。因此目标节点会在下一轮事件循环中,向本节点发送PING,本节点响应PONG并被目标节点处理,此时目标节点才与本节点握手完成。

握手完成后,节点就会定期向其它节点发送PING,以探测节点是否可用,代码在clusterCron中:

void clusterCron(void) {
    // ...
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
    	// ...
        // 跳过自己、没地址和正在握手的节点
        if (node->flags &
            (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
                continue;
        // ...
        if (node->link &&
            node->ping_sent == 0 &&
            (now - node->pong_received) > server.cluster_node_timeout/2) {
            // 定期发送ping
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }
        // ...
    }
    // ...
}

3.5. Gossip协议

从上面可知,我们没有两两执行CLUSTER MEET指令,但是新节点被集群内所有节点感知,这就是因为使用了Gossip协议。

Gossip协议的消息可以是PING,MEETPONG,消息的格式如下所示:

typedef struct {
    char nodename[CLUSTER_NAMELEN]; // 节点名字(可以不是本节点)
    uint32_t ping_sent; // 最近一次发送PING的时间戳
    uint32_t pong_received; // 最后一次接受PONG的时间
    char ip[NET_IP_STR_LEN]; // 节点IP
    uint16_t port;  // 节点端口
    uint16_t cport; // 节点集群端口
    uint16_t flags; // 节点标识
    uint32_t notused1; // 未使用
} clusterMsgDataGossip;

而发送方可以发送很多的clusterMsgDataGossip,即数组:

union clusterMsgData {
    /* PING, MEET and PONG */
    struct {
        // Gossip消息数组
        clusterMsgDataGossip gossip[1];
    } ping;

    /* FAIL */
    struct {
        clusterMsgDataFail about;
    } fail;

    /* PUBLISH */
    struct {
        clusterMsgDataPublish msg;
    } publish;

    /* UPDATE */
    struct {
        clusterMsgDataUpdate nodecfg;
    } update;

    /* MODULE */
    struct {
        clusterMsgModule msg;
    } module;
};

而Redis Cluster消息是有消息头和消息体组成,体部是上面的定义,下面定义的是整个消息的格式:

typedef struct {
    char sig[4];        /* Signature "RCmb" (Redis Cluster message bus). */
    uint32_t totlen;    /* Total length of this message */
    uint16_t ver;       /* Protocol version, currently set to 1. */
    uint16_t port;      /* TCP base port number. */
    uint16_t type;      /* Message type */
    uint16_t count;     /* Only used for some kind of messages. */
    uint64_t currentEpoch;  /* The epoch accordingly to the sending node. */
    uint64_t configEpoch;   /* The config epoch if it's a master, or the last
                               epoch advertised by its master if it is a
                               slave. */
    uint64_t offset;    /* Master replication offset if node is a master or
                           processed replication offset if node is a slave. */
    char sender[CLUSTER_NAMELEN]; /* Name of the sender node */
    unsigned char myslots[CLUSTER_SLOTS/8];
    char slaveof[CLUSTER_NAMELEN];
    char myip[NET_IP_STR_LEN];    /* Sender IP, if not all zeroed. */
    char notused1[34];  /* 34 bytes reserved for future usage. */
    uint16_t cport;      /* Sender TCP cluster bus port */
    uint16_t flags;      /* Sender node flags */
    unsigned char state; /* Cluster state from the POV of the sender */
    unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
    union clusterMsgData data; // 这里就是定义的体部,如gossip消息
} clusterMsg;

当节点发送PING,PONGMEET时,会使用Gossip协议,这里使用的是clusterSendPing函数,发送方会:

  • 选取几个自己已知的节点,将这些节点的信息追加到gossip数组
  • 把消息发送给目标接收方
  • 发送方会将1~2步应用到其它所有已知节点上

而接收方:

  • 处理Gossip消息,不仅感知发送方,而且感知了gossip数组中其它节点的信息

而发送怎么选取节点,可以参考clusterSendPing函数,这里代码略,不过有2个重要变量需要说明:

  • freshnodes:除了自己和目标节点之外,集群中其它所有节点个数,即
    • freshnodes = dictSize(server.cluster->nodes)-2
  • wanted:Gossip消息中,需要选取节点的个数,满足
    • wanted = floor(dictSize(server.cluster->nodes)/10)
    • wanted最小为3

4. 槽指派

组成集群后,实际上还是不能工作的,因为没有分配槽位(slot)。

而槽的分配可通过CLUSTER ADDSLOTS命令指定。

4.1. 槽信息维护

如之前所述,每个节点的槽分配信息记录在clusterNode里,如下所示:

  • slots:实际上是一个bitmap,若某个槽位是自己管理的,对应的位会被置1,否则置0(因此它是一个长度为CLUSTER_SLOTS/8char数组)
  • numslots:该节点分配到的槽个数
#define CLUSTER_SLOTS 16384
typedef struct clusterNode {
    // ...
    unsigned char slots[CLUSTER_SLOTS/8]; // 本结点的槽位图
    int numslots; // 本节点的槽个数
    // ...
}

此外,集群状态clusterState也会有对应的槽位管理信息:

  • migrating_slots_to:记录导出槽数据到的目标节点,即:
    • 本节点需要把第i槽的数据导出到migration_slots_to[i]
  • importing_slots_from:记录导入数据的源节点,即:
    • 本节点需要把第i槽的数据从importing_slots_from[i]导入进来
  • slots:槽和节点的映射,即:
    • slots[i]的节点负责第i
  • slots_keys_count:每个槽的键个数
typedef struct clusterState {
	// ...
    clusterNode *migrating_slots_to[CLUSTER_SLOTS];
    clusterNode *importing_slots_from[CLUSTER_SLOTS];
    clusterNode *slots[CLUSTER_SLOTS];
    uint64_t slots_keys_count[CLUSTER_SLOTS];
    // ...
}

4.2. 处理槽位分配

客户端发起CLUSTER ADDSLOTS <slot> [slot ...],即可执行槽位分配。

这里入口函数依旧是clusterCommand,这里截取关键分支,:

void clientCommand(client *c) {
    // ...
    else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
               !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3){
        int j, slot;
        // 创建本节点一个新slots分配图,存储请求参数的信息
        unsigned char *slots = zmalloc(CLUSTER_SLOTS);
        int del = !strcasecmp(c->argv[1]->ptr,"delslots");
        memset(slots,0,CLUSTER_SLOTS);
        // 遍历所有参数
        for (j = 2; j < c->argc; j++) {
            // 获取参数中的槽位置
            if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
                zfree(slots);
                return;
            }
            if (del && server.cluster->slots[slot] == NULL) {
                // 如果是删除操作,但是槽没有指定负责的节点,回复错误信息
                addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
                zfree(slots);
                return;
            } else if (!del && server.cluster->slots[slot]) {
                // 如果是添加操作,但是槽已有指定负责的节点,也回复错误信息
                addReplyErrorFormat(c,"Slot %d is already busy", slot);
                zfree(slots);
                return;
            }
            // 给对应slot的分配图+1,表示已分配
            if (slots[slot]++ == 1) {
                // 若重新分配多次,也返回错误
                addReplyErrorFormat(c,"Slot %d specified multiple times",
                    (int)slot);
                zfree(slots);
                return;
            }
        }
        // 遍历所有的槽(根据新创建的槽位图,即参数信息)
        // 根据新槽位图(参数信息),更新槽位图(添加/删除)
        for (j = 0; j < CLUSTER_SLOTS; j++) {
            if (slots[j]) {
                int retval;

                // 如果这个槽被设置为导入状态,那么取消该状态
                if (server.cluster->importing_slots_from[j])
                    server.cluster->importing_slots_from[j] = NULL;
				// 添加/删除槽位分配
                retval = del ? clusterDelSlot(j) :
                               clusterAddSlot(myself,j);
                serverAssertWithInfo(c,NULL,retval == C_OK);
            }
        }
        zfree(slots);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } 
    // ...
}

这里的代码还是很简单的:

  • 遍历参数,并记录
    • 若是删除操作,但是槽位没有指定负责的节点,回复错误信息
    • 若是添加操作,但是槽位已经指定负责的节点,回复错误信息
    • 若某个槽位值已经指定过多次了(在参数中指定了多次),回复错误信息
  • 根据这个记录,添加/删除槽位分配

而本身添加和删除槽位也是很简单的:

  • 添加槽位:clusterAddSlot,这里就是把自己节点的对应槽位1(clusterNode->slots),并设置负责该槽的节点(clusterState->slots),并更新其它信息
  • 删除节点:clusterDelSlot,这里就是把负责该槽的节点的槽位0,并置空负责该槽的节点,并更新其它信息

4.3. 广播槽位信息

CLUSTER ADDSLOT命令仅可以更改节点本视角下集群的槽位分配,因此需要将这个信息传播,以保证集群一致性。

4.3.1. 槽位信息发送

当每次发送消息时(不论什么类型的消息,因此不仅仅是Gossip了),发送的消息都遵守一定的格式,如3.5.所述。消息包含头部和体部,体部根据不同类型有所变化,但是头部格式是基本不变的。

而槽位的信息就包含在头部。每当节点向其他成员发消息时,都会赋上本节点负责的槽位图,代码如下所示:

void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
    // ...
    // 附带本节点负责的槽位图到消息头上
    memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots)); 
    // ...
}

4.3.2. 槽位信息接收和处理

和之前的一样,消息接收处理的最后进入clusterProcessPacket函数,这里截取更新槽位信息的片段:

int clusterProcessPacket(clusterLink *link) {
    sender = clusterLookupNode(hdr->sender); // 查找是否是已知节点
    // ...
    if(type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET) {
        // ...
    	clusterNode *sender_master = NULL; /* Sender or its master if slave. */
    	int dirty_slots = 0; // 槽位图是否不一致
		// 1. 判断本节点视角下,发送方节点槽位图和请求中的槽位图是否不一致
        if (sender) {
            sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
            if (sender_master) {
                dirty_slots = memcmp(sender_master->slots,
                        hdr->myslots,sizeof(hdr->myslots)) != 0;
            }
        }
        
        // 2. 若发送方是主节点,且出现了不一致,则需要更新本节点视角下发送方的槽位图
        if (sender && nodeIsMaster(sender) && dirty_slots)
            clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
        
        // ... handling greater epoch ...
        // ...
    }
	// ...
}

这里关键函数就是clusterUpdateSlotsConfigWith

void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
    // sender一定是主节点,slot是sender发来的分配到sender的槽位图
    int j;
    clusterNode *curmaster, *newmaster = NULL;
    
    uint16_t dirty_slots[CLUSTER_SLOTS];
    int dirty_slots_count = 0;
    
    // 如果当前节点是主节点,那么获取当前节点
    // 如果当前节点是从节点,那么获取当前从节点所从属的主节点
    curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;

    if (sender == myself) {
        serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
        return;
    }
	// 遍历所有槽
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        // 若请求中第j槽被分配
        if (bitmapTestBit(slots,j)) {
            // 若本节点视角下,第j槽本身就被sender负责,跳过
            if (server.cluster->slots[j] == sender) continue;
            // 若本节点视角下,第j槽需要从其它节点导入,也跳过
            // 因为这个字段只能被redis-trib修改(如当重新分片时)
            if (server.cluster->importing_slots_from[j]) continue;
            // 当满足下面条件时,重新分配第j槽给sender(clusterState中):
            // 1. 第j槽本身没有分配(clusterState中),或者请求的configEpoch比本节点的更大
            // 2. 该槽的数据并没有执行导入
            if (server.cluster->slots[j] == NULL ||
                server.cluster->slots[j]->configEpoch < senderConfigEpoch) {
                // 记录脏槽
                // 脏槽指: 原本属于自己的,且有数据的槽,该槽需要被分配给其它节点(即sender)
                if (server.cluster->slots[j] == myself &&
                    countKeysInSlot(j) &&
                    sender != myself) {
                    dirty_slots[dirty_slots_count] = j;
                    dirty_slots_count++;
                }
				// 若执行这一步,说明第j槽原来是自己的主节点,现在由sender接管
                // 即发生了故障转移
                if (server.cluster->slots[j] == curmaster)
                    newmaster = sender;
                // 删除当前被指定的槽
                clusterDelSlot(j);
                // 将该槽分配给sender
                clusterAddSlot(sender,j);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE|
                                     CLUSTER_TODO_FSYNC_CONFIG);
            }
        }
    }
    
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return;

	// 若发生了故障转移,且老的主节点已经不负责任何槽
    // 则在本节点视角下,将sender提升为自己的主节点
    if (newmaster && curmaster->numslots == 0) {
        serverLog(LL_WARNING,
            "Configuration change detected. Reconfiguring myself "
            "as a replica of %.40s", sender->name);
        clusterSetMaster(sender);
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
    } else if (dirty_slots_count) {
        // 若检测到脏槽,那么需要把脏槽的数据清除
        for (j = 0; j < dirty_slots_count; j++)
            delKeysInSlot(dirty_slots[j]);
    }
}

总结一下上面的函数,本节点发送更新槽位信息分为2种情况:

  • 若本节点视角下,槽位没有被指定:直接分配给发送者
  • 若本节点视角下,槽位已被指定,且configEpoch更大
    • 记录脏槽位(原本属于自己,但要分配给发送者)
    • 若槽位原本属于自己/自己的主节点,那么还得记录故障转移信息
    • 分配槽位给发送者

而分配完成后,还有一些事情要做:

  • 若检测到故障转移的信息,即槽位从自己/自己的主节点变成了发送方,则设置本节点为发送方的从节点
  • 若检测到脏槽位,则需要清除脏槽位的键数据

如果时间足够,每个主节点都会将自己负责的槽位信息告知给集群中的其他节点,于是,集群中的每一个节点都会知道16384个槽分别指派给了集群中的哪个节点,达成最终一致。

达到最终一致后,集群就正式可用了。

5. 槽路由

计算键属于哪个槽,可通过CLUSTER KEYSLOT <key>获得,内部起始调用的就是CRC16算法,代码如下:

unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */
    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;
    /* No '{' ? Hash the whole key. This is the base case. */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;
    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;
    /* No '}' or nothing between {} ? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
    /* If we are here there is both a { and a } on its right. Hash
     * what is in the middle between { and }. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}

而当请求的键所在的槽不属于本节点时,会返回客户端MOVE错误,并捎带正确客户端的信息,客户端就可以重定向请求到正确节点。

6. 集群模式下的数据库

在单机模式下,Redis可拥有多个数据库。但是在集群模式下,只能使用0号数据库(即第一个)。

除此之外,Redis中clusterStateslots_to_keys字段,维护了一个槽和键的关系。

typedef struct clusterState {
	// ...
    rax *slots_to_keys;
    // ...
} clusterState;

这个字段是一个radix tree,你可以将其看作一个压缩的trie,具体可以看这篇文章:https://my.oschina.net/yunqi/blog/3039132。

slots_to_keys存储了本节点上存储的所有的键,并且附带了键所在的槽号。

Redis是这样存的:

  • 获取记录的键key,计算它所在的槽号id
  • 然后将id + key作为整个字符串,存储到slots_to_keys

当客户端发起如CLUSTER GETKEYSINSLOT时,由于槽号是存储在树的开头,因此能很容易的找到对应槽下所有键的子树($O(logN)$),然后再执行遍历。而之前版本使用skip list存储,虽然定位对应槽下所有键的时间复杂度也一样(也是$O(logN)$),但是相比radix tree更耗内存(因为radix tree重用字符且会压缩子节点)。