博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
一个线程安全的 lrucache 实现 --- 读 leveldb 源码
阅读量:6906 次
发布时间:2019-06-27

本文共 11432 字,大约阅读时间需要 38 分钟。

缓存是计算机的每一个层次中都是一个非常重要的概念,缓存的存在可以大大提高软件的运行速度。Least Recently Used(lru) cache 即最近最久未使用的缓存,多见与页面置换算法,lru 缓存算法在缓存的大小达到最大值之后,换出最早未被使用的缓存。

在阅读 leveldb 的源代码的时候,发现其中的 cache 类正是一个线程安全的 lru-cache 实现,代码非常优雅。笔者读完之后受益良多,希望借助这篇博客,一来可以加深自己的理解,而来可以与大家共享这些优秀代码(PS:因为这个 cache 类主要是为了支持 leveldb 的实现,所以其实现的接口可能与其他常见缓存有所差别)。

leveldb 中的 cache 提供的并不是一个类似于 std::map 的模板类,其有一下几个特点:

  1. 键值必须为 string-like (代码中为 Slice 类).
  2. 存储的内容为 void *.所以支持存储任何类型的数据,因为存储的是指针,所以 cache 类负责管理其所有 entries 的生命周期以及资源回收。在插入的时候必须明确传入 deleter.
  3. 对 entries 的查询需要先获取 Handle(像pthread_t一样的一个对客户端透明的结构,如果客户端不再使用,必4. 须要显示的调用Cache::Release(handle).不直接提供访问 entries 的接口,防止了客户端修改,但引入了一次额外的函数调用(Cache::Value(hanle)).
  4. 使用两个双向链表来分别存储被客户端使用的 entries 和未被使用的 entries.
  5. NewId() 接口可以生成一个唯一的 id,多线程环境下可以使用这个 id 与自己的键值拼接起来,防止不同线程之间互相覆写.
  6. 使用很直观的 mutex 类提供线程安全性.
不止巧妙,而且代码的可读性非常之高的,下面让我们来看一下主要的代码(完整代码可以参见 leveldb 源代码或者本人的 github)。
class Cache { public:  Cache() { }  // 需要调用插入时的 deleter 销毁所有 entries  virtual ~Cache();  // 每个 entry 对应的 handle,客户端只能通过调用 this->Value(handle) 来获取缓存的内容,不可以直接访问缓存  struct Handle { };  // 插入一个 key->value 的映射,支持通过 charge 为不同的缓存设定不同的权重。  // 返回一个 entry 对应的 handle 指针到客户端,客户端不需要使用后必须显示的调用 this->Release(handle),  // 当缓存要被换出的时候,会调用传入的 deleter  virtual Handle *Insert(const Slice &key, void *value, size_t charge,                         void (*deleter)(const Slice &key, void *value)) = 0;  // 查询接口,如果缓存不存在,返回 NULL 指针。否则返回 entry 对应的 handle 指针,同样必须调用 this->Release(handle)  virtual Handle *Lookup(const Slice &key) = 0;  // 释放由 Insert 或者 Lokkup 返回的 handle。Release a mapping returned by a previous Lookup().  // NOTE: 1.仅能在一个 handle 上调用一次 this->Release(handle)  // NOTE: 2.handle 在调用该方法后不能再使用(handle 指针会被 free 掉)  virtual void Release(Handle *handle) = 0;  // 获取缓存内容  virtual void *Value(Handle *hanle) = 0;  // 见上面第6点  virtual uint64_t NewId() = 0;  // 清楚所有的未使用缓存  virtual void Prune() { }  // 查看缓存使用总量  virtual size_t TotalCharge() const = 0; private:  // No copying allowed  Cache(const Cache &);  void operator=(const Cache &);};
因为 leveldb 中支持对定制自己的缓存类,所以其头文件中的 Cache 是一个虚拟基类,更像是一个接口定义,《effective c++》中称之为 Interface class。每个接口的含义我都已经加上了注释,为了保证除了 Cache 类,没有人可以操作缓存数据(这是非常重要的一点,因为 Cache 中存储的都是 void *,Cache 类必须要确保它自己唯一管理这些缓存对象生命周期),这里使用了 Handle(句柄)。虽然在查询的时候额外引入了一层间接层,但是可以充分保证数据安全。

下面让我们看一下实现代码:

struct LRUHandle { // 这便是返回的 Handle,在返回的时候会 return reinterpret_cast
(handle) void *value; void (*deleter)(const Slice &, void *); LRUHandle *next_hash; // 给自己实现的 hahs_table 使用; LRUHandle *next; // 双向链表 LRUHandle *prev; // 双向链表 size_t charge; // 权重 size_t key_length; // key 长度 bool in_cache; // 是否被缓存(我们的类可以通过将 capacity 设置为0来关闭缓存) uint32_t refs; // 引用计数 uint32_t hash; // 我们要将 hash 值缓存起来,防止需要重复 hash 计算 char key_data[1]; // 柔性数组,存储键 Slice key() const { assert(next != this); // Hold true iff this is the head of an empty list return Slice(key_data, key_length); } };
代码中 Slice 是 google 项目常见的一个工具类,其实现非常简单,可以认为它仅仅是对一个字符串的引用(因为它自己不管理内存,咱们在这里可以简单的把它当做 std::string 也是可以的,它可以使用的地方都可以使用 std::string(反过来不成立))。

可以看到我们这个 LRUHandle 类中的数据还是不少的,在后面的实现代码中我们能看到它们的用武之地。下面要看的代码是 levedb 自己实现的一个很短小精悍的 hash_table,其使用开链法解决冲突,代码真的很优雅,很值得一读:

class HandleTable { public:   HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); }   ~HandleTable() { delete[] list_; }   LRUHandle *Lookup(const Slice &key, uint32_t hash) {     return *FindPointer(key, hash);   }   // Insert into the last position   LRUHandle *Insert(LRUHandle *h) {     LRUHandle **ptr = FindPointer(h->key(), h->hash);     LRUHandle *old = *ptr;     h->next_hash = (old == NULL ? NULL : old->next_hash);     *ptr = h;     if (old == NULL) {       ++elems_;       if (elems_ > length_) {         Resize();       }     }     return old;   }   LRUHandle *Remove(const Slice &key, uint32_t hash) {     LRUHandle **ptr = FindPointer(key, hash);     LRUHandle *result = *ptr;     if (result != NULL) {       *ptr = result->next_hash;       --elems_;     }     return result;   } private:   uint32_t length_; // hash table 的 bucket 数   uint32_t elems_; // hash table 中 entry 数   LRUHandle **list_; // hash bucket 数组   LRUHandle **FindPointer(const Slice &key, uint32_t hash) {      assert(list_ != NULL);     LRUHandle **ptr = &list_[hash & (length_ - 1)]; // 找到对应键的桶,取第一个 entry     while (*ptr != NULL &&            ((*ptr)->hash != hash || (*ptr)->key() != key)) {       ptr = &(*ptr)->next_hash; // 注意,我们返回的不是一个 LRUHandle *,而是一个 LRUHandle **(&(LRUHandle::next_hash))     }     return ptr;   }   void Resize() {     uint32_t new_length = 4; // length_ 一定是 2 的幂次,这样可以使用 hash & (length_ - 1) 来算出 hash % length_     while (new_length < elems_) new_length <<= 1; // 算出新 length,因为 length > elems_; 所以装载因子<1     LRUHandle **new_list = new LRUHandle *[new_length];      memset(new_list, 0, sizeof(new_list[0]) * new_length);     uint32_t count = 0;     for (uint32_t i = 0; i < length_; i++) { // 遍历所有的桶,为桶内元素搬家       LRUHandle *h = list_[i]; // 每个桶的第一个 entry       while (h != NULL) {         LRUHandle *next = h->next_hash; // 先保存起来 next         uint32_t hash = h->hash;         LRUHandle **ptr = &new_list[hash & (new_length-1)];  // 计算在新表中的桶下标,取桶内第一个 entry         h->next_hash = *ptr; // 头插法         *ptr = h;         h = next;         ++count;       }     }     assert(count == elems_);     delete[] list_; // 更新 list_ 和 length_     list_ = new_list;     length_ = new_length;   }};
上面的代码真的写的太好了,笔者都有怎么读都读不够的感觉啊,谷歌工程师,真的太强了。简要的说下最重要的两个接口:Resize() 和 FindPointer()。

先说 Resize()。在 hash_table 中扩容的时候,往往有两个常见的选择,一是 bucket 的数量选用一个素数,这样可以保证 hash mod 之后的结果最均匀,还有一种是使用2的幂次结果作为 bucket 数,后者是通过优化 hash % #bucket 这个运算来优化(因为 #bucket = 2^n,hash % #bucket 等价于 hash & (length - 1))。在 leveldb 中选用的是后者来优化。

然后是 FindPointer(),在读这份源码之前,我对链表内元素的插入和删除之后使用两个指针,一前一后,来操作链表,但是作者使用一个 LRUHandle **指针,一个指针就解决了对链表元素的插入和删除,真的太巧妙了。我觉得这个代码都值得单独拿出来讲,所以这里并不准备全部展开,但是推荐大家好好品读,我也会学习这个写法,自己写一个 list 实现,到时候也会发到博客,和大家一起交流。

有了 hash_table,我们可以开始实现了 LRUCache 了:

class LRUCache { public:  LRUCache();  ~LRUCache();  void SetCapacity(size_t cap) { capacity_ = cap; }  Cache::Handle *Insert(const Slice &key, uint32_t hash,                        void *value, size_t charge,                        void (*deleter)(const Slice &, void *));  Cache::Handle *Lookup(const Slice &key, uint32_t hash);  void Release(Cache::Handle *handle);  void Erase(const Slice &key, uint32_t hash);  void Prune();  size_t TotalCharge() const {    MutexLock l(&mutex_);    return usage_;  } private:  void LRU_Remove(LRUHandle *e);  void LRU_Append(LRUHandle *list, LRUHandle *e);  void Ref(LRUHandle *e);  void Unref(LRUHandle *e);  bool FinishErase(LRUHandle *e);  // Initialized before use.  size_t capacity_;  // Capacity usage guarded by mutex  mutable Mutex mutex_;  size_t usage_;  // 所有未被使用的 entries 都被存储于 lru_ 这个双向链表中,这里使用 dummy head 的方法实现双向链表  // lru_.prev 是最新的 entry,lru_.next 是最早的 entry(也就是说换出缓存的时候就从 lru_.next 开刀)  // lru_ 中的所有 entries 都有 refs == 1 && in_cache == true (仅被 LRUCache 类引用,而且被缓存)  LRUHandle lru_;  // 所有现在被使用(客户端持有至少一个尚未 Release() 的 handle)的 entries 存储在 in_use_ 双向链表中  // in_use_ 中 entries 有 refs >= 2 && in_cache = true  LRUHandle in_use_;  HandleTable table_; // 用于进行 key->handle 的 mapping};LRUCache::LRUCache() : usage_(0) { // 初始化两个双向链表的 dummy head  lru_.next = &lru_;   lru_.prev = &lru_;  in_use_.next = &in_use_;  in_use_.prev = &in_use_;}LRUCache::~LRUCache() {  assert(in_use_.next == &in_use_); // 如果在 LRUCache 在析构的时候,还有客户端持有尚未 Release() 的 handle,那么就会出问题  LRUHandle *e = lru_.next;  while (e != &lru_) { // 释放所有 entries    LRUHandle *next = e->next;    assert(e->in_cache);     e->in_cache = false;    assert(e->refs == 1);    Unref(e);    e = next;  }}void LRUCache::Ref(LRUHandle *e) { // 增加引用计数,必要的话将其从 lru_. 挪到 in_use_  if (e->refs == 1 && e->in_cache) { // ++ 后 refs 大于1,代表有客户端使用    LRU_Remove(e); // 从 lru_ 中删除    LRU_Append(&in_use_, e); // 插入 in_use_ 中  }  e->refs++; // 增加引用计数}void LRUCache::Unref(LRUHandle *e) {  assert(e->refs > 0); // Unref() 调用前,这个 handle 应该是被引用的  if (--e->refs == 0) { // 无人引用(包括 LRUCache 类),需要调用其构造时传入的删除器,管理资源    assert(!e->in_cache);    (*e->deleter)(e->key(), e->value);    free(e); // 注意所有的 handle 都是 heap-allocated,必须要free  } else if (e->in_cache && e->refs == 1) { // refs==1 代表仅有 LRUCache 类引用    LRU_Remove(e);  // 从 in_use_ 中删除    LRU_Append(&lru_, e); // 插入 lru_ 中  }}// 将元素从双向链表中删除(删除并不需要知道其所属的 list)void LRUCache::LRU_Remove(LRUHandle *e) {  e->prev->next = e->next;   e->next->prev = e->prev;}// 将元素插入到双向链表 head.prevvoid LRUCache::LRU_Append(LRUHandle *list, LRUHandle *e) {  list->prev->next = e;  e->prev = list->prev;  e->next = list;  list->prev = e;}Cache::Handle *LRUCache::Lookup(const Slice &key, uint32_t hash) {  MutexLock l(&mutex_);  LRUHandle *e = table_.Lookup(key, hash); // 在 hash_table 中查找  if (e != NULL) Ref(e); // 如果有被缓存起来,增加引用计数等  return reinterpret_cast
(e); // 将其 cast 为一个 Handle * 返回,还记得 Cache 类中的接口吧?}void LRUCache::Release(Cache::Handle *e) { MutexLock l(&mutex_); Unref(reinterpret_cast
(e));}Cache::Handle *LRUCache::Insert( const Slice &key, uint32_t hash, void *value, size_t charge, void (*deleter)(const Slice &, void *)) { MutexLock l(&mutex_); LRUHandle *e = reinterpret_cast
( malloc(sizeof(LRUHandle) + (key.size() - 1))); // 这里注意,因为 LRUHandle 中保存了 key_length,所以这里在申请内存的时候,并没有为 '\0'申请内存,如果在 LRUHandle::key_data 上调用 strlen() 会有想不到的后果 e->value = value; e->charge = charge; e->deleter = deleter; e->key_length = key.size(); e->in_cache = false; e->refs = 1; // 这里将引用计数设为1是代表被 this->Insert() 的调用者引用,并不是代表 LRUCache 引用,所以如果不保存 this->Insert() 的返回结果会有很大的问题 memcpy(e->key_data, key.data(), key.size()); if (capacity_ > 0) { // 需要缓存起来 e->refs++; // LRUCache 类也要引用这个 handle 了 e->in_cache = true; LRU_Append(&in_use_, e); // 因为这时候 refs == 2,需要放到 in_use_ 中 usage_ += charge; FinishErase(table_.Insert(e)); // 如果相同键插入两次,会覆盖之前的缓存,并且需要将之前的缓存删除 } else { // 如果将 capacity 设置为0,代表的是关闭缓存 // 如果 handle->next == handle,会无法通过 LRUHandle::key() 中的断言 e->next = NULL; } // 如果本次插入导致缓存超出上限,需要换出旧缓存 while (usage_ > capacity_) { LRUHandle *oldest = lru_.next; // 还记得 lru_.next 存的是最早未使用的缓存吧 assert(oldest->refs == 1); bool erased = FinishErase(table_.Remove(oldest->key(), oldest->hash)); // 删除调用缓存 if (!erased) { // 如果 erased 是 false,代表 lru_ 中保存有没有被缓存起来的 entry,这是一个严重的问题 assert(erased); // 一条一定会失败的断言 } } return reinterpret_cast
(e);}// 该接口仅应该在用与 FinishErase(handle_table_.Remove()) 一起使用,因为它做的是删除缓存后的后续工作bool LRUCache::FinishErase(LRUHandle *e) { if (e != NULL) { assert(e->in_cache); LRU_Remove(e); // 从 lru_ 中删除 e->in_cache = false; // 设置状态 usage_ -= e->charge; Unref(e); // 一定会进入调用删除器的分支 } return e != NULL;}void LRUCache::Erase(const Slice &key, uint32_t hash) { MutexLock l(&mutex_); FinishErase(table_.Remove(key, hash));}void LRUCache::Prune() { MutexLock l(&mutex_); while (lru_.next != &lru_) { // 遍历 lru_ 中保存的 entries,将其全部删除掉 LRUHandle *e = lru_.next; assert(e->refs == 1); bool erased = FinishErase(table_.Remove(e->key(), e->hash)); if (!erased) { assert(erased); } }}
代码的解释都在注释中,这里我们从设计角度看看,这个 LRUCache 是如何实现的。首先,数据结构上,使用的是双向链表+hash_table,LRU 中要做到可以迅速找到最早未使用的缓存,所以使用了两个双向链表,分别用于保存“在使用(in_use_)”和“未使用(lru_)”的缓存,这样,lru_.next 便指向了最早未使用缓存。而 hash_table 则用于对 key->value 进行检索。而对于线程安全的实现,则是选用直观的 mutex,mutex 不代表慢,大家应该要记住,慢的不是锁,而是锁的竞争。

至此一个线程安全的 LRUCache 类就已经全部实现,在 leveldb 中使用的缓存是 ShardedLRUCache,其实就是聚合多个 LRUCache 实例,真正的逻辑都在 LRUCache 类中。

鉴于自身水平有限,文章中有难免有些错误,欢迎大家指出。也希望大家可以积极留言,与笔者一起讨论编程的那些事。

转载地址:http://gumdl.baihongyu.com/

你可能感兴趣的文章
javaScript之function定义
查看>>
PowerShell常用的.Net 、COM对象(New-Object、Assembly)、加载程序集
查看>>
JAVASCRIPT+DHTML实现表格拖动
查看>>
C++ 内存对齐
查看>>
责任链模式
查看>>
Android 仿携程活动列表边框布局
查看>>
九大排序算法再总结
查看>>
Java泛型
查看>>
unity的sprite添加点击事件
查看>>
Pascal's Triangle II
查看>>
yocto系统介绍
查看>>
vim退出后终端保留 退出前的内容
查看>>
Android 实现ActionBar定制
查看>>
mysql之子查询
查看>>
VC++ 内存泄露与检测的一种方法
查看>>
iOS 9应用开发教程之定制应用程序图标以及真机测试
查看>>
JDK7新特性<八>异步io/AIO
查看>>
一步一步使用ABP框架搭建正式项目系列教程
查看>>
Ubuntu14.04下如何开启Mysql远程访问
查看>>
Java的循环结构
查看>>