2

mini-redis项目-2-存储层

 1 year ago
source link: https://jasonkayzk.github.io/2022/12/05/mini-redis%E9%A1%B9%E7%9B%AE-2-%E5%AD%98%E5%82%A8%E5%B1%82/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

本篇接上一篇《mini-redis项目-1-简介》,讲解mini-redis存储层的实现;

系列文章:

mini-redis项目-2-存储层

在 mini-redis 中为了简单起见,数据是直接通过 HashMap 中存储了;

对于实际生产环境中使用的数据库来说,通常会使用:

  • 跳表:内存数据库数据结构实现,增加并发度;
  • LSM-Tree:日志数据库数据结构实现,便于通过Append落盘日志;

数据接口定义:KvStore Trait

数据操作接口被定义在 traits.rs 文件的 KvStore Trait 中;

src/storage/traits.rs

pub trait KvStore {
    fn get(&self, key: &str) -> Option<Bytes>;

    /// Set the value associated with a key along with an optional expiration
    /// Duration.
    ///
    /// If a value is already associated with the key, it is removed.
    fn set(&self, key: String, value: Bytes, expire: Option<Duration>);

    /// Returns a `Receiver` for the requested channel.
    ///
    /// The returned `Receiver` is used to receive values broadcast by `PUBLISH`
    /// commands.
    fn subscribe(&self, key: String) -> broadcast::Receiver<Bytes>;

    /// Publish a message to the channel. Returns the number of subscribers
    /// listening on the channel.
    fn publish(&self, key: &str, value: Bytes) -> usize;
}

主要是 get、set、pub、sub 四个接口,非常的简单;

其中 set 可以设置键的过期时间;

数据存储:Store

Store 结构体中存放实际的数据,定义如下:

src/storage/store.rs

#[derive(Debug)]
pub(crate) struct Store {
    pub(crate) entries: HashMap<String, Entry>,

    pub(crate) pub_sub: HashMap<String, broadcast::Sender<Bytes>>,

    pub(crate) expirations: BTreeMap<(Instant, u64), String>,

    pub(crate) next_id: u64,

    pub(crate) shutdown: bool,
}

#[derive(Debug)]
pub(crate) struct Entry {
    pub(crate) id: u64,

    pub(crate) data: Bytes,

    pub(crate) expires_at: Option<Instant>,
}

impl Store {
    pub(crate) fn new() -> Store {
        Store {
            entries: HashMap::new(),
            pub_sub: HashMap::new(),
            expirations: BTreeMap::new(),
            next_id: 0,
            shutdown: false,
        }
    }

    pub(crate) fn next_expiration(&self) -> Option<Instant> {
        self.expirations.keys().next().map(|expire| expire.0)
    }
}

Store 的逻辑也非常简单:

  • 首先,定义了 Entry 存放实际的 value 数据,因为存在过期键,所以声明了 id、expires_at;
  • 在 Store 中定义了:
    • entries:存放 k-v 数据;
    • pub_sub:存放 pub-sub 数据;
    • expirations:类似于优先队列,便于快速扫描过期键并移除;
    • next_id:为过期键分配的 id,避免找不到对应键;
    • shutdown:数据库是否已经关闭,如果数据库关闭,则此时不再接受请求,但需要释放连接等资源;
  • 最后为 Store 生成了 new 和 next_expiration 方法;

这里的 Store 层用了 pub(crate) 即整个 crate 可见,是一个简化操作;

个人感觉应当将 Store 进行封装,这样如果实际上底层的存储结构变化,对于上次的操作是无感知的!

数据操作:Db

Db结构体

Db 封装了上面的 Store,并实现了 KvStore Trait,对外提供了一组数据操作接口,下面来看具体实现;

Db实现:

src/storage/db.rs

#[derive(Debug, Clone)]
pub(crate) struct Db {
    shared: Arc<SharedDb>,
}

impl Db {
    pub(crate) fn new() -> Db {
        let shared = Arc::new(SharedDb::new());

        tokio::spawn(Db::purge_expired_tasks(shared.clone()));

        Db { shared }
    }

    async fn purge_expired_tasks(shared: Arc<SharedDb>) {
        while !shared.is_shutdown() {
            if let Some(when) = shared.purge_expired_keys() {
                tokio::select! {
                    _ = time::sleep_until(when) => {}
                    _ = shared.background_task.notified() => {}
                }
            } else {
                shared.background_task.notified().await;
            }
        }

        info!("Purge background task shut down")
    }

    fn shutdown_purge_task(&self) {
        let mut store = self.shared.store.lock().unwrap();
        store.shutdown = true;

        drop(store);
        self.shared.background_task.notify_one();
    }
}

由于可能会存在多个异步任务同时获取数据库的场景(例如:键过期、Channel订阅都会用到数据库);

因此需要将 Store 进行封装至 Arc 中,同时封装了 SharedDb(下文会讲);

Db 中定义了 new 方法,用于初始化数据库,并且启动 purge_expired_tasks 任务;

当服务器未关闭时:

  • 如果存在过期键:
    • purge_expired_tasks 会调用 shared.purge_expired_keys 清除最近的过期的键,并获取下一个最新过期的键的时间并等待;
    • 除非 shared.background_task 再次通知了当前异步任务,此时大概率是加入了更早的过期键,需要重新调用 shared.purge_expired_keys 获取最新的最近的过期时间;
  • 如果不存在过期键,则直接等待即可:shared.background_task.notified().await

这样,就完成了过期键的处理!

同时,Db 定义了 shutdown_purge_task 方法,提供给 DbDropGuard,用于在服务器停止后清理资源以及其他任务:

  • 这里 drop(store) 就显式释放了数据库资源,但是这个方法不能由在 Drop Db 的时候调用,因为还需要停止其他异步任务;
  • 同时,也会调用 self.shared.background_task.notify_one() 通知异步过期键任务,此时由于 shutdown 变为了 true,异步任务会直接退出!

SharedDb封装

前面提到了可能会存在多个异步任务同时获取数据库,因此将 Db 封装至了SharedDb 中:

src/storage/db.rs

#[derive(Debug)]
struct SharedDb {
    store: Mutex<Store>,

    background_task: Notify,
}

impl SharedDb {
    fn new() -> Self {
        SharedDb {
            store: Mutex::new(Store::new()),
            background_task: Notify::new(),
        }
    }

    fn purge_expired_keys(&self) -> Option<Instant> {
        let mut store = self.store.lock().unwrap();

        if store.shutdown {
            return None;
        }

        let store = &mut *store;

        let now = Instant::now();
        while let Some((&(when, id), key)) = store.expirations.iter().next() {
            if when > now {
                return Some(when);
            }

            store.entries.remove(key);
            store.expirations.remove(&(when, id));
        }

        None
    }

    fn is_shutdown(&self) -> bool {
        self.store.lock().unwrap().shutdown
    }
}

SharedDb 中的内容非常简单:

  • store: Mutex<Store>:互斥锁保护底层数据不会并发访问;
  • background_task: Notify:和前面提到的异步任务交互;

同时定义了几个方法,new、is_shutdown 很简单不说了;

和上文对应,purge_expired_keys 方法由 Db 中的异步任务调用,在服务器没有关闭时,通过 while let Some((&(when, id), key)) = store.expirations.iter().next() 按照过期时间获取过期键:

  • 如果过期了,则从 entries、expirations 中将键删除;
  • 如果没过期,则当前为下一个过期的键,返回;
  • 如果不存在过期键,返回 None;

Db实现 KvStore Trait

下面是 Db 实现的 KvStore Trait:

src/storage/db.rs

impl KvStore for Db {

    fn get(&self, key: &str) -> Option<Bytes> {
        let store = self.shared.store.lock().unwrap();
        store.entries.get(key).map(|entry| entry.data.clone())
    }

    fn set(&self, key: String, value: Bytes, expire: Option<Duration>) {
        let mut store = self.shared.store.lock().unwrap();

        let id = store.next_id;
        store.next_id += 1;

        let mut notify = false;

        let expires_at = expire.map(|duration| {
            let when = Instant::now() + duration;

            notify = store
                .next_expiration()
                .map(|expiration| expiration > when)
                .unwrap_or(true);

            store.expirations.insert((when, id), key.clone());
            when
        });

        let prev = store.entries.insert(
            key,
            Entry {
                id,
                data: value,
                expires_at,
            },
        );

        if let Some(prev) = prev {
            if let Some(when) = prev.expires_at {
                // clear expiration
                store.expirations.remove(&(when, prev.id));
            }
        }

        drop(store);

        if notify {
            self.shared.background_task.notify_one();
        }
    }

    fn subscribe(&self, key: String) -> broadcast::Receiver<Bytes> {
        use std::collections::hash_map::Entry;

        let mut store = self.shared.store.lock().unwrap();

        match store.pub_sub.entry(key) {
            Entry::Occupied(e) => e.get().subscribe(),
            Entry::Vacant(e) => {
                let (tx, rx) = broadcast::channel(1024);
                e.insert(tx);
                rx
            }
        }
    }

    fn publish(&self, key: &str, value: Bytes) -> usize {
        debug!("publish: (key={}, len(value)={})", key, value.len());

        let state = self.shared.store.lock().unwrap();

        state
            .pub_sub
            .get(key)
            .map(|tx| tx.send(value).unwrap_or(0))
            .unwrap_or(0)
    }
}

下面分别来看;

Get方法

Get 方法比较简单:

fn get(&self, key: &str) -> Option<Bytes> {
  // Acquire the lock, get the entry and clone the value.
  //
  // Because data is stored using `Bytes`, a clone here is a shallow
  // clone. Data is not copied.
  let store = self.shared.store.lock().unwrap();
  store.entries.get(key).map(|entry| entry.data.clone())
}

首先获取存储的锁,然后通过 store 中的 HashMap entries 直接获取键值即可;

这里注意:我们使用的是 bytes::Bytes,因此这里 clone 的实际上是一个胖指针,而非将整个数据copy了一份!

Set方法

Set方法如下:

fn set(&self, key: String, value: Bytes, expire: Option<Duration>) {
  let mut store = self.shared.store.lock().unwrap();

  let id = store.next_id;
  store.next_id += 1;

  let mut notify = false;

  let expires_at = expire.map(|duration| {
    let when = Instant::now() + duration;

    notify = store
    .next_expiration()
    .map(|expiration| expiration > when)
    .unwrap_or(true);

    store.expirations.insert((when, id), key.clone());
    when
  });

  let prev = store.entries.insert(
    key,
    Entry {
      id,
      data: value,
      expires_at,
    },
  );

  if let Some(prev) = prev {
    if let Some(when) = prev.expires_at {
      store.expirations.remove(&(when, prev.id));
    }
  }

  drop(store);

  if notify {
    self.shared.background_task.notify_one();
  }
}

我们为当前 set 的 key:

  • 分配了 next_id;
  • 并定义了 notify:当 set 的 key 的过期时间小于当前过期时间最短的 key 的过期时间时,会发送过期异步任务通知,唤醒前面 Db 中的 purge_expired_keys 方法,重新刷新过期任务!

随后,将键加入 store.entries 中,如果是过期键还会加入 store.expirations 中;

然后,需要判断如果 set 的 key 之前在数据库中存在了,则需要将前一个过期任务删除;

最后,显式 drop 掉我们获取的 store 的锁,这样最后通知刷新过期键异步任务后,这个异步任务不用再等待这个函数释放锁了~

Subscribe方法

subscribe方法如下:

fn subscribe(&self, key: String) -> broadcast::Receiver<Bytes> {
  use std::collections::hash_map::Entry;

  let mut store = self.shared.store.lock().unwrap();

  match store.pub_sub.entry(key) {
    Entry::Occupied(e) => e.get().subscribe(),
    Entry::Vacant(e) => {
      let (tx, rx) = broadcast::channel(1024);
      e.insert(tx);
      rx
    }
  }
}

subscribe 方法的逻辑也非常简单:

  • 如果当前在 store.pub_sub 中的 key 对应的 channel 已经被 subscribe 了,则直接返回一个新的 Receiver 即可;
  • 如果还没有任何一个 client subscribe 则创建一个新的channel,并将 Receiver 加入 store.pub_sub 存储后返回Receiver即可;

Publish方法

有了 Receiver,则 publish 方法的实现更加简单:

fn publish(&self, key: &str, value: Bytes) -> usize {
  let state = self.shared.store.lock().unwrap();

  state
  .pub_sub
  .get(key)
  .map(|tx| tx.send(value).unwrap_or(0))
  .unwrap_or(0)
}

直接获取 state.pub_sub 中 key 对应的 channel 并调用 send 方法即可;

需要注意的是:如果不存在这个 key,或者发生失败,则返回 0 大小的消息,表示未发送成功;

数据回收:DbDropGuard

最后就是 DbDropGuard 的实现了,DbDropGuard 实际上是 Db 的一个封装;

当通过 DbDropGuard 获取 Db 时,实际上是一个 Db 的 clone,而 Db 内部是一个 Arc 计数智能指针;

因此,当所有使用 Db 都被回收,仍然需要 DbDropGuard 被 drop 后才会对数据库进行回收(DbDropGuard 中持有第一个 Db):

src/storage/db.rs

/// A wrapper around a `Db` instance. This exists to allow orderly cleanup
/// of the `Db` by signalling the background purge task to shut down when
/// this struct is dropped.
#[derive(Debug)]
pub(crate) struct DbDropGuard {
    /// The `Db` instance that will be shut down when this `DbHolder` struct
    /// is dropped.
    db: Db,
}

impl DbDropGuard {
    /// Create a new `DbHolder`, wrapping a `Db` instance. When this is dropped
    /// the `Db`'s purge task will be shut down.
    pub(crate) fn new() -> DbDropGuard {
        DbDropGuard { db: Db::new() }
    }

    /// Get the shared database. Internally, this is an `Arc`,
    /// so a clone only increments the ref count.
    pub(crate) fn db(&self) -> Db {
        self.db.clone()
    }
}

impl Drop for DbDropGuard {
    fn drop(&mut self) {
        // Signal the 'Db' instance to shut down the task that purges expired keys
        self.db.shutdown_purge_task();
    }
}

同时,为 DbDropGuard 实现了 Drop Trait,当 DbDropGuard 被回收后,会调用前文中提到的 db 中的 shutdown_purge_task 方法进行资源回收;

小结

本小节完成了对 mini-redis 存储层的实现,对外封装了 DbDropGuard;

在使用时只需创建 DbDropGuard 对象,然后通过其内部封装的智能指针 Db 操作底层的 Store 即可!

实际上,真实的存储结构远远要比这里的 HashMap 要复杂,但是本文也能起到一定抛砖引玉的作用!

附录

系列文章:


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK