源码阅读-Redis单机服务器: 事件

Posted by keys961 on October 14, 2019

1. Overview

Redis服务器是事件驱动的,而事件分为两类事件:

  • 文件事件:使用多路复用器监听并驱动事件的执行
  • 时间事件:定时操作的一个抽象,如serverCron函数

本文将会分析这两个事件,以及它们的调度。

并且,可以对比Netty的EventLoop,它们之间有很多相似之处。

关于事件的相关代码,主要集中于ae.hae.c中。

2. 事件循环

Redis使用Reactor事件驱动处理文件事件:

  • 使用I/O多路复用监听特定事件是否就绪
  • 就绪后产生事件,调用关联好的函数处理事件

实际上,它和Netty的EventLoop非常类似(因此也被叫成事件循环),由于Redis是单进程模型,因此,相当于只有一个线程/EventLoopEventLoopGroup

  • 先利用多路复用监听文件事件(Netty的I/O事件),排队并逐个处理(因此不会有线程安全问题)
  • 然后处理时间事件(Netty的非I/O事件)

当然Redis没有可配置的I/O与非I/O事件的时间比例分配的功能,等效于Netty的100% I/O事件处理时间,因为Redis是I/O密集型程序。

另外,和Netty Server对比:

  • Netty是Reactor多线程模型,有一个boss和多个workerboss负责移交请求到worker(创建新的pipeline,绑定到worker上,并触发和传播事件)
  • Redis是Reactor单线程模型,只有一个boss,请求全部由boss负责

2.1. 事件循环定义

Redis的事件处理器是定义在ae.haeEventLoop

typedef struct aeEventLoop {
    int maxfd;   // 最大fd的值
    int setsize; // 最大追踪的fd个数
    long long timeEventNextId; //下一个时间事件的ID,用于生成时间事件的唯一标识
    time_t lastTime;     // 上一次执行事件循环的时间
    aeFileEvent *events;  // 文件事件数组
    aeFiredEvent *fired;  // 已触发的文件事件数组
    aeTimeEvent *timeEventHead; // 时间事件链表,注意是链表
    int stop; // 停止标识
    void *apidata; // 多路复用轮询时的需要的数据(如epoll的epfd和epoll_events)
    aeBeforeSleepProc *beforesleep; //在调用processEvent前,调用该处理函数
    aeBeforeSleepProc *aftersleep; // 在多路复用轮询后,执行文件之间前,若配置AE_CALL_AFTER_SLEEP则调用该函数
} aeEventLoop; 

启动事件循环是在server.cmain函数调用aeMain(aeEventLoop *eventLoop)后启动代表Redis服务正式启动了,而这个eventloop提供并处理了Redis几乎所有的服务

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    // 若不停止,则无限循环,处理事件(文件,时间等等),以对内对外提供服务
    while (!eventLoop->stop) {
        // 每次事件循环
        // 1. 有必要,执行beforesleep函数
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 2. 执行各种事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

2.2. 事件处理

而处理事件的函数和Netty的EventLoop大同小异,看起来没啥不一样的:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        // .. tvp calc omitted ...
        // 1. 从多路复用器中poll出来文件事件
        // 它会把就绪事件fd保存到eventLoop的fired数组中
        numevents = aeApiPoll(eventLoop, tvp);
        // 2. 若配置AE_CALL_AFTER_SLEEP和aftersleep,执行它
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);
		// 3. 处理文件事件
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; 
            // 判断是否是先写后读
            int invert = fe->mask & AE_BARRIER;
            // 执行读事件回调
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
            // 执行写事件回调
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            // 一般情况下先读后写
            // 而若注册的文件事件配置了AE_BARRIER则先写后读
            // 下面就是处理这种的情况
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            processed++;
        }
    }
    // 4. 处理时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    return processed; /* return the number of processed file/time events */
}

a) 文件事件处理

文件事件的处理在上面的代码块中已经写的很清楚了:

  • 先轮询多路复用器,获取就绪的文件事件
  • 一个一个执行这些事件
    • 一般情况下:先读后写
    • 若事件配置了AE_BARRIER:先写后读

关于AR_BARRIER可看下面的代码解释:

#define AE_BARRIER 4    /* With WRITABLE, never fire the event if the
                           READABLE event already fired in the same event
                           loop iteration. Useful when you want to persist
                           things to disk before sending replies, and want
                           to do that in a group fashion. */

b) 时间事件处理

对于时间事件的处理,也是非常粗暴的扫描时间事件的链表($O(n)$),一个一个执行的。这里可以看到,时间事件有一次性事件和重复事件:

static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);
    // 1. 调整记录时间(因为墙上时钟不单调)
    // ...
    // 2. 从时间事件链表头开始扫描,并执行各自的回调
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
        long now_sec, now_ms;
        long long id;
        // 若事件ID贴上了AE_DELETED_EVENT_ID,说明该事件需要移除
        // 则执行移除事件操作
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }
        // 不合法的事件,跳过
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        // 当现在的事件大于时间事件附带的规定时间(即scheduled time)
        // 执行这个时间事件
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;
            id = te->id;
            // 执行事件回调
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            if (retval != AE_NOMORE) {
                // 若这个事件是重复事件,需要再次执行
                // 就更新这个事件附带的规定事件,即scheduled time
                // 后面的事件循环时,若当前时间超过这个scheduled time,则再次执行它
                
                // 注意,下一次事件触发的间隔决定于该事件的返回值,即retval
                // 比如serverCron返回1000/server.hz
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                // 否则就给这个事件ID贴上AE_DELETED_EVENT_ID
                // 下一次事件循环时,这个时间事件就会被移除
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}

c) 调度总结

可以见下图,非常清楚:

ae_loop

3. 文件事件

文件事件定义在aeFileEvent中:

typedef struct aeFileEvent {
    int mask; // 事件类型,可以是AE_(READABLE|WRITABLE|BARRIER)
    aeFileProc *rfileProc; // 读回调函数
    aeFileProc *wfileProc; // 写回调函数
    void *clientData; // 附带数据
} aeFileEvent;

3.1. 文件事件的添加

添加文件事件很简单:

  • 先往多路复用器注册事件
  • 然后记录到aeEventLoop
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];
	// 1. 往多路复用器添加该fd相关的文件事件
    // fd的文件事件记录在eventLoop->events[fd]上
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    // 2. 设置回调等参数
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

Redis支持多种多路复用器,优先级从高到低为evport, epoll, kqueue, select。Linux上一般使用epoll,所以后面都以epoll为准。

往多路复用器添加文件事件也是比较简单的,即调用aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask),内部就是epoll_ctl

typedef struct aeApiState {
    int epfd; // epoll的fd
    struct epoll_event *events; // epoll事件数组,events[fd]即注册到epfd上的fd的事件
} aeApiState;

struct epoll_event {
	__uint32_t events; // 需要监听的事件
	epoll_data_t data; 
};

typedef union epoll_data
{
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; 
    // 若是新的文件事件,动作设置为EPOLL_CTL_ADD,代表注册一个新fd到epfd中
    // 否则是EPOLL_CTL_MOD,代表修改epfd中的已注册的fd
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask;
    // 设置需要监听的事件
    if (mask & AE_READABLE) ee.events |= EPOLLIN; // 若事件可读,要监听EPOLLIN,当fd上数据可读,事件被触发
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; // 若事件可写,要监听EPOLLOUT,当fd上数据可写,事件被触发
    // 设置需要监听的fd
    ee.data.fd = fd; 
    // 这里将fd注册到epfd上,并监听设置好的事件
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

3.2. 监听就绪的事件

监听并获取就绪事件也是很简单的,调用aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp),内部实际上调用epoll_wait,然后把触发的事件记录到fired事件数组中:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
	// 这里监听事件,等待给定的一段时间
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    // 若有事件
    if (retval > 0) {
        int j;
        numevents = retval;
        // 遍历已触发的事件
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE; // 错误和挂断都是AE_WRITABLE
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            // 将触发的事件保存到fired事件数组中,供之后回调函数的调用
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

3.3. 处理就绪的文件事件

之后怎么处理这些事件,就是回调函数的事情了,2.2.节以及有文件事件的处理代码,这里不再说明。

3.4. Redis注册的文件事件

a) 连接应答

这是一个AE_READABLE事件,当客户端创建新连接后,事件触发。

回调函数是networking.c/acceptTcpHandler,对新连接进行应答。

b) 命令请求处理

这也是一个AE_READABLE事件,客户端发送命令,服务端可以读取时,事件触发。

回调函数是networking.c/readQueryFromClient,它会读取客户端发送的请求数据。

连接应答之后,这个文件事件就会被创建且被监听。

c) 命令响应

这是一个AE_WRITABLE事件,用于响应命令处理的结果。

回调函数是networking.c/sendReplyToClient,它将响应返回给客户端。

客户端的响应先保存到一个缓冲区中,而每次事件循环进入之前,该写事件都会被添加到多路复用器中。因此在事件循环内,一旦fd可写就绪,就把缓冲区输出给客户端,响应就会被返回。

4. 时间事件

时间事件定义在aeTimeEvent中:

typedef struct aeTimeEvent {
    long long id; // 事件ID
    long when_sec; // 下次执行该任务的时间(s)
    long when_ms; // 下次执行该任务的时间(ms)
    aeTimeProc *timeProc; // 回调函数
    aeEventFinalizerProc *finalizerProc; // 事件的析构函数
    void *clientData; // 
    struct aeTimeEvent *prev; // 前一个事件
    struct aeTimeEvent *next; // 后一个事件
} aeTimeEvent;

4.1. 时间事件的添加

添加也是很容易的事情,就往整个时间事件的链表头添加一个事件即可。注意它的ID是递增的,和很多数据库一样:

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    // 递增ID
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    // 设置下次执行的时间为之后milliseconds毫秒
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    // 把事件添加到链表头
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}

4.2. 时间事件的处理

2.2.b中已经讲的很清楚了(简单遍历链表,判断时间,执行,标记移除),这里不再说明。

4.3. serverCron

serverCron之前也遇到过,它是一个时间事件,用于定期维护自身资源和状态,保证运行的稳定。

它需要做的内容有:

  • 更新统计信息
  • 清理过期键值对
  • 增量的rehash
  • 关闭、清理失效客户端
  • AOF(刷AOF缓冲、AOF重写)和RDB(save配置的BGSAVE)的持久化
  • 同步主从数据
  • 集群连接测试

它在初始化服务器的时候被添加进去:

void initServer(void) {
    // ...
    // 这里初始化的时候,下一次执行是1ms之后
    // 但是该函数的返回值是1000/server.hz
    // 因此它的执行周期是1000/server.hz ms, 默认100ms
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    // ...
}

实际上serverCron是Redis全局唯一的一个周期事件,可以看成一个大任务,它的间隔是1000/server.hz毫秒(默认100毫秒)。

而里面有一些小任务,它们可能需要更长的时间间隔,那么就可以调用下面这个宏,指定一个毫秒数作为时间间隔:

// server.cronloops记录了执行这个大任务serverCron循环的次数
#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))

比如下面的例子,设置了每5000ms执行一次任务:

run_with_period(5000) {
    // do tasks...
}

5. 总结

实际上,Redis的事件驱动处理和Netty非常类似(我可以说基本完全一致),只是Redis完全是单线程的。

这里把Netty的EventLoop源码分析贴出来,可以看出它们:

  • 都用多路复用监听并处理触发的文件(I/O)事件

    Netty用一个ChannelPipeline上的一串ChannelHandler处理事件

  • 处理完文件(I/O)事件后,再处理时间(非I/O)事件

  • 整个流程在一个事件循环中

可以看出,虽然Redis使用的C语言没有Java的OO特性,但是利用函数指针和宏,在多路复用模块上实现多态,组件可以实现替换,代码也是很优雅的。