1. Overview
Redis支持消息的发布和订阅功能,这些主要由下面2个命令实现:
PUBLISH
:向某个频道发布消息SUBSCRIBE
:订阅某个或多个频道的消息
还有其它命令,如
PSUBSCRIBE
(根据正则表达式订阅频道),UNSUBSCRIBE/PUNSUBSCRIBE
(取消订阅/根据正则表达式取消订阅),基本基于上面的命令实现。
本文会说明下面情况下的发布/订阅功能:
- 单个主节点(节点可能包含从节点)
- 多个主节点的集群
2. 消息订阅
消息订阅使用SUBSCRIBE
命令,格式是:SUBSCRIBE channel_1 channel2 ...
。
在看如何实现订阅前,首先看看相关字段的定义。在客户端client
中,字段pubsub_channels
记录了该客户端订阅了哪些频道,该字段是一个字典(实际上是一个集合);而在服务器redisServer
中,字段pubsub_channels
记录了该节点创建了哪些频道,并记录了该频道下订阅的客户端。
typedef struct client {
// ...
dict *pubsub_channels; // 记录该客户端订阅了哪些频道(用于SUBSCRIBE)
list *pubsub_patterns; // 记录了客户端订阅了哪些频道,存的是正则(用于PSUBSCRIBE)
// ...
} client;
typedef struct redisServer {
// ...
dict *pubsub_channels; // 记录该服务端创建了哪些频道,以及对应订阅的客户端列表(用于SUBSCRIBE)
list *pubsub_patterns; // 记录该服务端创建了哪些频道(正则表示),以及对应订阅的客户端(用于PSUBSCRIBE)
// ...
}
// pubsub_patterns存下面这个结构
typedef struct pubsubPattern {
client *client;
robj *pattern;
} pubsubPattern;
处理该命令的函数是subscribeCommand
。根据上面的定义,该实现就非常简单了,只需:
- 客户端
pubsub_channels
中添加订阅的频道 - 服务端
pubsub_channels
添加订阅信息:- 若频道是新的,则创建新的
- 对应频道的订阅列表中加入对应的客户端
- 客户端标记
CLIENT_PUBSUB
,以进入发布/订阅上下文
该操作不需考虑集群和主从复制的情况,订阅操作在这些情况下的动作都是一样的。
void subscribeCommand(client *c) {
int j;
// 订阅参数中指定的频道
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
// 该客户端进入发布/订阅上下文
c->flags |= CLIENT_PUBSUB;
}
int pubsubSubscribeChannel(client *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
// 把订阅的频道添加到客户端
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
// 把客户端添加到服务端的订阅表中
listAddNodeTail(clients,c);
}
// 回复响应
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
而PSUBSCRIBE
和SUBSCRIBE
类似,只是更新client->pubsub_patterns
和server->pubsub_patterns
字段而已。
执行
SUBSCRIBE
/PSUBSCRIBE
后,客户端会进入发布/订阅上下文,此时,服务端只会执行客户端的下列命令:
PING
SUBSCRIBE
/UNSUBSCRIBE
PSUBSCRIBE
/PUNSUBSCRIBE
QUIT
依据如下:
int processCommand(client *c) { // ... if (c->flags & CLIENT_PUBSUB && c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand) { addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"); return C_OK; } // ... }
3. 消息发布
消息发布需要使用PUBLISH
命令,格式是:PUBLISH channel msg
。处理该命令的函数是publishCommand
:
void publishCommand(client *c) {
// 1. 发布消息
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
if (server.cluster_enabled)
// 2.a. 集群下,把发布的消息传播给其它节点
clusterPropagatePublish(c->argv[1],c->argv[2]);
else
// 2.b. 标记强制复制/传播命令给从节点,当命令执行完后,将其发给从节点
forceCommandPropagation(c,PROPAGATE_REPL);
addReplyLongLong(c,receivers);
}
这里主要做:
- 将消息发布到频道中
- 传播发布消息命令
- 若在集群模式:将发布消息的命令传播给集群其它节点
- 若在单机模式:标记命令”强制复制“(
PROPAGATE_REPL
),之后将该命令传播给从节点
3.1. 发布消息到频道
从上面可知,把消息发布到频道的函数是pubsubPublishMessage
。根据第2节的定义,其实现也是很简单的:
- 查找
server->pubsub_channels
字典,找到创建的频道和其订阅的客户端列表,把消息发到它们的输出缓冲区 - 遍历
server->pubsub_patterns
列表,若频道匹配,将消息发到对应订阅的客户端输出缓冲区
回顾:后面的事件循环,输出缓冲区被刷到网络,信息就会被发送到订阅的客户端。
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
listNode *ln;
listIter li;
// 查找server->pubsub_channels字典
// 找到创建的频道和其订阅的客户端列表,把消息发到它们的输出缓冲区
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
addReplyBulk(c,message);
receivers++;
}
}
// 遍历server->pubsub_patterns列表
// 若频道匹配,将消息发到对应订阅的客户端输出缓冲区
if (listLength(server.pubsub_patterns)) {
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);
receivers++;
}
}
decrRefCount(channel);
}
// 返回接收者/订阅者个数
return receivers;
}
3.2. 单主节点:PUBLISH
的复制
PUBLISH
命令不会更改数据库的数据,但是不在集群模式下,当节点有从节点时,该命令还是会被传播给从节点。
原因很简单,考虑下面场景:
- A是主节点,B是A的从节点
- 客户端C1向A订阅了频道P,客户端C2向B也订阅了频道P
- 客户端C1向A,往频道P发布了消息M,按道理,客户端C2也得收到消息M
- 因此A需要把发布的消息M同步/传播到B,这样B就同样把消息发给客户端C2,客户端C2就能收到客户端C1发布的消息了
在这里,调用函数forceCommandPropagation
,给客户端标记为CLIENT_FORCE_REPL
。这样当PUBLISH
执行完后,服务端会强制将该命令写入从节点的输出缓冲区,即传播给从节点:
void forceCommandPropagation(client *c, int flags) {
if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL; // 主要是这一行的标记
if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF;
}
强制传播的代码在call
函数中,这里截取片段:
void call(client *c, int flags) {
// ...
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) {
int propagate_flags = PROPAGATE_NONE;
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
// 这里客户端被标记CLIENT_FORCE_REPL,因此propagate_flags带有PROPAGATE_REPL
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
// ...
// propagate_flags带有PROPAGATE_REPL
// 所以会调用propagate函数,将命令传播给从节点
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
// ...
}
3.3. 集群模式:PUBLISH
消息广播
在集群模式下,PUBLISH
消息还会广播到整个集群,从而让其它客户端(连接到其它节点的)接收到发布的消息。
如第3节开头所述,这里调用函数clusterPropagatePublish
,实际上调用的是函数clusterSendPublish
。它把发布的消息包装成clusterMsg
,广播给集群所有节点。其代码如下:
void clusterPropagatePublish(robj *channel, robj *message) {
clusterSendPublish(NULL, channel, message);
}
void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
unsigned char buf[sizeof(clusterMsg)], *payload;
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;
uint32_t channel_len, message_len;
channel = getDecodedObject(channel);
message = getDecodedObject(message);
channel_len = sdslen(channel->ptr);
message_len = sdslen(message->ptr);
// 组装消息,类型是CLUSTERMSG_TYPE_PUBLISH
// 该消息会携带clusterMsgDataPublish结构
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
// 设置频道和消息的长度
hdr->data.publish.msg.channel_len = htonl(channel_len);
hdr->data.publish.msg.message_len = htonl(message_len);
hdr->totlen = htonl(totlen);
if (totlen < sizeof(buf)) {
payload = buf;
} else {
payload = zmalloc(totlen);
memcpy(payload,hdr,sizeof(*hdr));
hdr = (clusterMsg*) payload;
}
// 设置频道和消息值
memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
message->ptr,sdslen(message->ptr));
// 第一个参数是NULL,所以是广播,把PUBLISH的消息广播给其它节点
if (link)
clusterSendMessage(link,payload,totlen);
else
clusterBroadcastMessage(payload,totlen);
decrRefCount(channel);
decrRefCount(message);
if (payload != buf) zfree(payload);
}
广播的消息格式如下:
typedef struct {
uint32_t channel_len; // 频道的值长度
uint32_t message_len; // 消息的长度
unsigned char bulk_data[8]; // 频道和消息值,两者拼接而成
} clusterMsgDataPublish;
其它节点接收到这个消息,会在clusterProcessPacket
函数处理这个消息,并把消息发送给订阅了该频道的客户端。下面截取片段:
int clusterProcessPacket(clusterLink *link) {
// ...
else if (type == CLUSTERMSG_TYPE_PUBLISH) {
robj *channel, *message;
uint32_t channel_len, message_len;
if (dictSize(server.pubsub_channels) ||
listLength(server.pubsub_patterns))
{
// 从消息中获取频道和消息
channel_len = ntohl(hdr->data.publish.msg.channel_len);
message_len = ntohl(hdr->data.publish.msg.message_len);
channel = createStringObject(
(char*)hdr->data.publish.msg.bulk_data,channel_len);
message = createStringObject(
(char*)hdr->data.publish.msg.bulk_data+channel_len,
message_len);
// 将消息发送给订阅该频道的客户端(连接到本节点的)
// 这里见3.1.,一样的函数
pubsubPublishMessage(channel,message);
decrRefCount(channel);
decrRefCount(message);
}
} // ...
// ...
}
4. 总结
总体而言,发布/订阅的实现是非常简单的:
-
订阅时,维护好”频道-客户端“的订阅表
-
发布时
-
查找订阅表,将消息发送给订阅表中记录的客户端
-
对于单主节:
-
发布的命令会传播到从节点
-
从节点会把消息发送给订阅该频道的客户端,这些客户端连接的是该从节点
-
-
对于集群:
-
-
发布的消息广播到集群所有节点
- 其它节点会把消息发送给订阅该频道的客户端,这些客户端连接的是该节点