源码阅读-Redis独立功能: 发布与订阅

Posted by keys961 on December 10, 2019

1. Overview

Redis支持消息的发布和订阅功能,这些主要由下面2个命令实现:

  • PUBLISH:向某个频道发布消息
  • SUBSCRIBE:订阅某个或多个频道的消息

还有其它命令,如PSUBSCRIBE(根据正则表达式订阅频道),UNSUBSCRIBE/PUNSUBSCRIBE(取消订阅/根据正则表达式取消订阅),基本基于上面的命令实现。

本文会说明下面情况下的发布/订阅功能:

  • 单个主节点(节点可能包含从节点)
  • 多个主节点的集群

2. 消息订阅

消息订阅使用SUBSCRIBE命令,格式是:SUBSCRIBE channel_1 channel2 ...

在看如何实现订阅前,首先看看相关字段的定义。在客户端client中,字段pubsub_channels记录了该客户端订阅了哪些频道,该字段是一个字典(实际上是一个集合);而在服务器redisServer中,字段pubsub_channels记录了该节点创建了哪些频道,并记录了该频道下订阅的客户端。

typedef struct client {
    // ...
    dict *pubsub_channels; // 记录该客户端订阅了哪些频道(用于SUBSCRIBE)
    list *pubsub_patterns; // 记录了客户端订阅了哪些频道,存的是正则(用于PSUBSCRIBE) 
    // ...
} client;

typedef struct redisServer {
    // ...
    dict *pubsub_channels; // 记录该服务端创建了哪些频道,以及对应订阅的客户端列表(用于SUBSCRIBE)
    list *pubsub_patterns; // 记录该服务端创建了哪些频道(正则表示),以及对应订阅的客户端(用于PSUBSCRIBE)
    // ...
}
// pubsub_patterns存下面这个结构
typedef struct pubsubPattern {
    client *client;
    robj *pattern;
} pubsubPattern;

处理该命令的函数是subscribeCommand。根据上面的定义,该实现就非常简单了,只需:

  • 客户端pubsub_channels中添加订阅的频道
  • 服务端pubsub_channels添加订阅信息:
    • 若频道是新的,则创建新的
    • 对应频道的订阅列表中加入对应的客户端
  • 客户端标记CLIENT_PUBSUB,以进入发布/订阅上下文

该操作不需考虑集群和主从复制的情况,订阅操作在这些情况下的动作都是一样的。

void subscribeCommand(client *c) {
    int j;
	// 订阅参数中指定的频道
    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
    // 该客户端进入发布/订阅上下文
    c->flags |= CLIENT_PUBSUB;
}

int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;
	// 把订阅的频道添加到客户端
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        // 把客户端添加到服务端的订阅表中
        listAddNodeTail(clients,c);
    }
    // 回复响应
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

PSUBSCRIBESUBSCRIBE类似,只是更新client->pubsub_patternsserver->pubsub_patterns字段而已。

执行SUBSCRIBE/PSUBSCRIBE后,客户端会进入发布/订阅上下文,此时,服务端只会执行客户端的下列命令:

  • PING
  • SUBSCRIBE/UNSUBSCRIBE
  • PSUBSCRIBE/PUNSUBSCRIBE
  • QUIT

依据如下:

int processCommand(client *c) {
    // ...
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }
    // ...
}

3. 消息发布

消息发布需要使用PUBLISH命令,格式是:PUBLISH channel msg。处理该命令的函数是publishCommand

void publishCommand(client *c) {
    // 1. 发布消息
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    if (server.cluster_enabled)
        // 2.a. 集群下,把发布的消息传播给其它节点
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    else
        // 2.b. 标记强制复制/传播命令给从节点,当命令执行完后,将其发给从节点
        forceCommandPropagation(c,PROPAGATE_REPL);
    addReplyLongLong(c,receivers);
}

这里主要做:

  • 将消息发布到频道中
  • 传播发布消息命令
    • 若在集群模式:将发布消息的命令传播给集群其它节点
    • 若在单机模式:标记命令”强制复制“(PROPAGATE_REPL),之后将该命令传播给从节点

3.1. 发布消息到频道

从上面可知,把消息发布到频道的函数是pubsubPublishMessage。根据第2节的定义,其实现也是很简单的:

  • 查找server->pubsub_channels字典,找到创建的频道和其订阅的客户端列表,把消息发到它们的输出缓冲区
  • 遍历server->pubsub_patterns列表,若频道匹配,将消息发到对应订阅的客户端输出缓冲区

回顾:后面的事件循环,输出缓冲区被刷到网络,信息就会被发送到订阅的客户端。

int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    // 查找server->pubsub_channels字典
    // 找到创建的频道和其订阅的客户端列表,把消息发到它们的输出缓冲区
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;
        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            client *c = ln->value;
            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    // 遍历server->pubsub_patterns列表
    // 若频道匹配,将消息发到对应订阅的客户端输出缓冲区
    if (listLength(server.pubsub_patterns)) {
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;

            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    // 返回接收者/订阅者个数
    return receivers;
}

3.2. 单主节点:PUBLISH的复制

PUBLISH命令不会更改数据库的数据,但是不在集群模式下,当节点有从节点时,该命令还是会被传播给从节点

原因很简单,考虑下面场景:

  • A是主节点,B是A的从节点
  • 客户端C1向A订阅了频道P,客户端C2向B也订阅了频道P
  • 客户端C1向A,往频道P发布了消息M,按道理,客户端C2也得收到消息M
  • 因此A需要把发布的消息M同步/传播到B,这样B就同样把消息发给客户端C2,客户端C2就能收到客户端C1发布的消息了

在这里,调用函数forceCommandPropagation,给客户端标记为CLIENT_FORCE_REPL。这样当PUBLISH执行完后,服务端会强制将该命令写入从节点的输出缓冲区,即传播给从节点:

void forceCommandPropagation(client *c, int flags) {
    if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL; // 主要是这一行的标记
    if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF;
}

强制传播的代码在call函数中,这里截取片段:

void call(client *c, int flags) {
    // ...
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) {
        int propagate_flags = PROPAGATE_NONE;
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
		// 这里客户端被标记CLIENT_FORCE_REPL,因此propagate_flags带有PROPAGATE_REPL
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
		// ...
		// propagate_flags带有PROPAGATE_REPL
        // 所以会调用propagate函数,将命令传播给从节点
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }
    // ...
}

3.3. 集群模式:PUBLISH消息广播

在集群模式下,PUBLISH消息还会广播到整个集群,从而让其它客户端(连接到其它节点的)接收到发布的消息。

如第3节开头所述,这里调用函数clusterPropagatePublish,实际上调用的是函数clusterSendPublish。它把发布的消息包装成clusterMsg,广播给集群所有节点。其代码如下:

void clusterPropagatePublish(robj *channel, robj *message) {
    clusterSendPublish(NULL, channel, message);
}

void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
    unsigned char buf[sizeof(clusterMsg)], *payload;
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;
    uint32_t channel_len, message_len;

    channel = getDecodedObject(channel);
    message = getDecodedObject(message);
    channel_len = sdslen(channel->ptr);
    message_len = sdslen(message->ptr);
	// 组装消息,类型是CLUSTERMSG_TYPE_PUBLISH
    // 该消息会携带clusterMsgDataPublish结构
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
	// 设置频道和消息的长度
    hdr->data.publish.msg.channel_len = htonl(channel_len);
    hdr->data.publish.msg.message_len = htonl(message_len);
    hdr->totlen = htonl(totlen);
    if (totlen < sizeof(buf)) {
        payload = buf;
    } else {
        payload = zmalloc(totlen);
        memcpy(payload,hdr,sizeof(*hdr));
        hdr = (clusterMsg*) payload;
    }
    // 设置频道和消息值
    memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
    memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
        message->ptr,sdslen(message->ptr));
	// 第一个参数是NULL,所以是广播,把PUBLISH的消息广播给其它节点
    if (link)
        clusterSendMessage(link,payload,totlen);
    else
        clusterBroadcastMessage(payload,totlen);
    decrRefCount(channel);
    decrRefCount(message);
    if (payload != buf) zfree(payload);
}

广播的消息格式如下:

typedef struct {
    uint32_t channel_len; // 频道的值长度
    uint32_t message_len; // 消息的长度
    unsigned char bulk_data[8]; // 频道和消息值,两者拼接而成
} clusterMsgDataPublish;

其它节点接收到这个消息,会在clusterProcessPacket函数处理这个消息,并把消息发送给订阅了该频道的客户端。下面截取片段:

int clusterProcessPacket(clusterLink *link) {
 	// ...
    else if (type == CLUSTERMSG_TYPE_PUBLISH) {
        robj *channel, *message;
        uint32_t channel_len, message_len;
        
        if (dictSize(server.pubsub_channels) ||
           listLength(server.pubsub_patterns))
        {
            // 从消息中获取频道和消息
            channel_len = ntohl(hdr->data.publish.msg.channel_len);
            message_len = ntohl(hdr->data.publish.msg.message_len);
            channel = createStringObject(
                        (char*)hdr->data.publish.msg.bulk_data,channel_len);
            message = createStringObject(
                        (char*)hdr->data.publish.msg.bulk_data+channel_len,
                        message_len);
            // 将消息发送给订阅该频道的客户端(连接到本节点的)
            // 这里见3.1.,一样的函数
            pubsubPublishMessage(channel,message);
            decrRefCount(channel);
            decrRefCount(message);
        }
    } // ...
    // ...
}

4. 总结

总体而言,发布/订阅的实现是非常简单的:

  • 订阅时,维护好”频道-客户端“的订阅表

  • 发布时

    • 查找订阅表,将消息发送给订阅表中记录的客户端

    • 对于单主节:

      • 发布的命令会传播到从节点

      • 从节点会把消息发送给订阅该频道的客户端,这些客户端连接的是该从节点

    • 对于集群:

  • 发布的消息广播到集群所有节点

    • 其它节点会把消息发送给订阅该频道的客户端,这些客户端连接的是该节点