Redis存储数据的时候,会尝试压缩数据结构的空间。
因而它实现了几个压缩集合的数据结构:
ziplist
:压缩列表(ziplist.h
,ziplist.c
)zipmap
:压缩字典(zipmap.h
,zipmap.c
)
它们都用一个线性的char[]
来存储列表/字典的数据结构,相比adlist
和dict
更加节省内存空间。
1. 压缩列表
1.1. 压缩列表整体构成
列表包含一个头、一个尾和列表节点构成。
头部包含下面几个字段:
zlbytes
:4字节,类型为uint32_t
,记录压缩列表占用的内存字节数zltail
:4字节,类型为uint32_t
,记录压缩列表尾节点到列表起始地址的偏移量zllen
:2字节,类型为uint16_t
,记录节点数量(若数量为UINT16_MAX
时,需要遍历整个列表才能算出节点数量)
尾部包含一个字段:
zlend
:1字节,类型为uint8_t
,是一个标记,值为0xFF
整体格式为,而且都以小端形式存储数据:
/*
* The general layout of the ziplist is as follows:
*
* <zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>
*/
1.2. 列表节点构成
列表节点主要由3个字段构成:
-
prevlen
:表示前一个列表项长度,若小于254字节(0xFE
),占用1字节,否则占用5字节(且第一个字节被置为0xFE
,后面4个字节才表示实际长度) -
encoding
:编码制式,且包含了值长度信息,长度可为1字节、2字节或5字节。格式可参考下图。注意
encoding
可以包含值信息,不需要content
字段内容,即1111xxxx
情形,xxxx
可表示1~13(0、14、15不能用因为会有冲突/被其它地方使用),但访问时,xxxx
读到的整数要减去1,才是真正的我们需要的值。 -
content
:它可以存储二进制内容(如字符串),也可以存储整数(小端存储)
根据上面的信息,是可以构造出一个结构,以便于更方便的操作,这个结构是zlentry
:
1
2
3
4
5
6
7
8
9
typedef struct zlentry {
unsigned int prevrawlensize; //存储prevrawlen的长度
unsigned int prevrawlen; // 列表前1项的长度
unsigned int lensize; // len和encoding长度的大小(即上面3个字段的encoding字段)
unsigned int len; // 该项值长度大小
unsigned int headersize; // prevrawlensize + lensize
unsigned char encoding; // content编码形式,可以是ZIP_STR_*或ZIP_INT_*,即上面图上encoding的前8位
unsigned char *p; // 指向节点最开头的指针
} zlentry;
构造可在zipEntry(unsigned char *p, zlentry *e)
函数进行:
// p就是指向当前项最开始的指针
void zipEntry(unsigned char *p, zlentry *e) {
ZIP_DECODE_PREVLEN(p, e->prevrawlensize, e->prevrawlen);
ZIP_DECODE_LENGTH(p + e->prevrawlensize, e->encoding, e->lensize, e->len);
e->headersize = e->prevrawlensize + e->lensize;
e->p = p;
}
1.3. 节点插入
节点可在任意位置插入,这里挑选函数ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen)
,第一个参数为列表,第二个参数是待插入节点的位置(要插到原来p
指向的项之前),s
和slen
就是待插入的内容和长度。
最后都会进入下面这个函数,下面有代码以及解释:
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
unsigned int prevlensize, prevlen = 0;
size_t offset;
int nextdiff = 0;
unsigned char encoding = 0;
long long value = 123456789;
zlentry tail;
if (p[0] != ZIP_END) {
// 若p不指向列表尾,需要获取上p的lensize和len
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
} else {
// 若p指向列表尾,则需要插入尾部,尝试获取尾节点的len
unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
if (ptail[0] != ZIP_END) {
prevlen = zipRawEntryLength(ptail);
}
}
// 这里检查s的内容是否能转成整数,若可以,将整数赋值到value中,并更新encoding值
if (zipTryEncoding(s,slen,&value,&encoding)) {
// 可以转成整数,获取整数占用的字节长度
reqlen = zipIntSize(encoding);
} else {
// 字符串情形,占用长度就是slen
reqlen = slen;
}
// 计算prevlen的长度(指1.2.节的三个字段的prevlen)
reqlen += zipStorePrevEntryLength(NULL,prevlen);
// 计算encoding的长度(指1.2.节的三个字段的encoding)
reqlen += zipStoreEntryEncoding(NULL,encoding,slen);
// 假如p不指向最后,这里p是新插入项的后一项,要确保p的prevlen能存储新节点的长度
// nextdiff = 4 => sizeof(newRecord) < 254 && sizeof(p->prevlen) == 4
// nextdiff = -4 => sizeof(newRecord) >= 254 && sizeof(p->prevlen) == 1
// nextdiff = 0 => 都匹配的情况
int forcelarge = 0;
nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
if (nextdiff == -4 && reqlen < 4) {
nextdiff = 0;
forcelarge = 1;
}
// 记录p和列表开头的距离,因为要realloc
offset = p-zl;
// realloc内存,并重新定位p的位置(根据之前存的offset)
zl = ziplistResize(zl,curlen+reqlen+nextdiff);
p = zl+offset;
if (p[0] != ZIP_END) {
// 若不插入到尾部,则:
// 1. 需要将原来p(新项的后一项)之后的内容全部往后移动,距离是整个新项的长度
// 2. 调整原来p(新项的后一项)的prevlen的长度,即p的prevlensize
memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);
// 更新原来p(新项的后一项)的prevlen
if (forcelarge)
zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
else
zipStorePrevEntryLength(p+reqlen,reqlen);
// 更新压缩列表的zltail字段
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);
zipEntry(p+reqlen, &tail);
if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
} else {
// 若在尾部,只要更新列表的zltail字段即可
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
}
// 若nextdiff不等于0,说明原有p(新项的后一项)的长度发生改变
// 需要把原有p(新项的后一项)之后的所有项的prevlen进行调整(包括数值和长度)
// 即“级联更新”
if (nextdiff != 0) {
offset = p-zl;
// 级联更新,从原有p(新项的后一项)开始
// 这里也可能realloc(因为可能调整prevlen的长度),所以要记录offset
zl = __ziplistCascadeUpdate(zl,p+reqlen);
p = zl+offset;
}
// 往p处写入新项: prevlen, encoding以及content
p += zipStorePrevEntryLength(p,prevlen);
p += zipStoreEntryEncoding(p,encoding,slen);
if (ZIP_IS_STR(encoding)) {
memcpy(p,s,slen);
} else {
zipSaveInteger(p,value,encoding);
}
// 更新列表的长度
ZIPLIST_INCR_LENGTH(zl,1);
return zl;
}
总体来说,细节比较复杂,但逻辑还是简单的:
- 给定插入位置,计算新项的
prevlen
,encoding
,并且尽可能将其保存成整数 - 将插入位置之后的内容向后移动,给新项开出空间
- 更新新插入项之后所有项的
prevlen
(数值+长度),即级联更新 - 写入新项,更新元数据
1.4. 级联更新
1.3.节说了,新项插入时,会改变其后一项的prevlen
,这个改变可能影响该字段在内存的长度,造成连锁反应,因此需要级联更新。
级联更新的函数是__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p)
,即p
指向的列表项之后的所有项的prevlen
都可能需要被更新(当然只需要遍历到不需要更新的项为止)。
代码解释如下:
unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
size_t offset, noffset, extra;
unsigned char *np;
zlentry cur, next;
// 从p项开始遍历
while (p[0] != ZIP_END) {
zipEntry(p, &cur);
// 获取p项整个的长度,并计算存储这个长度所需要的字节数
rawlen = cur.headersize + cur.len;
rawlensize = zipStorePrevEntryLength(NULL,rawlen);
// 若p的下一项为空就退出
if (p[rawlen] == ZIP_END) break;
// 获取p的下一项
zipEntry(p+rawlen, &next);
// 所需要的字节数不变,那么后面的节点也不需要改变,直接返回
if (next.prevrawlen == rawlen) break;
if (next.prevrawlensize < rawlensize) {
// 这里,p的下一个节点prevlen字段存不下p的长度,需要扩容
// 这里做了:
// 1. 扩容(resize),涉及到内存申请,拷贝
// 2. 向后移动数据(因为prevlen字段本身长度变化了)
// 3. 更新p下一项的prevlen值
// 4. p指针指向后一项,准备下一步的级联升级
offset = p-zl;
extra = rawlensize-next.prevrawlensize;
zl = ziplistResize(zl,curlen+extra);
p = zl+offset;
np = p+rawlen;
noffset = np-zl;
if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
}
memmove(np+rawlensize,
np+next.prevrawlensize,
curlen-noffset-next.prevrawlensize-1);
zipStorePrevEntryLength(np,rawlen);
/* Advance the cursor */
p += rawlen;
curlen += extra;
} else {
// 这里,p的下一个节点prevlen字段存的下p的长度,不需要扩容
// 只需要更新p下一项prevlen值即可
// 因为没更改p下一项的长度,所以直接返回,后面的节点不需要更新
if (next.prevrawlensize > rawlensize) {
zipStorePrevEntryLengthLarge(p+rawlen,rawlen);
} else {
zipStorePrevEntryLength(p+rawlen,rawlen);
}
break;
}
}
return zl;
}
代码的逻辑还是很简单的,就是从p
开始,判断p
的下一项的prevlen
是否能保存p
的长度:
- 若不能,则扩容,更新
prevlen
,p
继续向后进行下一步的扩容(升级) - 若可以(或
p
是尾节点),必要时更新p
下一项的prevlen
,并直接返回
假若每次空间重分配的最坏复杂度是$O(N)$,那么级联更新的最坏复杂度是$O(N^2)$,不过达到最坏情况的概率很低:
- 节点长度需要有多个连续介于250~253字节
- 即使出现更新,更新的节点数量也不会很多
1.5. 节点查找
查找函数是ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip)
,即查找等于vstr
值的项,步长为skip + 1
步(即每次比较时,跳过skip
项):
unsigned char *ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip) {
int skipcnt = 0;
unsigned char vencoding = 0;
long long vll = 0;
while (p[0] != ZIP_END) {
unsigned int prevlensize, encoding, lensize, len;
unsigned char *q;
ZIP_DECODE_PREVLENSIZE(p, prevlensize);
ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);
// 这里p指向某一项的开头
// q指向某一项content的开头
q = p + prevlensize + lensize;
if (skipcnt == 0) {
// 根据步长,这一步需要比较
// 分为整数和字符串的比较, 若一致则返回该项的指针
if (ZIP_IS_STR(encoding)) {
if (len == vlen && memcmp(q, vstr, vlen) == 0) {
return p;
}
} else {
if (vencoding == 0) {
if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) {
vencoding = UCHAR_MAX;
}
/* Must be non-zero by now */
assert(vencoding);
}
if (vencoding != UCHAR_MAX) {
long long ll = zipLoadInteger(q, encoding);
if (ll == vll) {
return p;
}
}
}
skipcnt = skip;
} else {
// 需要跳过
skipcnt--;
}
// 更新p,指向下一项的开头
p = q + len;
}
return NULL;
}
这里的逻辑很简单,就是从头(或从指定项)向后遍历列表,并返回第一个符合项。
因为根据列表项存储的定义,可以很容易的拿到某一项的长度,这样就可以向后遍历。
1.6. 节点删除
删除函数为ziplistDelete(unsigned char *zl, unsigned char **p)
,即删除指定的p
项。它最后会进入下面的方法,总体而言还是比较简单的,这里看下面的注释解释就行:
unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {
unsigned int i, totlen, deleted = 0;
size_t offset;
int nextdiff = 0;
zlentry first, tail;
// 读取p指向的项,保存在first中
zipEntry(p, &first);
// p需要向后移到被删除元素的后一项
// 并统计删除元素个数和字节数
for (i = 0; p[0] != ZIP_END && i < num; i++) {
p += zipRawEntryLength(p);
deleted++;
}
totlen = p-first.p;
if (totlen > 0) {
if (p[0] != ZIP_END) {
// 1. 若删除的不是尾部,就需要开始向前移动内存
// 同时还要更新prevlen的值,这会造成级联更新
nextdiff = zipPrevLenByteDiff(p,first.prevrawlen);
p -= nextdiff;
zipStorePrevEntryLength(p,first.prevrawlen);
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))-totlen);
zipEntry(p, &tail);
if (p[tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
memmove(first.p,p,
intrev32ifbe(ZIPLIST_BYTES(zl))-(p-zl)-1);
} else {
// 2. 删除尾部就不需要移动内存
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe((first.p-zl)-first.prevrawlen);
}
// 调整大小,并更新元数据
offset = first.p-zl;
zl = ziplistResize(zl, intrev32ifbe(ZIPLIST_BYTES(zl))-totlen+nextdiff);
ZIPLIST_INCR_LENGTH(zl,-deleted);
p = zl+offset;
// 这边就是上面1情形下的prevlen调整,这必然引起一次级联更新
if (nextdiff != 0)
zl = __ziplistCascadeUpdate(zl,p);
}
return zl;
}
1.7. 总结
- 压缩列表可作为节约内存的顺序线性的数据结构
- 压缩列表会被作为Redis列表的实现
- 压缩列表包含的节点可保存字符串(二进制)和整数,且尽量保存整数
- 对压缩列表进行增删,可能导致级联更新,它效率较低,但出现几率不高
2. 压缩字典
2.1. 压缩字典的构成
这里盗一张图,展示一下zipmap
的存储结构:
可见它也是一个线性数据结构。
它的各个字段解释如下:
zmlen
:1字节,uint_8
,表示键值对个数,上限253(数量大于或等于254时,只能通过遍历获得数量)key_len
与value_len
:1字节或5字节,它的编码形式和ziplist
列表项的prevlen
一样free
:1字节,uint_8
,value
可用的空闲字节数end
:1字节标记,值为0xFF
2.2. 压缩字典的创建
初始化只生成zmlen
和end
字段,其中zmlen
置为0。
2.3. 压缩字典的查找
由于它用线性方式存储,因此必须使用线性扫描。
实现在zipmapLookupRaw(unsigned char *zm, unsigned char *key, unsigned int klen, unsigned int *totlen)
中:
static unsigned char *zipmapLookupRaw(unsigned char *zm, unsigned char *key, unsigned int klen, unsigned int *totlen) {
unsigned char *p = zm+1, *k = NULL;
unsigned int l,llen;
// 线性一项一项往后找
while(*p != ZIPMAP_END) {
unsigned char free;
l = zipmapDecodeLength(p);
llen = zipmapEncodeLength(NULL,l);
if (key != NULL && k == NULL && l == klen && !memcmp(p+llen,key,l)) {
// 这里就是找到了
// 如果totlen为NULL
// 表示函数调用者不关心zipmap占用的字节数
// 此时直接返回p,否则先记录下p指针然后继续遍历
if (totlen != NULL) {
// 这里需要继续,因为需要得到整个zipmap的长度
k = p;
} else {
// 找到直接返回
return p;
}
}
// 这里需要向后遍历
p += llen+l;
l = zipmapDecodeLength(p);
p += zipmapEncodeLength(NULL,l);
free = p[0];
p += l+1+free; // 指向下一项
}
// 到这里遍历完整个zipmap,得到其占用的字节数
if (totlen != NULL) *totlen = (unsigned int)(p-zm)+1;
return k;
}
代码还是比较简单的,就是线性查找,这里不多叙述。
2.4. 压缩字典的插入与更新
插入和更新的实现在函数zipmapSet(unsigned char *zm, unsigned char *key, unsigned int klen, unsigned char *val, unsigned int vlen, int *update)
中,具体解释如下所述:
unsigned char *zipmapSet(unsigned char *zm, unsigned char *key, unsigned int klen, unsigned char *val, unsigned int vlen, int *update) {
unsigned int zmlen, offset;
unsigned int freelen, reqlen = zipmapRequiredLength(klen,vlen);
unsigned int empty, vempty;
unsigned char *p;
freelen = reqlen;
if (update) *update = 0;
// 首先查找是否key已存在
p = zipmapLookupRaw(zm,key,klen,&zmlen);
if (p == NULL) {
// 若不存在,则需要将新KV直接插入到后面
// 需要扩大内存,并更新zmlen
zm = zipmapResize(zm, zmlen+reqlen);
p = zm+zmlen-1;
zmlen = zmlen+reqlen;
if (zm[0] < ZIPMAP_BIGLEN) zm[0]++;
} else {
// 若存在,则需要判断已有KV空间是否能容纳新KV
if (update) *update = 1;
freelen = zipmapRawEntryLength(p);
if (freelen < reqlen) {
// 这里是不能容纳的情况
offset = p-zm;
// 先重新分配内存
zm = zipmapResize(zm, zmlen-freelen+reqlen);
p = zm+offset;
// 然后向后移动旧KV之后的内容,保证空间能容纳新KV
memmove(p+reqlen, p+freelen, zmlen-(offset+freelen+1));
zmlen = zmlen-freelen+reqlen;
freelen = reqlen;
}
}
// 这里freelen是原KV的空间大小
// reqlen是新KV的空间大小
// empty就是free值了(value未使用的大小)
empty = freelen-reqlen;
if (empty >= ZIPMAP_VALUE_MAX_FREE) {
// 若这个empty大于4,则需要调整
// 把这些空闲空间全部拿掉(即置free=0)
// 即把后面的内容往前移动,并重新分配内存
offset = p-zm;
memmove(p+reqlen, p+freelen, zmlen-(offset+freelen+1));
zmlen -= empty;
zm = zipmapResize(zm, zmlen);
p = zm+offset;
vempty = 0;
} else {
vempty = empty;
}
// 最后将新KV写入指定为止
p += zipmapEncodeLength(p,klen);
memcpy(p,key,klen);
p += klen;
p += zipmapEncodeLength(p,vlen);
*p++ = vempty;
memcpy(p,val,vlen);
return zm;
}
这里主要是对于空闲空间的处理(value
字段),若这个值过大(这里是4),zipmap
会调整大小,将空闲空间全部拿掉。
2.5. 压缩字典的删除
删除还是很简单的,就是:
- 先查找定位键值对
- 移动内存,覆盖掉待删除的键值对
- 重新分配内存(缩小内存分配)
- 更新元数据
代码如下:
unsigned char *zipmapDel(unsigned char *zm, unsigned char *key, unsigned int klen, int *deleted) {
unsigned int zmlen, freelen;
// 1. 查找定位
unsigned char *p = zipmapLookupRaw(zm,key,klen,&zmlen);
if (p) {
// 若找到
freelen = zipmapRawEntryLength(p);
// 2. 移动内存,覆盖待删除的内容
memmove(p, p+freelen, zmlen-((p-zm)+freelen+1));
// 3. 重新分配(缩小内存分配)
zm = zipmapResize(zm, zmlen-freelen);
// 4. 更新zmlen元数据
if (zm[0] < ZIPMAP_BIGLEN) zm[0]--;
if (deleted) *deleted = 1;
} else {
if (deleted) *deleted = 0;
}
return zm;
}
2.6. 总结
zipmap
使用很简单的构造格式构造了非常简单的KV数据结构,非常节省内存。
但是它不适合存大量的KV数据,且不适合写入多的场景,因为每次增删改往往会造成一次内存重新分配。