源码阅读-Redis数据结构: 压缩数据结构

Posted by keys961 on September 9, 2019

Redis存储数据的时候,会尝试压缩数据结构的空间。

因而它实现了几个压缩集合的数据结构:

  • ziplist:压缩列表(ziplist.h, ziplist.c
  • zipmap:压缩字典(zipmap.h,zipmap.c

它们都用一个线性的char[]来存储列表/字典的数据结构,相比adlistdict更加节省内存空间。

1. 压缩列表

1.1. 压缩列表整体构成

列表包含一个头、一个尾和列表节点构成。

头部包含下面几个字段:

  • zlbytes:4字节,类型为uint32_t,记录压缩列表占用的内存字节数
  • zltail:4字节,类型为uint32_t,记录压缩列表尾节点到列表起始地址的偏移量
  • zllen:2字节,类型为uint16_t,记录节点数量(若数量为UINT16_MAX时,需要遍历整个列表才能算出节点数量)

尾部包含一个字段:

  • zlend:1字节,类型为uint8_t,是一个标记,值为0xFF

整体格式为,而且都以小端形式存储数据:

/*
* The general layout of the ziplist is as follows:
*
* <zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>
*/

1.2. 列表节点构成

列表节点主要由3个字段构成:

  • prevlen:表示前一个列表项长度,若小于254字节(0xFE),占用1字节,否则占用5字节(且第一个字节被置为0xFE,后面4个字节才表示实际长度)

  • encoding:编码制式,且包含了值长度信息,长度可为1字节、2字节或5字节。格式可参考下图。

    注意encoding可以包含值信息,不需要content字段内容,即1111xxxx情形,xxxx可表示1~13(0、14、15不能用因为会有冲突/被其它地方使用),但访问时,xxxx读到的整数要减去1,才是真正的我们需要的值。

    encoding

  • content:它可以存储二进制内容(如字符串),也可以存储整数(小端存储)

根据上面的信息,是可以构造出一个结构,以便于更方便的操作,这个结构是zlentry

1
2
3
4
5
6
7
8
9
typedef struct zlentry {
    unsigned int prevrawlensize; //存储prevrawlen的长度
    unsigned int prevrawlen; // 列表前1项的长度
    unsigned int lensize; // len和encoding长度的大小(即上面3个字段的encoding字段)
    unsigned int len; // 该项值长度大小
    unsigned int headersize; // prevrawlensize + lensize
    unsigned char encoding; // content编码形式,可以是ZIP_STR_*或ZIP_INT_*,即上面图上encoding的前8位
    unsigned char *p; // 指向节点最开头的指针
} zlentry;

构造可在zipEntry(unsigned char *p, zlentry *e)函数进行:

// p就是指向当前项最开始的指针
void zipEntry(unsigned char *p, zlentry *e) {
    ZIP_DECODE_PREVLEN(p, e->prevrawlensize, e->prevrawlen);
    ZIP_DECODE_LENGTH(p + e->prevrawlensize, e->encoding, e->lensize, e->len);
    e->headersize = e->prevrawlensize + e->lensize;
    e->p = p;
}

1.3. 节点插入

节点可在任意位置插入,这里挑选函数ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen),第一个参数为列表,第二个参数是待插入节点的位置(要插到原来p指向的项之前),sslen就是待插入的内容和长度。

最后都会进入下面这个函数,下面有代码以及解释:

unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789;
    zlentry tail;
    
    if (p[0] != ZIP_END) {
        // 若p不指向列表尾,需要获取上p的lensize和len
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else {
        // 若p指向列表尾,则需要插入尾部,尝试获取尾节点的len
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
            prevlen = zipRawEntryLength(ptail);
        }
    }
    // 这里检查s的内容是否能转成整数,若可以,将整数赋值到value中,并更新encoding值
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        // 可以转成整数,获取整数占用的字节长度
        reqlen = zipIntSize(encoding);
    } else {
      	// 字符串情形,占用长度就是slen
        reqlen = slen;
    }   
    // 计算prevlen的长度(指1.2.节的三个字段的prevlen)
    reqlen += zipStorePrevEntryLength(NULL,prevlen);
    // 计算encoding的长度(指1.2.节的三个字段的encoding)
    reqlen += zipStoreEntryEncoding(NULL,encoding,slen);
    // 假如p不指向最后,这里p是新插入项的后一项,要确保p的prevlen能存储新节点的长度
    // nextdiff = 4 => sizeof(newRecord) < 254 && sizeof(p->prevlen) == 4
    // nextdiff = -4 => sizeof(newRecord) >= 254 && sizeof(p->prevlen) == 1
    // nextdiff = 0 => 都匹配的情况
    int forcelarge = 0;
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
    if (nextdiff == -4 && reqlen < 4) {
        nextdiff = 0;
        forcelarge = 1;
    }
    // 记录p和列表开头的距离,因为要realloc
    offset = p-zl;
    // realloc内存,并重新定位p的位置(根据之前存的offset)
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    p = zl+offset;

    if (p[0] != ZIP_END) {
        // 若不插入到尾部,则:
        // 1. 需要将原来p(新项的后一项)之后的内容全部往后移动,距离是整个新项的长度
        // 2. 调整原来p(新项的后一项)的prevlen的长度,即p的prevlensize
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);
        // 更新原来p(新项的后一项)的prevlen 
        if (forcelarge)
            zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
        else
            zipStorePrevEntryLength(p+reqlen,reqlen);
        // 更新压缩列表的zltail字段
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);
        zipEntry(p+reqlen, &tail);
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
        // 若在尾部,只要更新列表的zltail字段即可
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }
    // 若nextdiff不等于0,说明原有p(新项的后一项)的长度发生改变
    // 需要把原有p(新项的后一项)之后的所有项的prevlen进行调整(包括数值和长度)
    // 即“级联更新”
    if (nextdiff != 0) {
        offset = p-zl;
        // 级联更新,从原有p(新项的后一项)开始
        // 这里也可能realloc(因为可能调整prevlen的长度),所以要记录offset
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }
    // 往p处写入新项: prevlen, encoding以及content
    p += zipStorePrevEntryLength(p,prevlen);
    p += zipStoreEntryEncoding(p,encoding,slen);
    if (ZIP_IS_STR(encoding)) {
        memcpy(p,s,slen);
    } else {
        zipSaveInteger(p,value,encoding);
    }
    // 更新列表的长度
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}

总体来说,细节比较复杂,但逻辑还是简单的:

  • 给定插入位置,计算新项的prevlen, encoding,并且尽可能将其保存成整数
  • 将插入位置之后的内容向后移动,给新项开出空间
  • 更新新插入项之后所有项的prevlen(数值+长度),即级联更新
  • 写入新项,更新元数据

1.4. 级联更新

1.3.节说了,新项插入时,会改变其后一项的prevlen这个改变可能影响该字段在内存的长度,造成连锁反应,因此需要级联更新。

级联更新的函数是__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p),即p指向的列表项之后的所有项的prevlen可能需要被更新(当然只需要遍历到不需要更新的项为止)。

代码解释如下:

unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
    size_t offset, noffset, extra;
    unsigned char *np;
    zlentry cur, next;
	// 从p项开始遍历
    while (p[0] != ZIP_END) {
        zipEntry(p, &cur);
        // 获取p项整个的长度,并计算存储这个长度所需要的字节数
        rawlen = cur.headersize + cur.len;
        rawlensize = zipStorePrevEntryLength(NULL,rawlen);
        // 若p的下一项为空就退出 
        if (p[rawlen] == ZIP_END) break;
        // 获取p的下一项
        zipEntry(p+rawlen, &next);
        // 所需要的字节数不变,那么后面的节点也不需要改变,直接返回
        if (next.prevrawlen == rawlen) break;

        if (next.prevrawlensize < rawlensize) {
            // 这里,p的下一个节点prevlen字段存不下p的长度,需要扩容
            // 这里做了:
            // 1. 扩容(resize),涉及到内存申请,拷贝
            // 2. 向后移动数据(因为prevlen字段本身长度变化了)
            // 3. 更新p下一项的prevlen值
            // 4. p指针指向后一项,准备下一步的级联升级
            offset = p-zl;
            extra = rawlensize-next.prevrawlensize;
            zl = ziplistResize(zl,curlen+extra);
            p = zl+offset;
            np = p+rawlen;
            noffset = np-zl;
            if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
                ZIPLIST_TAIL_OFFSET(zl) =
                    intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
            }
            memmove(np+rawlensize,
                np+next.prevrawlensize,
                curlen-noffset-next.prevrawlensize-1);
            zipStorePrevEntryLength(np,rawlen);

            /* Advance the cursor */
            p += rawlen;
            curlen += extra;
        } else {
            // 这里,p的下一个节点prevlen字段存的下p的长度,不需要扩容
            // 只需要更新p下一项prevlen值即可
            // 因为没更改p下一项的长度,所以直接返回,后面的节点不需要更新
            if (next.prevrawlensize > rawlensize) {
                zipStorePrevEntryLengthLarge(p+rawlen,rawlen);
            } else {
                zipStorePrevEntryLength(p+rawlen,rawlen);
            }
            break;
        }
    }
    return zl;
}

代码的逻辑还是很简单的,就是从p开始,判断p的下一项的prevlen是否能保存p的长度:

  • 若不能,则扩容,更新prevlenp继续向后进行下一步的扩容(升级)
  • 若可以(或p是尾节点),必要时更新p下一项的prevlen,并直接返回

假若每次空间重分配的最坏复杂度是$O(N)$,那么级联更新的最坏复杂度是$O(N^2)$,不过达到最坏情况的概率很低:

  • 节点长度需要有多个连续介于250~253字节
  • 即使出现更新,更新的节点数量也不会很多

1.5. 节点查找

查找函数是ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip),即查找等于vstr值的项,步长为skip + 1步(即每次比较时,跳过skip项):

unsigned char *ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip) {
    int skipcnt = 0;
    unsigned char vencoding = 0;
    long long vll = 0;
    while (p[0] != ZIP_END) {
        unsigned int prevlensize, encoding, lensize, len;
        unsigned char *q;
        ZIP_DECODE_PREVLENSIZE(p, prevlensize);
        ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);
        // 这里p指向某一项的开头
        // q指向某一项content的开头
        q = p + prevlensize + lensize;

        if (skipcnt == 0) {
   			// 根据步长,这一步需要比较
            // 分为整数和字符串的比较, 若一致则返回该项的指针
            if (ZIP_IS_STR(encoding)) {
                if (len == vlen && memcmp(q, vstr, vlen) == 0) {
                    return p;
                }
            } else {
                if (vencoding == 0) {
                    if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) {
                        vencoding = UCHAR_MAX;
                    }
                    /* Must be non-zero by now */
                    assert(vencoding);
                }
                if (vencoding != UCHAR_MAX) {
                    long long ll = zipLoadInteger(q, encoding);
                    if (ll == vll) {
                        return p;
                    }
                }
            }
            skipcnt = skip;
        } else {
            // 需要跳过
            skipcnt--;
        }
		// 更新p,指向下一项的开头
        p = q + len;
    }
    return NULL;
}

这里的逻辑很简单,就是从头(或从指定项)向后遍历列表,并返回第一个符合项。

因为根据列表项存储的定义,可以很容易的拿到某一项的长度,这样就可以向后遍历。

1.6. 节点删除

删除函数为ziplistDelete(unsigned char *zl, unsigned char **p),即删除指定的p项。它最后会进入下面的方法,总体而言还是比较简单的,这里看下面的注释解释就行:

unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {
    unsigned int i, totlen, deleted = 0;
    size_t offset;
    int nextdiff = 0;
    zlentry first, tail;
	// 读取p指向的项,保存在first中
    zipEntry(p, &first);
    // p需要向后移到被删除元素的后一项
    // 并统计删除元素个数和字节数
    for (i = 0; p[0] != ZIP_END && i < num; i++) {
        p += zipRawEntryLength(p);
        deleted++;
    }
    totlen = p-first.p; 
    if (totlen > 0) {
        if (p[0] != ZIP_END) {
            // 1. 若删除的不是尾部,就需要开始向前移动内存
            // 同时还要更新prevlen的值,这会造成级联更新
            nextdiff = zipPrevLenByteDiff(p,first.prevrawlen);
            p -= nextdiff;
            zipStorePrevEntryLength(p,first.prevrawlen);
            ZIPLIST_TAIL_OFFSET(zl) = 
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))-totlen);
            zipEntry(p, &tail);
            if (p[tail.headersize+tail.len] != ZIP_END) {
                ZIPLIST_TAIL_OFFSET(zl) =
                   intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
            }
            memmove(first.p,p,
                intrev32ifbe(ZIPLIST_BYTES(zl))-(p-zl)-1);
        } else {
            // 2. 删除尾部就不需要移动内存
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe((first.p-zl)-first.prevrawlen);
        }

        // 调整大小,并更新元数据
        offset = first.p-zl;
        zl = ziplistResize(zl, intrev32ifbe(ZIPLIST_BYTES(zl))-totlen+nextdiff);
        ZIPLIST_INCR_LENGTH(zl,-deleted);
        p = zl+offset;
		// 这边就是上面1情形下的prevlen调整,这必然引起一次级联更新
        if (nextdiff != 0)
            zl = __ziplistCascadeUpdate(zl,p);
    }
    return zl;
}

1.7. 总结

  • 压缩列表可作为节约内存的顺序线性的数据结构
  • 压缩列表会被作为Redis列表的实现
  • 压缩列表包含的节点可保存字符串(二进制)和整数,且尽量保存整数
  • 对压缩列表进行增删,可能导致级联更新,它效率较低,但出现几率不高

2. 压缩字典

2.1. 压缩字典的构成

这里盗一张图,展示一下zipmap的存储结构:

xx

可见它也是一个线性数据结构。

它的各个字段解释如下:

  • zmlen:1字节,uint_8,表示键值对个数,上限253(数量大于或等于254时,只能通过遍历获得数量)
  • key_lenvalue_len:1字节或5字节,它的编码形式和ziplist列表项的prevlen一样
  • free:1字节,uint_8value可用的空闲字节数
  • end:1字节标记,值为0xFF

2.2. 压缩字典的创建

初始化只生成zmlenend字段,其中zmlen置为0。

2.3. 压缩字典的查找

由于它用线性方式存储,因此必须使用线性扫描。

实现在zipmapLookupRaw(unsigned char *zm, unsigned char *key, unsigned int klen, unsigned int *totlen)中:

static unsigned char *zipmapLookupRaw(unsigned char *zm, unsigned char *key, unsigned int klen, unsigned int *totlen) {
    unsigned char *p = zm+1, *k = NULL;
    unsigned int l,llen;
	// 线性一项一项往后找
    while(*p != ZIPMAP_END) {
        unsigned char free;
        l = zipmapDecodeLength(p);
        llen = zipmapEncodeLength(NULL,l);
        if (key != NULL && k == NULL && l == klen && !memcmp(p+llen,key,l)) {
            // 这里就是找到了
            // 如果totlen为NULL
            // 表示函数调用者不关心zipmap占用的字节数
            // 此时直接返回p,否则先记录下p指针然后继续遍历
            if (totlen != NULL) {
                // 这里需要继续,因为需要得到整个zipmap的长度
                k = p;
            } else {
                // 找到直接返回
                return p;
            }
        }
        // 这里需要向后遍历
        p += llen+l;
        l = zipmapDecodeLength(p);
        p += zipmapEncodeLength(NULL,l);
        free = p[0];
        p += l+1+free; // 指向下一项
    }
    // 到这里遍历完整个zipmap,得到其占用的字节数
    if (totlen != NULL) *totlen = (unsigned int)(p-zm)+1;
    return k;
}

代码还是比较简单的,就是线性查找,这里不多叙述。

2.4. 压缩字典的插入与更新

插入和更新的实现在函数zipmapSet(unsigned char *zm, unsigned char *key, unsigned int klen, unsigned char *val, unsigned int vlen, int *update)中,具体解释如下所述:

unsigned char *zipmapSet(unsigned char *zm, unsigned char *key, unsigned int klen, unsigned char *val, unsigned int vlen, int *update) {
    unsigned int zmlen, offset;
    unsigned int freelen, reqlen = zipmapRequiredLength(klen,vlen);
    unsigned int empty, vempty;
    unsigned char *p;

    freelen = reqlen;
    if (update) *update = 0;
    // 首先查找是否key已存在
    p = zipmapLookupRaw(zm,key,klen,&zmlen);
    if (p == NULL) {
        // 若不存在,则需要将新KV直接插入到后面
        // 需要扩大内存,并更新zmlen
        zm = zipmapResize(zm, zmlen+reqlen);
        p = zm+zmlen-1;
        zmlen = zmlen+reqlen;
        if (zm[0] < ZIPMAP_BIGLEN) zm[0]++;
    } else {
        // 若存在,则需要判断已有KV空间是否能容纳新KV
        if (update) *update = 1;
        freelen = zipmapRawEntryLength(p);
        if (freelen < reqlen) {
            // 这里是不能容纳的情况
            offset = p-zm;
            // 先重新分配内存
            zm = zipmapResize(zm, zmlen-freelen+reqlen);
            p = zm+offset;
            // 然后向后移动旧KV之后的内容,保证空间能容纳新KV
            memmove(p+reqlen, p+freelen, zmlen-(offset+freelen+1));
            zmlen = zmlen-freelen+reqlen;
            freelen = reqlen;
        }
    }
    // 这里freelen是原KV的空间大小
    // reqlen是新KV的空间大小
    // empty就是free值了(value未使用的大小)
    empty = freelen-reqlen;
    if (empty >= ZIPMAP_VALUE_MAX_FREE) {
        // 若这个empty大于4,则需要调整
        // 把这些空闲空间全部拿掉(即置free=0)
        // 即把后面的内容往前移动,并重新分配内存
        offset = p-zm;
        memmove(p+reqlen, p+freelen, zmlen-(offset+freelen+1));
        zmlen -= empty;
        zm = zipmapResize(zm, zmlen);
        p = zm+offset;
        vempty = 0;
    } else {
        vempty = empty;
    }
  	// 最后将新KV写入指定为止
    p += zipmapEncodeLength(p,klen);
    memcpy(p,key,klen);
    p += klen;
    p += zipmapEncodeLength(p,vlen);
    *p++ = vempty;
    memcpy(p,val,vlen);
    return zm;
}

这里主要是对于空闲空间的处理(value字段),若这个值过大(这里是4),zipmap会调整大小,将空闲空间全部拿掉。

2.5. 压缩字典的删除

删除还是很简单的,就是:

  • 先查找定位键值对
  • 移动内存,覆盖掉待删除的键值对
  • 重新分配内存(缩小内存分配)
  • 更新元数据

代码如下:

unsigned char *zipmapDel(unsigned char *zm, unsigned char *key, unsigned int klen, int *deleted) {
    unsigned int zmlen, freelen;
    // 1. 查找定位
    unsigned char *p = zipmapLookupRaw(zm,key,klen,&zmlen);
    if (p) {
        // 若找到
        freelen = zipmapRawEntryLength(p);
        // 2. 移动内存,覆盖待删除的内容
        memmove(p, p+freelen, zmlen-((p-zm)+freelen+1));
        // 3. 重新分配(缩小内存分配)
        zm = zipmapResize(zm, zmlen-freelen);
		// 4. 更新zmlen元数据
        if (zm[0] < ZIPMAP_BIGLEN) zm[0]--;
        if (deleted) *deleted = 1;
    } else {
        if (deleted) *deleted = 0;
    }
    return zm;
}

2.6. 总结

zipmap使用很简单的构造格式构造了非常简单的KV数据结构,非常节省内存。

但是它不适合存大量的KV数据,且不适合写入多的场景,因为每次增删改往往会造成一次内存重新分配。