Redis 数据结构浅析

相关代码基于 Redis 版本 5.0.14 源码

RedisObject

RedisObject 结构体是在 server.h 文件中定义的,主要功能是用来保存键值对中的值。这个结构一共定义了 4 个元数据和一个指针。

  • type:redisObject 的数据类型,是应用程序在 Redis 中保存的数据类型,包括 String、List、Hash 等
  • encoding:redisObject 的编码类型,是 Redis 内部实现各种数据类型所用的数据结构。
  • lru:redisObject 的 LRU 时间。
  • refcount:redisObject 的引用计数。
  • ptr:指向值的指针。
/* Objects encoding. Some kind of objects like Strings and Hashes can be
 * internally represented in multiple ways. The 'encoding' field of the object
 * is set to one of this fields for this object. */
#define OBJ_ENCODING_RAW 0     /* Raw representation */
#define OBJ_ENCODING_INT 1     /* Encoded as integer */
#define OBJ_ENCODING_HT 2      /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3  /* No longer used: old hash encoding. */
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */
#define OBJ_ENCODING_ZIPLIST 5 /* No longer used: old list/hash/zset encoding. */
#define OBJ_ENCODING_INTSET 6  /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7  /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8  /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of listpacks */
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */
#define OBJ_ENCODING_LISTPACK 11 /* Encoded as a listpack */

#define LRU_BITS 24
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */

...

struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
                            * LFU data (least significant 8 bits frequency
                            * and most significant 16 bits access time). */
    int refcount;
    void *ptr;
};

String

Redis 设计了简单动态字符串 (Simple Dynamic String,SDS) 的结构,用来表示字符串。相比于 C 语言中的字符串实现,SDS 这种字符串的实现方式,会提升字符串的操作效率,并且可以用来保存二进制数据。

SDS 结构设计

SDS 结构里包含了一个字符数组 buf,用来保存实际数据。同时,SDS 结构里还包含了三个元数据,分别是字符数组现有长度 len、分配给字符数组的空间长度 alloc,以及 SDS 类型 flags。同时 SDS 设计了不同的结构头(即不同类型),能灵活保存不同大小的字符串,从而有效节省内存空间。

typedef char *sds;

/* Note: sdshdr5 is never used, we just access the flags byte directly.
 * However is here to document the layout of type 5 SDS strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
    unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len; /* used */
    uint8_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
    uint16_t len; /* used */
    uint16_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
    uint32_t len; /* used */
    uint32_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
    uint64_t len; /* used */
    uint64_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};

SDS 对比 Char*

  • C 语言中使用 char* 实现字符串的不足,主要是因为使用“\0”表示字符串结束,操作时需遍历字符串,效率不高,并且无法完整表示包含“\0”的数据,因而这就无法满足 Redis 的需求。
  • Redis 中字符串的设计思想与实现方法。Redis 专门设计了 SDS 数据结构,在字符数组的基础上,增加了字符数组长度和分配空间大小等元数据。这样一来,需要基于字符串长度进行的追加、复制、比较等操作,就可以直接读取元数据,效率也就提升了。而且,SDS 不通过字符串中的“\0”字符判断字符串结束,而是直接将其作为二进制数据处理,可以用来保存图片等二进制数据。
  • SDS 中是通过设计不同 SDS 类型来表示不同大小的字符串,并使用 __attribute__ ((__packed__)) 这个编程小技巧,来实现紧凑型内存布局,达到节省内存的目的。

另外 Redis 在操作 SDS 时,为了避免频繁操作字符串时,每次「申请、释放」内存的开销,还做了这些优化:

  • 内存预分配:SDS 扩容,会多申请一些内存(小于 1MB 翻倍扩容,大于 1MB 按 1MB 扩容)
  • 多余内存不释放:SDS 缩容,不释放多余的内存,下次使用可直接复用这些内存
#define SDS_MAX_PREALLOC (1024*1024)

sds sdsMakeRoomFor(sds s, size_t addlen) {
    ...
    /* Return ASAP if there is enough space left. */
    if (avail >= addlen) return s;

    len = sdslen(s);
    sh = (char*)s-sdsHdrSize(oldtype);
    reqlen = newlen = (len+addlen);
    if (newlen < SDS_MAX_PREALLOC)
        newlen *= 2;
    else
        newlen += SDS_MAX_PREALLOC;

    type = sdsReqType(newlen);
    ...
}

ZipList(List / Hashes / ZSet)

Redis 中的 ZipList 数据结构是特殊编码的双向链表,它以紧凑的顺序布局存储元素,无需为每个元素单独分配内存,从而减少了内存开销。ZipList 尤其适用于元素数量有限的小型集合,主要用于优化数据量较小的 List、Hash 和 Sorted Set。所有操作都是通过偏移量完成的

ZipList 结构
ZipList 结构

主要操作

查找操作

循环遍历直到 zlend,根据不同的 Encoding 去解析对应的 Entry 直到找到需要的数据。

/* Find pointer to the entry equal to the specified entry. Skip 'skip' entries
 * between every comparison. Returns NULL when the field could not be found. */
unsigned char *ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip) {
    ...
    while (p[0] != ZIP_END) {
        ...
        unsigned int prevlensize, encoding, lensize, len;
        unsigned char *q;

        ZIP_DECODE_PREVLENSIZE(p, prevlensize);
        ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);
        q = p + prevlensize + lensize;
        
         if (skipcnt == 0) {
            /* Compare current entry with specified entry */
            if (ZIP_IS_STR(encoding)) {
                ...
            } else {
                /* Find out if the searched field can be encoded. Note that
                 * we do it only the first time, once done vencoding is set
                 * to non-zero and vll is set to the integer value. */
                if (vencoding == 0) {
                    if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) {
                        /* If the entry can't be encoded we set it to
                         * UCHAR_MAX so that we don't retry again the next
                         * time. */
                        vencoding = UCHAR_MAX;
                    }
                    /* Must be non-zero by now */
                    assert(vencoding);
                }

                /* Compare current entry with specified entry, do it only
                 * if vencoding != UCHAR_MAX because if there is no encoding
                 * possible for the field it can't be a valid integer. */
                if (vencoding != UCHAR_MAX) {
                    long long ll = zipLoadInteger(q, encoding);
                    if (ll == vll) {
                        return p;
                    }
                }
            }
            ...
        }
        ...
    }
}

插入操作

  1. 找到插入位置
  2. 尝试编码数据,计算所需空间
  3. 更新前后 Entry 的 prevlen 等数据(如果有)
  4. 扩容压缩表空间,申请内存空间并移动压缩表
  5. 写入新 Entry
// 具体代码参考下列函数
// https://github.com/redis/redis/blob/704ba5f5b22ae1ecafbcfb7a3258311c27ff94ff/src/ziplist.c#L755
/* Insert item at "p". */
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen)

插入操作会由于 ZipList 对 prevlen 的编码不同,导致需要对插入位置后的元素的 prevlen 进行更新,使其等于插入条目的长度。可能出现的情况是,这个长度无法用 1 个字节编码,因此下一个条目需要增大一些,以容纳 5 字节编码的 prevlen。这会导致重新分配和 memmove。如果后续的 Entry 恰好在更新总长度后又超出了 ZIP_BIG_PREVLEN,就会发生级联扩容。插入位置后续节点向后移动。

删除操作

  1. 计算删除节点的大小
  2. 让删除节点的后面节点往前面覆盖
  3. 释放多余的空间
// 具体代码参考下列函数
// https://github.com/redis/redis/blob/704ba5f5b22ae1ecafbcfb7a3258311c27ff94ff/src/ziplist.c#L690
/* Delete "num" entries, starting at "p". Returns pointer to the ziplist. */
unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num)

删除操作也一样,可能会因为 prevlen 的编码问题引发级联更新。删除 指定 Entry 时,其后续的节点级联更新向前移动。

级联更新

/* When an entry is inserted, we need to set the prevlen field of the next
 * entry to equal the length of the inserted entry. It can occur that this
 * length cannot be encoded in 1 byte and the next entry needs to be grow
 * a bit larger to hold the 5-byte encoded prevlen. This can be done for free,
 * because this only happens when an entry is already being inserted (which
 * causes a realloc and memmove). However, encoding the prevlen may require
 * that this entry is grown as well. This effect may cascade throughout
 * the ziplist when there are consecutive entries with a size close to
 * ZIP_BIG_PREVLEN, so we need to check that the prevlen can be encoded in
 * every consecutive entry.
 *
 * Note that this effect can also happen in reverse, where the bytes required
 * to encode the prevlen field can shrink. This effect is deliberately ignored,
 * because it can cause a "flapping" effect where a chain prevlen fields is
 * first grown and then shrunk again after consecutive inserts. Rather, the
 * field is allowed to stay larger than necessary, because a large prevlen
 * field implies the ziplist is holding large entries anyway.
 *
 * The pointer "p" points to the first entry that does NOT need to be
 * updated, i.e. consecutive fields MAY need an update. */
unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p)

根据情况将压缩表扩容,元素后移(插入操作)。或元素迁移,压缩表缩容(删除操作)。

优缺点

ZipList 设计的初衷就是「节省内存」,在存储数据时,把内存利用率发挥到了极致

  • 数字按整型编码存储,比直接当字符串存内存占用少
  • 数据长度字段,会根据内容的大小选择最小的长度编码
  • 甚至对于极小的数据,干脆把内容直接放到了长度字段中(前几个位表示长度,后几个位存数据)

但 ZipList 的劣势也很明显:

  • 寻找元素只能挨个遍历,存储过长数据,或存储的元素数量较多的情况下,查询性能很低。
  • 可能会导致级联更新的问题,即每个元素中保存了上一个元素的长度(为了方便反向遍历),这会导致上一个元素内容发生修改,长度超过了原来的编码长度,下一个元素的内容也要跟着变,重新分配内存,进而就有可能再次引起下一级的变化,一级级更新下去,频繁申请内存。

QuickList(List)

先前介绍 ZipList 的内容中,阐述了 ZipList 的一些问题。Redis 先是在 3.0 版本中设计实现了 QuickList。QuickList 结构在 ZipList 基础上,使用链表将 ZipList 串联起来,链表的每个元素就是一个 ZipList。这种设计减少了数据插入时内存空间的重新分配,以及内存数据的拷贝。同时,QuickList 限制了每个节点上 ZipList 的大小,一旦一个 ziplist 过大,就会采用新增 QuickList 节点的方法。QuickList 的设计,其实是结合了链表和 ZipList 各自的优势。简单来说,一个 QuickList 就是一个链表,而链表中的每个元素又是一个 ZipList。

QuickList 结构
QuickList 结构

设计与实现

/* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist.
 * 'count' is the number of total entries.
 * 'len' is the number of quicklist nodes.
 * 'compress' is: -1 if compression disabled, otherwise it's the number
 *                of quicklistNodes to leave uncompressed at ends of quicklist.
 * 'fill' is the user-requested (or default) fill factor. */
typedef struct quicklist {
    quicklistNode *head;
    quicklistNode *tail;
    unsigned long count;        /* total count of all entries in all ziplists */
    unsigned long len;          /* number of quicklistNodes */
    int fill : 16;              /* fill factor for individual nodes */
    unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
} quicklist;

/* quicklistNode is a 32 byte struct describing a ziplist for a quicklist.
 * We use bit fields keep the quicklistNode at 32 bytes.
 * count: 16 bits, max 65536 (max zl bytes is 65k, so max count actually < 32k).
 * encoding: 2 bits, RAW=1, LZF=2.
 * container: 2 bits, NONE=1, ZIPLIST=2.
 * recompress: 1 bit, bool, true if node is temporarry decompressed for usage.
 * attempted_compress: 1 bit, boolean, used for verifying during testing.
 * extra: 10 bits, free for future use; pads out the remainder of 32 bits */
typedef struct quicklistNode {
    struct quicklistNode *prev;
    struct quicklistNode *next;
    unsigned char *zl;           /* 指向的 ZipList
    unsigned int sz;             /* ziplist size in bytes */
    unsigned int count : 16;     /* count of items in ziplist */
    unsigned int encoding : 2;   /* RAW==1 or LZF==2 */
    unsigned int container : 2;  /* NONE==1 or ZIPLIST==2 */
    unsigned int recompress : 1; /* was this node previous compressed? */
    unsigned int attempted_compress : 1; /* node can't compress; too small */
    unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;

QuickList采用了链表结构,所以当插入一个新的元素时,QuickList 首先就会检查插入位置的 ZipList 是否能容纳该元素,这是通过 _quicklistNodeAllowInsert 函数来完成判断的。

REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
                                           const int fill, const size_t sz) {
    if (unlikely(!node))
        return 0;

    int ziplist_overhead;
    /* size of previous offset */
    if (sz < 254)
        ziplist_overhead = 1;
    else
        ziplist_overhead = 5;

    /* size of forward offset */
    if (sz < 64)
        ziplist_overhead += 1;
    else if (likely(sz < 16384))
        ziplist_overhead += 2;
    else
        ziplist_overhead += 5;

    /* new_sz overestimates if 'sz' encodes to an integer type */
    unsigned int new_sz = node->sz + sz + ziplist_overhead;
    if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
        return 1;
    /* when we return 1 above we know that the limit is a size limit (which is
     * safe, see comments next to optimization_level and SIZE_SAFETY_LIMIT) */
    else if (!sizeMeetsSafetyLimit(new_sz))
        return 0;
    else if ((int)node->count < fill)
        return 1;
    else
        return 0;
}

这里还有个fill字段,它的含义是每个quicknode的节点最大容量,不同的数值有不同的含义,默认是-2,当然也可以配置为其他数值,具体数值含义如下:

  • -1: 每个quicklistNode节点的ziplist所占字节数不能超过4kb。(建议配置)
  • -2: 每个quicklistNode节点的ziplist所占字节数不能超过8kb。(默认配置&建议配置)
  • -3: 每个quicklistNode节点的ziplist所占字节数不能超过16kb。
  • -4: 每个quicklistNode节点的ziplist所占字节数不能超过32kb。
  • -5: 每个quicklistNode节点的ziplist所占字节数不能超过64kb。
  • 任意正数: 表示:ziplist结构所最多包含的entry个数,最大为215215。

因此函数会检查需要插入的元素大小在插入节点后,节点的大小是否符合相关的 optimization_level 所要求的大小。

常用操作

插入

// https://github.com/redis/redis/blob/704ba5f5b22ae1ecafbcfb7a3258311c27ff94ff/src/quicklist.c#L358
/* Insert a new entry before or after existing entry 'entry'.
 *
 * If after==1, the new value is inserted after 'entry', otherwise
 * the new value is inserted before 'entry'. */
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
                                   void *value, const size_t sz, int after)
  1. 当前 QuickList 为空,则先初始化并创建第一个节点。
  2. 确定当前 QuickListNode 指向的 ZipList 是否有空间插入。如果是尾插,同时检查后继节点是否有空间插入。如果是头插,则同时检查前序节点是否有空间插入
  3. 如果当前节点未满,则根据要求从头部插入或尾部插入到指定位置。
  4. 如果当前节点满,需要尾插,但后继节点不为空且未满,则在后继节点进行头插。
  5. 如果当前节点满,需要头插,但前序节点不为空且未满,则在前序节点进行尾插。
  6. 如果当前节点满,并且前后节点都不为空且满的情况下,根据插入位置在合适位置创建新节点进行插入。
  7. 不满足上述情况,则需要把当前节点分裂成两个新节点,一般用于插入到当前节点 ZipList中间某个位置时。

删除

// https://github.com/redis/redis/blob/704ba5f5b22ae1ecafbcfb7a3258311c27ff94ff/src/quicklist.c#L625
/* Delete one entry from list given the node for the entry and a pointer
 * to the entry in the node.
 *
 * Note: quicklistDelIndex() *requires* uncompressed nodes because you
 *       already had to get *p from an uncompressed node somewhere.
 *
 * Returns 1 if the entire node was deleted, 0 if node still exists.
 * Also updates in/out param 'p' with the next offset in the ziplist. */
REDIS_STATIC int quicklistDelIndex(quicklist *quicklist, quicklistNode *node,
                                   unsigned char **p)

删除操作相对简单很多,直接从对应的 QuickListNode 指向的 ZipList 中删除对应的数据,如果删除后 ZipList 为空,则回收整个 QuickListNode。

ListPack(Stream)

QuickList 使用 QuickListNode 结构指向每个 ZipList,增加了内存开销。为了减少内存开销,并进一步避免 ZipList 连锁更新问题,Redis 在 5.0 版本中,就设计实现了 ListPack 结构。ListPack 结构沿用了 ziplist 紧凑型的内存布局,把每个元素都紧挨着放置。ListPack 中每个列表项不再包含前一项的长度了,因此当某个列表项中的数据发生变化,导致列表项长度变化时,其他列表项的长度是不会受影响的,因而这就避免了 ZipList 面临的连锁更新问题。

ListPack 结构
ListPack 结构

在 listpack 中,因为每个列表项只记录自己的长度,而不会像 ziplist 中的列表项那样,会记录前一项的长度。所以,当我们在 listpack 中新增或修改元素时,实际上只会涉及每个列表项自己的操作,而不会影响后续列表项的长度变化,这就避免了连锁更新。

设计与实现

ListPack 创建

/* Create a new, empty listpack.
 * On success the new listpack is returned, otherwise an error is returned. */
unsigned char *lpNew(void) {
    unsigned char *lp = lp_malloc(LP_HDR_SIZE+1);
    if (lp == NULL) return NULL;
    lpSetTotalBytes(lp,LP_HDR_SIZE+1);
    lpSetNumElements(lp,0);
    lp[LP_HDR_SIZE] = LP_EOF;
    return lp;
}

Entry BackLen 解析逻辑

Entry-len 最多包含 5 个字节,其中 entry-len 每个字节的最高位,是用来表示当前字节是否为 entry-len 的最后一个字节,这里存在两种情况,分别是:

  • 最高位为 1,表示 entry-len 还没有结束,当前字节的左边字节仍然表示 entry-len 的内容;
  • 最高位为 0,表示当前字节已经是 entry-len 最后一个字节了。

而 entry-len 每个字节的低 7 位,则记录了实际的长度信息。这里你需要注意的是,entry-len 每个字节的低 7 位采用了大端模式存储,也就是说,entry-len 的低位字节保存在内存高地址上。

/* Decode the backlen and returns it. If the encoding looks invalid (more than
 * 5 bytes are used), UINT64_MAX is returned to report the problem. */
uint64_t lpDecodeBacklen(unsigned char *p) {
    uint64_t val = 0;
    uint64_t shift = 0;
    do {
        val |= (uint64_t)(p[0] & 127) << shift;
        if (!(p[0] & 128)) break;
        shift += 7;
        p--;
        if (shift > 28) return UINT64_MAX;
    } while(1);
    return val;
}

有了特殊的 entry-len 编码格式,使得 ListPack 在 Entry 仅记录自身的长度的情况下,支持正向和反向遍历。

Hashes(Hashes / Set / ZSet)

Redis 为我们提供了一个经典的 Hash 表实现方案。针对哈希冲突,Redis 采用了链式哈希,在不扩容哈希表的前提下,将具有相同哈希值的数据链接起来,以便这些数据在表中仍然可以被查询到;对于 rehash 开销,Redis 实现了渐进式 rehash 设计,进而缓解了 rehash 操作带来的额外开销对系统的性能影响。

压缩表存储

#define OBJ_HASH_MAX_ZIPLIST_ENTRIES 512
#define OBJ_HASH_MAX_ZIPLIST_VALUE 64

在哈希表元素数量小于 512 个且元素大小小于 64 字节时,哈希表会借助压缩表结构进行存储。Redis 会根据获取到的 RedisObject 的类型,通过不同的方法适配压缩表和 Dict 的操作以实现相同的逻辑

Dict 结构设计

typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;

/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    unsigned long iterators; /* number of iterators currently running */
} dict;
  1. Hash 表被定义为一个二维数组(dictEntry **table),这个数组的每个元素是一个指向哈希项(dictEntry)的指针,即数组里的每个元素是一个哈希桶。
  2. Redis 在每个 dictEntry 的结构设计中,除了包含指向键和值的指针,还包含了指向下一个哈希项的指针。(链地址法解决哈希冲突)。

渐进式 Rehash

Rehash 过程

  1. Redis 准备了两个哈希表,用于 rehash 时交替保存数据。
  2. 在正常服务请求阶段,所有的键值对写入哈希表 ht0
  3. 当进行 rehash 时,键值对被迁移到哈希表 ht1中。
  4. 迁移完成后,ht0的空间会被释放,并把 ht1的地址赋值给 ht0,ht1的表大小设置为 0。这样一来,又回到了正常服务请求的阶段,ht0接收和服务请求,ht1作为下一次 rehash 时的迁移表。

Rehash 触发条件

  1. 不处于 Rehash 过程中。
  2. ht0 承载的元素个数已经超过了 ht0 的大小,同时 Hash 表可以进行扩容。即负载因子大于等于 1 小于 5 且 dict_can_resize=1 时会触发扩容并 Rehash
  3. ht0 承载的元素个数,是 ht0 的大小的 dict_force_resize_ratio 倍,其中,dict_force_resize_ratio 的默认值是 5。即负载因子大于 5 时会触发扩容并 Rehash。
  4. ht0 承载的元素个数,是 ht0 的大小的 1/10 时。即负载因子小于 0.1 时会触发缩容并 Rehash。
/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}
/* Hash table parameters */
#define HASHTABLE_MIN_FILL        10      /* Minimal hash table fill 10% */

int htNeedsResize(dict *dict) {
    long long size, used;

    size = dictSlots(dict);
    used = dictSize(dict);
    return (size > DICT_HT_INITIAL_SIZE &&
            (used*100/size < HASHTABLE_MIN_FILL));
}

dict_can_resize 变量控制,如果当前没有 RDB 子进程,并且也没有 AOF 子进程,可以触发 Rehash。即对应了 Redis 当前没有执行 RDB 快照和没有进行 AOF 重写的场景。

/* This function is called once a background process of some kind terminates,
 * as we want to avoid resizing the hash tables when there is a child in order
 * to play well with copy-on-write (otherwise when a resize happens lots of
 * memory pages are copied). The goal of this function is to update the ability
 * for dict.c to resize the hash tables accordingly to the fact we have o not
 * running childs. */
void updateDictResizePolicy(void) {
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
        dictEnableResize();
    else
        dictDisableResize();
}

Rehash 扩容按照当前哈希表容量的 2 的倍数进行扩容,但是实际是找到 DICT_HT_INITIAL_SIZE^2 大于期望大小的值,DICT_HT_INITIAL_SIZE 默认为 4

/* Expand or create the hash table */
int dictExpand(dict *d, unsigned long size)
{
    /* the size is invalid if it is smaller than the number of
     * elements already inside the hash table */
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    dictht n; /* the new hash table */
    unsigned long realsize = _dictNextPower(size);

    /* Rehashing to the same table size is not useful. */
    if (realsize == d->ht[0].size) return DICT_ERR;

    /* Allocate the new hash table and initialize all pointers to NULL */
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;

    /* Is this the first initialization? If so it's not really a rehashing
     * we just set the first hash table so that it can accept keys. */
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    /* Prepare a second hash table for incremental rehashing */
    d->ht[1] = n;
    d->rehashidx = 0;
    return DICT_OK;
}
/* Our hash table capability is a power of two */
static unsigned long _dictNextPower(unsigned long size)
{
    unsigned long i = DICT_HT_INITIAL_SIZE;

    if (size >= LONG_MAX) return LONG_MAX + 1LU;
    while(1) {
        if (i >= size)
            return i;
        i *= 2;
    }
}

Rehash 实现细节

/* Performs N steps of incremental rehashing. Returns 1 if there are still
 * keys to move from the old to the new hash table, otherwise 0 is returned.
 *
 * Note that a rehashing step consists in moving a bucket (that may have more
 * than one key as we use chaining) from the old to the new hash table, however
 * since part of the hash table may be composed of empty spaces, it is not
 * guaranteed that this function will rehash even a single bucket, since it
 * will visit at max N*10 empty buckets in total, otherwise the amount of
 * work it does would be unbound and the function may block for a long time. */
int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    if (!dictIsRehashing(d)) return 0;

    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            uint64_t h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }

    /* Check if we already rehashed the whole table... */
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    }

    /* More to rehash... */
    return 1;
}
  1. 该函数会执行一个循环,根据要进行键拷贝的 bucket 数量 n,依次完成这些 bucket 内部所有键的迁移。当然,如果 ht0哈希表中的数据已经都迁移完成了,键拷贝的循环也会停止执行。
  2. 在完成了 n 个 bucket 拷贝后,dictRehash 函数的第二部分逻辑,就是判断 ht0表中数据是否都已迁移完。如果都迁移完了,那么 ht0的空间会被释放。因为 Redis 在处理请求时,代码逻辑中都是使用 ht0,所以当 rehash 执行完成后,虽然数据都在 ht1中了,但 Redis 仍然会把 ht1赋值给 ht0,以便其他部分的代码逻辑正常使用。
  3. 在 ht1赋值给 ht0后,它的大小就会被重置为 0,等待下一次 rehash。与此同时,全局哈希表中的 rehashidx 变量会被标为 -1,表示 rehash 结束。

其中 rehashidx 变量表示的是当前 rehash 在对哪个 bucket 做数据迁移。比如,当 rehashidx 等于 0 时,表示对 ht0中的第一个 bucket 进行数据迁移;当 rehashidx 等于 1 时,表示对 ht0中的第二个 bucket 进行数据迁移,以此类推。而 dictRehash 函数的主循环,首先会判断 rehashidx 指向的 bucket 是否为空,如果为空,那就将 rehashidx 的值加 1,检查下一个 bucket。同时,渐进式 rehash 在执行时设置了一个变量 empty_visits,用来表示已经检查过的空 bucket,当检查了一定数量的空 bucket 后,这一轮的 rehash 就停止执行,转而继续处理外来请求,避免了对 Redis 性能的影响。

SortedSet(ZSet)

有序集合(Sorted Set)是 Redis 中一种重要的数据类型,它本身是集合类型,同时也可以支持集合中的元素带有权重,并按权重排序。

压缩表存储

#define OBJ_ZSET_MAX_ZIPLIST_ENTRIES 128
#define OBJ_ZSET_MAX_ZIPLIST_VALUE 64

在 SortedSet 元素数量小于 128 个且元素大小小于 64 字节时,SortedSet 会借助压缩表结构进行存储。Redis 会根据获取到的 RedisObject 的类型,通过不同的方法适配压缩表和 ZSet 的操作以实现相同的逻辑

Sorted Set 基本结构

typedef struct zset {
    dict *dict;     // 哈希表索引
    zskiplist *zsl; // 跳表实现范围查
} zset;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;  // 头尾节点
    unsigned long length;                 // 链表长度
    int level;                            // 当前 level
} zskiplist;

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
    sds ele;                          // Member
    double score;                     // Score
    struct zskiplistNode *backward;   // 后继指针
    
    // 节点的level数组,保存每层上的前向指针和跨度
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;

从上述结构中可以看出,Sorted Set 能支持范围查询,这是因为它的核心数据结构设计采用了跳表,而它又能以常数复杂度获取元素权重,这是因为它同时采用了哈希表进行索引。

跳表的设计与实现

跳表其实是一种有序链表 + 多层索引的结构,用于优化范围查找。这种设计方法带来的好处是,当跳表从最高层开始进行查找时,由于每一层结点数都约是下一层结点数的一半,这种查找过程就类似于二分查找,查找复杂度可以降低到 O(logN)。

SkipList 结构
SkipList 结构

跳表结点层数设置

SkipList 的多层索引,采用「随机」的方式来构建,也就是说每次添加一个元素进来,要不要对这个元素建立「多层索引」?建立「几层索引」?都要通过「随机数」的方式来决定。每个元素在加入跳表的过程中都有一定概率在更高层创建索引。

#define ZSKIPLIST_MAXLEVEL 64  //最大层数为64
#define ZSKIPLIST_P 0.25       //随机数的值为0.25
int zslRandomLevel(void) {
    //初始化层为1
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

这个预设「概率」决定了一个跳表的内存占用和查询复杂度:概率设置越低,层数越少,元素指针越少,内存占用也就越少,但查询复杂会变高,反之亦然。这也是 skiplist 的一大特点,可通过控制概率,进而控制内存和查询效率。

跳表和平衡二叉树对比

关于 Redis 的 ZSet 为什么用 SkipList 而不用平衡二叉树实现的问题,原因如下:

  • SkipList 更省内存:25% 概率的随机层数,可通过公式计算出 SkipList 平均每个节点的指针数是 1.33 个,平衡二叉树每个节点指针是 2 个(左右子树)
  • SkipList 操作更友好:SkipList 找到大于目标元素后,向后遍历链表即可,平衡树需要通过中序遍历方式来完成,实现也略复杂。二叉平衡树还和涉及到重新平衡操作,比较复杂。
  • SkipList 更易实现和维护:扩展 SkipList 只需要改少量代码即可完成,平衡树维护起来更复杂。

插入节点

/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    // 查找插入位置,会考虑 Score 和 SDS 字典序
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
     
    // 获取索引层数
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    
    // 插入节点,维护前后节点跨度
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

删除节点

/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    // 修改每一层指针的指向
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}
/* Delete an element with matching score/element from the skiplist.
 * The function returns 1 if the node was found and deleted, otherwise
 * 0 is returned.
 *
 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
 * it is not freed (but just unlinked) and *node is set to the node pointer,
 * so that it is possible for the caller to reuse the node (including the
 * referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    // 找到每一层要删除节点的前置节点
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    // 删除节点
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}

哈希表结合跳表

  1. 哈希表存储 Member -> Score 的关系,获取 Score 复杂度为 O(1)
  2. 跳表根据 Score 按顺序存储 Member(Score + SDS 字典序),查找复杂度可以降低到 O(logN)。
  3. 但由于引入了哈希表,当进行插入删除操作的时候,会给操作带来额外的开销。例如:批量插入导致哈希表扩容 Rehash,批量范围删除导致哈希表的批量删除。

常见操作

插入

ZADD 命令

// https://github.com/redis/redis/blob/704ba5f5b22ae1ecafbcfb7a3258311c27ff94ff/src/t_zset.c#L1317
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore)

SortedSet 的插入操作会根据实际类型进行:

  • zsetAdd 函数会判定 Sorted Set 采用的是 ZipList 还是 SkipList 的编码方式。
  • 如果为 ZipList 对象:
    • 先查找对应元素,如果元素已存在:
      • NX 插入:直接返回
      • INCR 操作:增加 Score
      • Score 变更时候先删除元素后重新插入
    • 如果元素不存在,且非 XX 插入的情况下:
      • 检查是否超出 SortedSet 使用 ZipList 存储的限制条件,超出则转换为跳表模式,否则直接插入
  • 如果为 zset 对象:
    • 先查找对应元素(哈希表查找),如果元素已存在:
      • NX 插入:直接返回
      • INCR 操作:增加 Score
      • Score 变更时候先更新跳表,然后更新哈希表
    • 如果元素不存在,且非 XX 插入的情况下:
      • 复制 SDS
      • 将元素插入跳表
      • 将元素插入哈希表

删除

ZREM/ZREMRANGEBYLEX/ZREMRANGEBYRANK/ZREMRANGEBYSCORE

// https://github.com/redis/redis/blob/704ba5f5b22ae1ecafbcfb7a3258311c27ff94ff/src/t_zset.c#L1434
/* Delete the element 'ele' from the sorted set, returning 1 if the element
 * existed and was deleted, 0 otherwise (the element was not there). */
int zsetDel(robj *zobj, sds ele)
  • zsetDel 函数会判定 Sorted Set 采用的是 ZipList 还是 SkipList 的编码方式。
  • 如果为 ZipList 对象:
    • 先查找对应元素,如果元素已存在:
      • 从 ZipList 中删除对应 Entry
  • 如果为 zset 对象:
    • 先查找对应元素(哈希表查找),如果元素已存在:
      • 从哈希表中删除元素
      • 从跳表中删除元素
      • 检查哈希表时候需要缩容,如果需要则开始缩容。

压缩表跳表转换

/* Convert the sorted set object into a ziplist if it is not already a ziplist
 * and if the number of elements and the maximum element size and total elements size
 * are within the expected ranges. */
void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen, size_t totelelen) {
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) return;
    zset *zset = zobj->ptr;

    if (zset->zsl->length <= server.zset_max_ziplist_entries &&
        maxelelen <= server.zset_max_ziplist_value &&
        ziplistSafeToAdd(NULL, totelelen))
    {
        zsetConvert(zobj,OBJ_ENCODING_ZIPLIST);
    }
}
void zsetConvert(robj *zobj, int encoding) {
    ...
    if (zobj->encoding == encoding) return;
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        ...
        zs = zmalloc(sizeof(*zs));
        zs->dict = dictCreate(&zsetDictType,NULL);
        zs->zsl = zslCreate();
        ...
        while (eptr != NULL) {
            score = zzlGetScore(sptr);
            serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
            if (vstr == NULL)
                ele = sdsfromlonglong(vlong);
            else
                ele = sdsnewlen((char*)vstr,vlen);
        
            node = zslInsert(zs->zsl,score,ele);
            serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
            zzlNext(zl,&eptr,&sptr);
        }
        
        zfree(zobj->ptr);
        zobj->ptr = zs;
        zobj->encoding = OBJ_ENCODING_SKIPLIST;
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        unsigned char *zl = ziplistNew();
        ...
        while (node) {
            zl = zzlInsertAt(zl,NULL,node->ele,node->score);
            next = node->level[0].forward;
            zslFreeNode(node);
            node = next;
        }
        
        zfree(zs);
        zobj->ptr = zl;
        zobj->encoding = OBJ_ENCODING_ZIPLIST;
        ...
    }

主要还是根据压缩表存储的条件进行相互转换,根据输入输出的 Encoding,都是将输入的结构插入到重新创建的输出结构中,再将输入结构清理并释放内存空间。

小结

以上就是对 Redis 几大数据结构的浅析,Redis 作为一个高性能的内存 KV 库,其数据结构设计和实现都是非常值得学习的。每种数据结构都有其特点和适用场景,Redis 也根据不同的场景选择了不同的数据结构来实现,以达到最优的性能。