mini-redis Project, Part 4: The Server

 1 year ago
source link: https://jasonkayzk.github.io/2022/12/06/mini-redis%E9%A1%B9%E7%9B%AE-4-%E6%9C%8D%E5%8A%A1%E7%AB%AF/

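The previous articles in this series covered the storage layer and the connection layer of mini-redis. This article builds on them and walks through the server implementation.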

Articles in this series:

mini-redis Project, Part 4: The Server

Server entry point

Executable entry point

The server's entry point is the executable at src/bin/server.rs.

The implementation is as follows:

src/bin/server.rs

#[derive(Parser, Debug)]
#[clap(
    name = "mini-redis-server",
    version,
    author,
    about = "A mini redis server"
)]
struct Cli {
    #[clap(long)]
    port: Option<u16>,
}

#[tokio::main]
pub async fn main() -> Result<(), MiniRedisServerError> {
    let cli = init();
    let port = cli.port.unwrap_or(DEFAULT_PORT);

    // Bind a TCP listener
    let listener = TcpListener::bind(&format!("0.0.0.0:{}", port)).await?;

    server::run(listener, signal::ctrl_c()).await;

    Ok(())
}

fn init() -> Cli {
    dotenv().ok();
    logger::init();
    Cli::parse()
}

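The clap library is used to define the command-line argument port.

First, main calls init to perform initialization. Inside init:

  • dotenv loads environment variables from a .env file, if one exists;
  • logger::init then initializes the logger;
  • finally, Cli::parse() parses the command-line arguments into the Cli struct.

Next, main creates a TCP listener to wait for client connections and passes it to the run function exposed by the server module, which starts the server.

This run function is therefore the entry point of the whole server.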
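The port fallback in main can be sketched in isolation. This is a minimal sketch, not the project's code: the helper name bind_addr is hypothetical, and the value 6379 for DEFAULT_PORT is an assumption (it is the conventional Redis port; the constant's real value is not shown in this article).

```rust
/// Assumed value for the project's `DEFAULT_PORT` constant.
/// 6379 is the conventional Redis port; the real constant is not shown here.
const DEFAULT_PORT: u16 = 6379;

/// Hypothetical helper: build the address string that `main` passes to
/// `TcpListener::bind`, falling back to `DEFAULT_PORT` when `--port` is absent.
fn bind_addr(port: Option<u16>) -> String {
    format!("0.0.0.0:{}", port.unwrap_or(DEFAULT_PORT))
}
```

Calling bind_addr(None) corresponds to starting the server without --port, and bind_addr(Some(7000)) to starting it with --port 7000.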

Server structure

The server module is laid out as follows:

$ tree ./src/server    
./src/server
├── handler.rs
├── listener.rs
├── mod.rs
└── shutdown.rs

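Each part does the following:

  • mod: exposes the run function, which starts the server and initializes its resources;
  • listener: accepts TCP connections and creates a Handler for each one;
  • handler: created by the listener, one per accepted TCP connection, to serve the corresponding client;
  • shutdown: used once the server is told to stop, so that it can wait for in-flight asynchronous tasks and shut down gracefully.

The asynchronous startup function: run

Let's look at the run function defined in mod.rs.

The implementation is as follows: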

src/server/mod.rs

/// Run the mini-redis server.
///
/// Accepts connections from the supplied listener. For each inbound connection,
/// a task is spawned to handle that connection. The server runs until the
/// `shutdown` future completes, at which point the server shuts down
/// gracefully.
///
/// `tokio::signal::ctrl_c()` can be used as the `shutdown` argument. This will
/// listen for a SIGINT signal.
pub async fn run(listener: TcpListener, shutdown: impl Future) {
    info!(
        "mini-redis server started listen on: {}",
        listener.local_addr().unwrap()
    );

    // When the provided `shutdown` future completes, we must send a shutdown
    // message to all active connections. We use a broadcast channel for this
    // purpose. The call below ignores the receiver of the broadcast pair, and when
    // a receiver is needed, the subscribe() method on the sender is used to create one.
    let (notify_shutdown, _) = broadcast::channel(1);
    let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);

    // Initialize the listener state
    let mut server = Listener {
        listener,
        db_holder: DbDropGuard::new(),
        limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)),
        notify_shutdown,
        shutdown_complete_tx,
        shutdown_complete_rx,
    };

    // Concurrently run the server and listen for the `shutdown` signal. The
    // server task runs until an error is encountered, so under normal
    // circumstances, this `select!` statement runs until the `shutdown` signal
    // is received.
    tokio::select! {
        res = server.run() => {
            // If an error is received here, accepting connections from the TCP
            // listener failed multiple times and the server is giving up.
            //
            // Errors encountered when handling individual connections do not
            // bubble up to this point.
            if let Err(err) = res {
                error!("failed to accept: {:?}", err);
            }
        }
        _ = shutdown => {
            // The shutdown signal has been received
            debug!("server is about to shutdown");
        }
    }

    // Extract the `shutdown_complete` receiver and transmitter, then
    // explicitly drop the transmitter. This is important, as the
    // `.await` below would otherwise never complete.
    let Listener {
        mut shutdown_complete_rx,
        shutdown_complete_tx,
        notify_shutdown,
        ..
    } = server;

    // When `notify_shutdown` is dropped, all tasks which have `subscribe`d will
    // receive the shutdown signal and can exit
    drop(notify_shutdown);
    // Drop final `Sender` so the `Receiver` below can complete
    drop(shutdown_complete_tx);

    debug!("server is shutting down");

    // Wait for all active connections to finish processing. As the `Sender`
    // handle held by the listener has been dropped above, the only remaining
    // `Sender` instances are held by connection handler tasks. When those drop,
    // the `mpsc` channel will close and `recv()` will return `None`.
    let _ = shutdown_complete_rx.recv().await;
}

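The run function takes two arguments:

  • listener: the TCP listener;
  • shutdown: a Future that completes at some point in the future and signals that the server should stop.

In this example shutdown is signal::ctrl_c(). In production it could be another Future, for example one driven by a health check, to help shut the service down gracefully and restart it:

  • Once the shutdown Future completes, the server should tell every TCP connection that it is about to shut down and wait for all asynchronous tasks to finish before exiting. For this purpose run creates the broadcast channel notify_shutdown; every connection gets a receiver subscribed to it and is notified at shutdown;
  • The server may exit only after all asynchronous tasks have finished and their resources have been released, so run also creates shutdown_complete_tx and shutdown_complete_rx to signal and wait for the completion of the shutdown.

run then initializes the server's Listener together with the database, the connection limit, and the other resources.

After that, the select! macro waits on:

  • server.run(): accepts TCP connections and handles client requests;
  • the shutdown Future: stops the server.

Leaving the select! macro means either that the shutdown Future completed or that server.run() returned an error because accepting connections failed repeatedly.

We then explicitly drop notify_shutdown and shutdown_complete_tx. Dropping notify_shutdown closes the broadcast channel, so every connection task that subscribed receives the shutdown signal and enters its shutdown phase.

Finally, we call shutdown_complete_rx.recv() and wait. Once every Sender has been dropped, it returns None and the whole server exits gracefully.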
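The "wait until every Sender is dropped" idea does not depend on tokio. The sketch below reproduces it with std::sync::mpsc and threads as stand-ins for tokio's mpsc channel and tasks; the function name wait_for_workers and the sleep durations are hypothetical. One difference to keep in mind: std's recv() reports a closed channel as Err, while tokio's recv() returns None.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;

/// Start `workers` threads that each hold a clone of the completion sender,
/// wait until every clone has been dropped, and return how many workers had
/// finished at that point.
fn wait_for_workers(workers: usize) -> usize {
    // No value is ever sent; the channel is used only for its close signal.
    let (complete_tx, complete_rx) = mpsc::channel::<()>();
    let finished = Arc::new(AtomicUsize::new(0));

    for id in 0..workers {
        let tx = complete_tx.clone();
        let finished = Arc::clone(&finished);
        thread::spawn(move || {
            // Simulate in-flight work of different lengths.
            thread::sleep(Duration::from_millis(10 * (id as u64 + 1)));
            finished.fetch_add(1, Ordering::SeqCst);
            // Dropping the sender is the worker's "I am done" signal.
            drop(tx);
        });
    }

    // Drop our own sender; otherwise `recv` below would block forever.
    drop(complete_tx);

    // `recv` fails only once every sender clone is gone.
    let closed = complete_rx.recv().is_err();
    debug_assert!(closed);

    finished.load(Ordering::SeqCst)
}
```

Because the channel closes only after the last clone is dropped, the returned count equals the number of workers: no worker is still running when the wait ends.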

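The Listener

Because the earlier articles already encapsulated the lower layers well, the Listener logic is not complicated.

The Listener is our server. It is responsible for:

  • accepting and handling TCP connection requests;
  • holding the DbDropGuard storage;
  • graceful shutdown.

The Listener is defined as follows: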

src/server/listener.rs

/// Server listener state. Created in the `run` call. It includes a `run` method
/// which performs the TCP listening and initialization of per-connection state.
#[derive(Debug)]
pub(crate) struct Listener {
    /// Shared database handle.
    ///
    /// Contains the key / value store as well as the broadcast channels for
    /// pub/sub.
    ///
    /// This holds a wrapper around an `Arc`. The internal `Db` can be
    /// retrieved and passed into the per connection state (`Handler`).
    pub(crate) db_holder: DbDropGuard,

    /// TCP listener supplied by the `run` caller.
    pub(crate) listener: TcpListener,

    /// Limit the max number of connections.
    ///
    /// A `Semaphore` is used to limit the max number of connections. Before
    /// attempting to accept a new connection, a permit is acquired from the
    /// semaphore. If none are available, the listener waits for one.
    ///
    /// When handlers complete processing a connection, the permit is returned
    /// to the semaphore.
    pub(crate) limit_connections: Arc<Semaphore>,

    /// Broadcasts a shutdown signal to all active connections.
    ///
    /// The initial `shutdown` trigger is provided by the `run` caller. The
    /// server is responsible for gracefully shutting down active connections.
    /// When a connection task is spawned, it is passed a broadcast receiver
    /// handle. When a graceful shutdown is initiated, a `()` value is sent via
    /// the broadcast::Sender. Each active connection receives it, reaches a
    /// safe terminal state, and completes the task.
    pub(crate) notify_shutdown: broadcast::Sender<()>,

    /// Used as part of the graceful shutdown process to wait for client
    /// connections to complete processing.
    ///
    /// Tokio channels are closed once all `Sender` handles go out of scope.
    /// When a channel is closed, the receiver receives `None`. This is
    /// leveraged to detect all connection handlers completing. When a
    /// connection handler is initialized, it is assigned a clone of
    /// `shutdown_complete_tx`. When the listener shuts down, it drops the
    /// sender held by this `shutdown_complete_tx` field. Once all handler tasks
    /// complete, all clones of the `Sender` are also dropped. This results in
    /// `shutdown_complete_rx.recv()` completing with `None`. At this point, it
    /// is safe to exit the server process.
    pub(crate) shutdown_complete_rx: mpsc::Receiver<()>,
    pub(crate) shutdown_complete_tx: mpsc::Sender<()>,
}

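The fields have the following meanings:

  • db_holder: DbDropGuard: holds the database;
  • listener: TcpListener: listens for TCP connections;
  • limit_connections: Arc<Semaphore>: connection permits implemented with a Semaphore; once the maximum number of connections is reached, a new connection has to wait until another one is released;
  • notify_shutdown: broadcast::Sender<()>: broadcasts the shutdown signal to all TCP connections;
  • shutdown_complete_rx: mpsc::Receiver<()>, shutdown_complete_tx: mpsc::Sender<()>: signal that the shutdown tasks have completed.

Listener implements a run method, which the select! macro above uses to accept and handle TCP connections.

The implementation is as follows: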

src/server/listener.rs

impl Listener {
    /// Run the server
    ///
    /// Listen for inbound connections. For each inbound connection, spawn a
    /// task to process that connection.
    ///
    /// # Errors
    ///
    /// Returns `Err` if accepting returns an error. This can happen for a
    /// number of reasons that resolve over time. For example, if the underlying
    /// operating system has reached an internal limit for max number of
    /// sockets, accept will fail.
    ///
    /// The process is not able to detect when a transient error resolves
    /// itself. One strategy for handling this is to implement a back off
    /// strategy, which is what we do here.
    pub(crate) async fn run(&mut self) -> Result<(), MiniRedisConnectionError> {

        loop {
            // Wait for a permit to become available
            //
            // `acquire_owned` returns a permit that is bound to the semaphore.
            // When the permit value is dropped, it is automatically returned
            // to the semaphore.
            //
            // `acquire_owned()` returns `Err` when the semaphore has been
            // closed. We don't ever close the semaphore, so `unwrap()` is safe.
            let permit = self
                .limit_connections
                .clone()
                .acquire_owned()
                .await
                .unwrap();

            // Accept a new socket. This will attempt to perform error handling.
            // The `accept` method internally attempts to recover errors, so an
            // error here is non-recoverable.
            let socket = self.accept().await?;

            // Create the necessary per-connection handler state.
            let mut handler = Handler {
                // Get a handle to the shared database.
                db: self.db_holder.db(),

                // Initialize the connection state. This allocates read/write
                // buffers to perform redis protocol frame parsing.
                connection: Connection::new(socket),

                // Receive shutdown notifications.
                shutdown: Shutdown::new(self.notify_shutdown.subscribe()),

                // Notifies the receiver half once all clones are dropped.
                _shutdown_complete: self.shutdown_complete_tx.clone(),
            };

            // Spawn a new task to process the connections. Tokio tasks are like
            // asynchronous green threads and are executed concurrently.
            tokio::spawn(async move {
                // Process the connection. If an error is encountered, log it.
                if let Err(err) = handler.run().await {
                    error!("connection error:{:?}", err);
                }
                // Move the permit into the task and drop it after completion.
                // This returns the permit back to the semaphore.
                drop(permit);
            });
        }
    }

    /// Accept an inbound connection.
    ///
    /// Errors are handled by backing off and retrying. An exponential backoff
    /// strategy is used. After the first failure, the task waits for 1 second.
    /// After the second failure, the task waits for 2 seconds. Each subsequent
    /// failure doubles the wait time. If accepting still fails on the try that
    /// follows the 64 second wait, then this function returns with an error.
    async fn accept(&mut self) -> Result<TcpStream, MiniRedisConnectionError> {
        let mut backoff = 1;

        // Try to accept a few times
        loop {
            // Perform the accept operation. If a socket is successfully
            // accepted, return it. Otherwise, save the error.
            match self.listener.accept().await {
                Ok((socket, _)) => return Ok(socket),
                Err(err) => {
                    if backoff > 64 {
                        // Accept has failed too many times. Return the error.
                        error!("failed to accept socket after retry: {}", err);
                        return Err(err.into());
                    } else {
                        error!("failed to accept socket: {}", err);
                    }
                }
            }

            // Pause execution until the back off period elapses.
            time::sleep(Duration::from_secs(backoff)).await;

            // Double the back off
            backoff *= 2;
        }
    }
}

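In the run method:

First, a permit is acquired through limit_connections.

Next, the private asynchronous method accept is called to wait for a TCP connection. accept retries several times and returns an error only if every attempt fails.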
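To make the retry policy concrete, the sketch below computes the sequence of waits such a loop goes through, assuming the delay starts at 1 second and doubles after each failure, as the comments in accept describe. The function name backoff_schedule is hypothetical and nothing here sleeps or touches the network.

```rust
/// Return the waits (in seconds) between failed attempts: start at 1 and
/// double after every failure, stopping once the value exceeds `max_backoff`.
fn backoff_schedule(max_backoff: u64) -> Vec<u64> {
    let mut waits = Vec::new();
    let mut backoff = 1;
    // Same stopping rule as a loop that gives up once `backoff > max_backoff`.
    while backoff <= max_backoff {
        waits.push(backoff);
        backoff *= 2;
    }
    waits
}
```

With a cap of 64 the waits are 1, 2, 4, 8, 16, 32 and 64 seconds, so under these assumptions a listener that never recovers gives up after roughly 127 seconds of waiting.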

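Then run creates a Handler, passing in the socket along with a handle to db, a receiver subscribed to notify_shutdown, and a clone of shutdown_complete_tx, so that the handler can serve client requests and react to the shutdown signal.

Finally, a new asynchronous task is created with tokio::spawn.

Note that once the task finishes, we call drop to return the permit acquired from limit_connections.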
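The reason drop(permit) is enough to free a connection slot is that the permit returns itself in its Drop implementation. The sketch below shows that pattern with a hypothetical Limiter type built on an atomic counter. It is a simplified stand-in, not tokio's Semaphore: in particular, try_acquire returns None at the limit instead of waiting the way acquire_owned().await does.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical counting limiter (simplified stand-in for a semaphore).
struct Limiter {
    available: AtomicUsize,
}

/// A permit that gives its slot back to the limiter when dropped.
struct Permit {
    limiter: Arc<Limiter>,
}

impl Limiter {
    fn new(permits: usize) -> Arc<Limiter> {
        Arc::new(Limiter { available: AtomicUsize::new(permits) })
    }

    /// Take a permit without waiting; `None` means the limit is reached.
    fn try_acquire(this: &Arc<Limiter>) -> Option<Permit> {
        let mut current = this.available.load(Ordering::Acquire);
        loop {
            if current == 0 {
                return None;
            }
            // Decrement only if nobody changed the counter in the meantime.
            match this.available.compare_exchange(
                current,
                current - 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Some(Permit { limiter: Arc::clone(this) }),
                Err(actual) => current = actual,
            }
        }
    }

    /// Number of permits currently free.
    fn free_permits(&self) -> usize {
        self.available.load(Ordering::Acquire)
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        // Returning the slot happens automatically when the permit goes away.
        self.limiter.available.fetch_add(1, Ordering::Release);
    }
}
```

Moving the permit into the spawned task and dropping it at the end, as the Listener does, therefore ties the lifetime of the slot to the lifetime of the connection.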

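The request Handler

Handler is the core of how the server processes client requests; every TCP connection gets its own Handler running in an asynchronous task.

The Handler struct is defined as follows: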

/// Per-connection handler. Reads requests from `connection` and applies the
/// commands to `db`.
#[derive(Debug)]
pub(crate) struct Handler {
    /// Shared database handle.
    ///
    /// When a command is received from `connection`, it is applied with `db`.
    /// The implementation of the command is in the `cmd` module. Each command
    /// will need to interact with `db` in order to complete the work.
    pub(crate) db: Db,

    /// The TCP connection decorated with the redis protocol encoder / decoder
    /// implemented using a buffered `TcpStream`.
    ///
    /// When `Listener` receives an inbound connection, the `TcpStream` is
    /// passed to `Connection::new`, which initializes the associated buffers.
    /// `Connection` allows the handler to operate at the "frame" level and keep
    /// the byte level protocol parsing details encapsulated in `Connection`.
    pub(crate) connection: Connection,

    /// Listen for shutdown notifications.
    ///
    /// A wrapper around the `broadcast::Receiver` paired with the sender in
    /// `Listener`. The connection handler processes requests from the
    /// connection until the peer disconnects **or** a shutdown notification is
    /// received from `shutdown`. In the latter case, any in-flight work being
    /// processed for the peer is continued until it reaches a safe state, at
    /// which point the connection is terminated.
    pub(crate) shutdown: Shutdown,

    /// Not used directly. Instead, when `Handler` is dropped, this sender is
    /// dropped with it, which tells the `Listener` that the handler is done.
    pub(crate) _shutdown_complete: mpsc::Sender<()>,
}
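
  • db: a reference-counted handle to the database;
  • connection: the client's TCP connection;
  • shutdown: receives the notification that the server is shutting down;
  • _shutdown_complete: dropped when the Handler finishes, which notifies the channel held by the Listener.

Handler also implements a run method, which we look at in detail next: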

impl Handler {
    /// Process a single connection.
    ///
    /// Request frames are read from the socket and processed. Responses are
    /// written back to the socket.
    ///
    /// Currently, pipelining is not implemented. Pipelining is the ability to
    /// process more than one request concurrently per connection without
    /// interleaving frames. 
    ///
    /// See for more details:
    /// https://redis.io/topics/pipelining
    ///
    /// When the shutdown signal is received, the connection is processed until
    /// it reaches a safe state, at which point it is terminated.
    pub(crate) async fn run(&mut self) -> Result<(), MiniRedisConnectionError> {
        // As long as the shutdown signal has not been received, try to read a
        // new request frame.
        while !self.shutdown.is_shutdown() {
            // While reading a request frame, also listen for the shutdown
            // signal.
            let maybe_frame = tokio::select! {
                res = self.connection.read_frame() => res?,
                _ = self.shutdown.recv() => {
                    // If a shutdown signal is received, return from `run`.
                    // This will result in the task terminating.
                    return Ok(());
                }
            };

            // If `None` is returned from `read_frame()` then the peer closed
            // the socket. There is no further work to do and the task can be
            // terminated.
            let frame = match maybe_frame {
                Some(frame) => frame,
                None => {
                    debug!("peer closed the socket, return");
                    return Ok(());
                }
            };

            // Convert the redis frame into a command struct. This returns an
            // error if the frame is not a valid redis command or it is an
            // unsupported command.
            let cmd = Command::from_frame(frame)?;

            // Logs the `cmd` object.
            debug!("received command: {:?}", cmd);

            // Perform the work needed to apply the command. This may mutate the
            // database state as a result.
            //
            // The connection is passed into the apply function which allows the
            // command to write response frames directly to the connection. In
            // the case of pub/sub, multiple frames may be sent back to the
            // peer.
            cmd.apply(&self.db, &mut self.connection, &mut self.shutdown)
                .await?;
        }

        Ok(())
    }
}

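As long as no shutdown signal has been received and no error has occurred, run keeps calling self.connection.read_frame() to receive and parse data from the client.

Once read_frame returns, run checks whether the result is None. None means that the client has closed the connection, so the method can return immediately.

Otherwise, Command::from_frame(frame) converts the Frame into the corresponding command.

Finally, cmd.apply(&self.db, &mut self.connection, &mut self.shutdown).await? executes the command.

Both Command::from_frame(frame) and cmd.apply() are defined in the cmd command module, covered next.

The command module

The cmd module defines the Command enum, with one variant for each command supported by mini-redis.

It is defined as follows: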

src/cmd/mod.rs

/// Enumeration of supported Redis commands.
///
/// Methods called on `Command` are delegated to the command implementation.
#[derive(Debug)]
pub enum Command {
    Get(Get),
    Set(Set),
    Publish(Publish),
    Subscribe(Subscribe),
    Unsubscribe(Unsubscribe),
    Ping(Ping),
    Unknown(Unknown),
}

Each variant wraps a type that implements the corresponding command:

$ tree ./src/cmd       
./src/cmd
├── get.rs
├── mod.rs
├── ping.rs
├── publish.rs
├── set.rs
├── subscribe.rs
├── unknown.rs
└── unsubscribe.rs

Command also implements two methods:

  • from_frame: converts a command Frame into the specific Command variant;
  • apply: executes the command.

The implementation is as follows:

impl Command {
    /// Parse a command from a received frame.
    ///
    /// The `Frame` must represent a Redis command supported by `mini-redis` and
    /// be the array variant.
    pub fn from_frame(frame: Frame) -> Result<Command, MiniRedisParseError> {
        let mut parse = Parse::new(frame)?;

        // All redis commands begin with the command name as a string. The name
        // is read and converted to lower case in order to do case-insensitive
        // matching.
        let command_name = parse.next_string()?.to_lowercase();

        // Match the command name, delegating the rest of the parsing to the
        // specific command.
        let command = match &command_name[..] {
            "get" => Command::Get(Get::parse_frames(&mut parse)?),
            "set" => Command::Set(Set::parse_frames(&mut parse)?),
            "publish" => Command::Publish(Publish::parse_frames(&mut parse)?),
            "subscribe" => Command::Subscribe(Subscribe::parse_frames(&mut parse)?),
            "unsubscribe" => Command::Unsubscribe(Unsubscribe::parse_frames(&mut parse)?),
            "ping" => Command::Ping(Ping::parse_frames(&mut parse)?),
            _ => {
                // The command is not recognized and an Unknown command is
                // returned.
                //
                // `return` is called here to skip the `finish()` call below. As
                // the command is not recognized, there is most likely
                // unconsumed fields remaining in the `Parse` instance.
                return Ok(Command::Unknown(Unknown::new(command_name)));
            }
        };

        // Check if there is any remaining unconsumed fields in the `Parse`
        // value. If fields remain, this indicates an unexpected frame format
        // and an error is returned.
        parse.finish()?;

        // The command has been successfully parsed
        Ok(command)
    }

    /// Apply the command to the specified `Db` instance.
    ///
    /// The response is written to `dst`. This is called by the server in order
    /// to execute a received command.
    pub(crate) async fn apply(
        self,
        db: &Db,
        dst: &mut Connection,
        shutdown: &mut Shutdown,
    ) -> Result<(), MiniRedisConnectionError> {
        use Command::*;

        match self {
            Ping(cmd) => cmd.apply(dst).await,
            Get(cmd) => cmd.apply(db, dst).await,
            Set(cmd) => cmd.apply(db, dst).await,
            Publish(cmd) => cmd.apply(db, dst).await,
            Subscribe(cmd) => cmd.apply(db, dst, shutdown).await,
            // `Unsubscribe` cannot be applied. It may only be received from the
            // context of a `Subscribe` command.
            Unsubscribe(_) => Err(MiniRedisConnectionError::CommandExecute(
                "`Unsubscribe` is unsupported in this context".into(),
            )),
            Unknown(cmd) => cmd.apply(dst).await,
        }
    }

    /// Returns the command name
    pub(crate) fn get_name(&self) -> &str {
        match self {
            Command::Get(_) => "get",
            Command::Set(_) => "set",
            Command::Publish(_) => "pub",
            Command::Subscribe(_) => "subscribe",
            Command::Unsubscribe(_) => "unsubscribe",
            Command::Ping(_) => "ping",
            Command::Unknown(cmd) => cmd.get_name(),
        }
    }
}

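The implementation of from_frame is straightforward:

  • First, the Parse type implemented in the previous article wraps the Frame and exposes a cursor-like API;
  • Next, parse.next_string()?.to_lowercase() reads the first string, which is the command name, and converts it to lowercase;
  • Then, depending on the command, the matching parse_frames method is called to turn the Parse into the specific Command variant;
  • Finally, parse.finish() validates the command format; if validation succeeds, the parsed command is returned.

apply is even simpler: it matches on the enum variant and calls the apply method of the specific command.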
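The parsing flow described above (read the name, lowercase it, delegate, then check for leftovers) can be tried out without the real Frame and Parse types. Everything in the sketch below is a simplified, hypothetical stand-in: Parse is a cursor over plain strings, Cmd has only three variants, and ParseError has only two.

```rust
#[derive(Debug, PartialEq)]
enum ParseError {
    EndOfStream,
    TrailingData,
}

#[derive(Debug, PartialEq)]
enum Cmd {
    Get(String),
    Ping(Option<String>),
    Unknown(String),
}

/// Hypothetical, simplified cursor over the parts of one command.
struct Parse {
    parts: std::vec::IntoIter<String>,
}

impl Parse {
    fn new(parts: &[&str]) -> Parse {
        let owned: Vec<String> = parts.iter().map(|p| p.to_string()).collect();
        Parse { parts: owned.into_iter() }
    }

    /// Consume the next part, or report that nothing is left.
    fn next_string(&mut self) -> Result<String, ParseError> {
        self.parts.next().ok_or(ParseError::EndOfStream)
    }

    /// Succeeds only if every part has been consumed.
    fn finish(&mut self) -> Result<(), ParseError> {
        match self.parts.next() {
            None => Ok(()),
            Some(_) => Err(ParseError::TrailingData),
        }
    }
}

fn from_parts(parts: &[&str]) -> Result<Cmd, ParseError> {
    let mut parse = Parse::new(parts);
    // The command name comes first and is matched case-insensitively.
    let name = parse.next_string()?.to_lowercase();
    let cmd = match name.as_str() {
        "get" => Cmd::Get(parse.next_string()?),
        "ping" => match parse.next_string() {
            Ok(msg) => Cmd::Ping(Some(msg)),
            // No argument is fine for PING.
            Err(ParseError::EndOfStream) => Cmd::Ping(None),
            Err(other) => return Err(other),
        },
        // Unknown commands return early and skip the `finish` check.
        other => return Ok(Cmd::Unknown(other.to_string())),
    };
    parse.finish()?;
    Ok(cmd)
}
```

For example, from_parts(&["GET", "foo"]) yields Cmd::Get, while from_parts(&["get", "foo", "bar"]) is rejected by finish because one part is left unconsumed.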

Each command is implemented as shown below.

The Ping command

The Ping command is implemented as follows:

src/cmd/ping.rs

#[derive(Debug, Default)]
pub struct Ping {
    /// optional message to be returned
    msg: Option<String>,
}

impl Ping {
    pub fn new(msg: Option<String>) -> Ping {
        Ping { msg }
    }

    /// Parse a `Ping` instance from a received frame.
    ///
    /// The `Parse` argument provides a cursor-like API to read fields from the
    /// `Frame`. At this point, the entire frame has already been received from
    /// the socket.
    ///
    /// The `PING` string has already been consumed.
    pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Ping, MiniRedisParseError> {
        match parse.next_string() {
            Ok(msg) => Ok(Ping::new(Some(msg))),
            Err(MiniRedisParseError::EndOfStream) => Ok(Ping::default()),
            Err(e) => Err(e),
        }
    }

    /// Apply the `Ping` command and return the message.
    ///
    /// The response is written to `dst`. This is called by the server in order
    /// to execute a received command.
    pub(crate) async fn apply(self, dst: &mut Connection) -> Result<(), MiniRedisConnectionError> {
        let response = match self.msg {
            None => Frame::Simple("PONG".to_string()),
            Some(msg) => Frame::Bulk(Bytes::from(msg)),
        };

        // Write the response back to the client
        dst.write_frame(&response).await?;

        Ok(())
    }

    /// Converts the command into an equivalent `Frame`.
    ///
    /// This is called by the client when encoding a `Ping` command to send
    /// to the server.
    pub(crate) fn into_frame(self) -> Result<Frame, MiniRedisParseError> {
        let mut frame = Frame::array();
        frame.push_bulk(Bytes::from("ping".as_bytes()))?;
        if let Some(msg) = self.msg {
            frame.push_bulk(Bytes::from(msg))?;
        }
        Ok(frame)
    }
}

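The implementations of parse_frames and apply are simple and need no further explanation.

Note that Parse wraps an IntoIter, and the command-name string has already been consumed when the command was matched, so the iterator passed to parse_frames no longer contains the leading command name. The same holds for the other commands.

The command also implements into_frame. It is meant for the client, which uses it to convert a command entered on the command line into the corresponding Frame to send to the server.

The into_frame implementations are simple, so they are not discussed for this or the following commands.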
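It can help to see what an array of bulk strings looks like on the wire. The sketch below encodes a list of strings following the RESP rules for arrays (`*<count>\r\n`) and bulk strings (`$<length>\r\n<bytes>\r\n`). The function encode_command is hypothetical and is not the project's Connection::write_frame; it also takes &str for brevity, whereas real bulk strings are binary-safe.

```rust
/// Encode `parts` as a RESP array of bulk strings.
/// Hypothetical helper for illustration only.
fn encode_command(parts: &[&str]) -> Vec<u8> {
    let mut out = Vec::new();
    // Array header: number of elements.
    out.extend_from_slice(format!("*{}\r\n", parts.len()).as_bytes());
    for part in parts {
        // Bulk string header: length in bytes, then the payload.
        out.extend_from_slice(format!("${}\r\n", part.len()).as_bytes());
        out.extend_from_slice(part.as_bytes());
        out.extend_from_slice(b"\r\n");
    }
    out
}
```

Under these rules, a bare ping becomes `*1\r\n$4\r\nping\r\n`, and a ping with the message hello becomes `*2\r\n$4\r\nping\r\n$5\r\nhello\r\n`.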

The Get command

The Get command is also simple to implement:

src/cmd/get.rs

#[derive(Debug)]
pub struct Get {
    key: String,
}

impl Get {
    pub fn new(key: impl ToString) -> Get {
        Get {
            key: key.to_string(),
        }
    }

    pub fn key(&self) -> &str {
        &self.key
    }

    pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Get, MiniRedisParseError> {
        let key = parse.next_string()?;

        Ok(Get { key })
    }

    pub(crate) async fn apply(
        self,
        db: &Db,
        dst: &mut Connection,
    ) -> Result<(), MiniRedisConnectionError> {
        let response = if let Some(value) = db.get(&self.key) {
            Frame::Bulk(value)
        } else {
            // If there is no value, `Null` is written.
            Frame::Null
        };

        debug!("get command applied resp: {:?}", response);

        dst.write_frame(&response).await?;

        Ok(())
    }

    pub(crate) fn into_frame(self) -> Result<Frame, MiniRedisParseError> {
        let mut frame = Frame::array();
        frame.push_bulk(Bytes::from("get".as_bytes()))?;
        frame.push_bulk(Bytes::from(self.key.into_bytes()))?;
        Ok(frame)
    }
}

The Set command

Because Set supports an optional expiration time, it is slightly more involved.

The implementation is as follows:

src/cmd/set.rs

/// Set `key` to hold the string `value`.
///
/// If `key` already holds a value, it is overwritten, regardless of its type.
/// Any previous time to live associated with the key is discarded on successful
/// SET operation.
#[derive(Debug)]
pub struct Set {
    /// the lookup key
    key: String,

    /// the value to be stored
    value: Bytes,

    /// When to expire the key
    expire: Option<Duration>,
}

impl Set {
    pub fn new(key: impl ToString, value: Bytes, expire: Option<Duration>) -> Set {
        Set {
            key: key.to_string(),
            value,
            expire,
        }
    }

    /// Parse a `Set` instance from a received frame.
    ///
    /// The `Parse` argument provides a cursor-like API to read fields from the
    /// `Frame`. At this point, the entire frame has already been received from
    /// the socket.
    pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Set, MiniRedisParseError> {
        // Read the key to set. This is a required field
        let key = parse.next_string()?;

        // Read the value to set. This is a required field.
        let value = parse.next_bytes()?;

        // The expiration is optional. If nothing else follows, then it is `None`.
        let mut expire = None;

        // Attempt to parse another string.
        match parse.next_string() {
            Ok(s) if s.to_uppercase() == "EX" => {
                // An expiration is specified in seconds. The next value is an
                // integer.
                let secs = parse.next_int()?;
                expire = Some(Duration::from_secs(secs));
            }
            Ok(s) if s.to_uppercase() == "PX" => {
                // An expiration is specified in milliseconds. The next value is
                // an integer.
                let ms = parse.next_int()?;
                expire = Some(Duration::from_millis(ms));
            }
            // Currently, mini-redis does not support any of the other SET
            // options. An error here results in the connection being
            // terminated. Other connections will continue to operate normally.
            Ok(s) => {
                warn!("unsupported SET option: {}", s);
                return Err(MiniRedisParseError::Parse(
                    "currently `SET` only supports the expiration option".into(),
                ));
            }
            // The `EndOfStream` error indicates there is no further data to
            // parse. In this case, it is a normal run time situation and
            // indicates there are no specified `SET` options.
            Err(MiniRedisParseError::EndOfStream) => {
                debug!("no extra SET option");
            }
            // All other errors are bubbled up, resulting in the connection
            // being terminated.
            Err(err) => return Err(err),
        }

        Ok(Set { key, value, expire })
    }

    /// Apply the `Set` command to the specified `Db` instance.
    ///
    /// The response is written to `dst`. This is called by the server in order
    /// to execute a received command.
    pub(crate) async fn apply(
        self,
        db: &Db,
        dst: &mut Connection,
    ) -> Result<(), MiniRedisConnectionError> {
        // Set the value in the shared database state.
        db.set(self.key, self.value, self.expire);

        // Create a success response and write it to `dst`.
        let response = Frame::Simple("OK".to_string());
        debug!("applied set command response: {:?}", response);

        dst.write_frame(&response).await?;

        Ok(())
    }

    /// Converts the command into an equivalent `Frame`.
    ///
    /// This is called by the client when encoding a `Set` command to send to
    /// the server.
    pub(crate) fn into_frame(self) -> Result<Frame, MiniRedisParseError> {
        let mut frame = Frame::array();
        frame.push_bulk(Bytes::from("set".as_bytes()))?;
        frame.push_bulk(Bytes::from(self.key.into_bytes()))?;
        frame.push_bulk(self.value)?;
        if let Some(ms) = self.expire {
            // Expirations in Redis protocol can be specified in two ways
            // 1. SET key value EX seconds
            // 2. SET key value PX milliseconds
            // We implement the second option because it allows greater precision and
            // src/bin/cli.rs parses the expiration argument as milliseconds
            // in duration_from_ms_str()
            frame.push_bulk(Bytes::from("px".as_bytes()))?;
            frame.push_int(ms.as_millis() as u64)?;
        }
        Ok(frame)
    }

    pub fn key(&self) -> &str {
        &self.key
    }

    pub fn value(&self) -> &Bytes {
        &self.value
    }

    pub fn expire(&self) -> Option<Duration> {
        self.expire
    }
}

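parse_frames first reads the key and value to set from parse, then reads the next string:

  • if it is EX, the next integer is read as the expiration time in seconds;
  • if it is PX, the next integer is read as the expiration time in milliseconds;
  • if it is any other string, the option is unsupported and an error is returned;
  • if there is no further string (EndOfStream), no expiration is set.

Set's apply method is similar to Get's: it stores the value through the database interface and then writes the result back to the client.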
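The option handling can be exercised on its own. The sketch below parses only the part of the command that follows the key and value; the function parse_expire and the OptionError type are hypothetical simplifications, and the input is a slice of strings rather than a Parse cursor.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum OptionError {
    MissingValue,
    InvalidNumber,
    Unsupported(String),
}

/// Parse the trailing options of `SET key value [EX seconds | PX milliseconds]`.
/// `options` holds whatever follows the value; an empty slice means no expiry.
fn parse_expire(options: &[&str]) -> Result<Option<Duration>, OptionError> {
    let mut iter = options.iter();

    // No option at all: the key never expires.
    let flag = match iter.next() {
        None => return Ok(None),
        Some(flag) => flag.to_uppercase(),
    };

    // Reject unsupported options before looking at their arguments.
    let in_seconds = match flag.as_str() {
        "EX" => true,
        "PX" => false,
        other => return Err(OptionError::Unsupported(other.to_string())),
    };

    let raw = iter.next().ok_or(OptionError::MissingValue)?;
    let amount: u64 = raw.parse().map_err(|_| OptionError::InvalidNumber)?;

    Ok(Some(if in_seconds {
        Duration::from_secs(amount)
    } else {
        Duration::from_millis(amount)
    }))
}
```

Unlike the real command, this sketch does not check for leftover parts after the option; in mini-redis that check is done by parse.finish() in from_frame.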

The Subscribe command

Subscribe is more complex. Let's go through it.

The Subscribe command is defined as follows:

/// Subscribes the client to one or more channels.
///
/// Once the client enters the subscribed state, it is not supposed to issue any
/// other commands, except for additional SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE,
/// PUNSUBSCRIBE, PING and QUIT commands.
#[derive(Debug)]
pub struct Subscribe {
    channels: Vec<String>,
}

/// Stream of messages. The stream receives messages from the
/// `broadcast::Receiver`. We use `stream!` to create a `Stream` that consumes
/// messages. Because `stream!` values cannot be named, we box the stream using
/// a trait object.
type Messages = Pin<Box<dyn Stream<Item = Bytes> + Send>>;

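A single Subscribe command can subscribe to multiple channels.

The Messages type is also defined; it is the stream of data delivered through a channel.

Next, let's look at the implementations of parse_frames and apply.

parse_frames is also fairly simple: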

src/cmd/subscribe.rs

pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Subscribe, MiniRedisParseError> {
  // Extract the first string. If there is none, the frame is
  // malformed and the error is bubbled up.
  let mut channels = vec![parse.next_string()?];

  loop {
    match parse.next_string() {
      // A string has been consumed from the `parse`, push it into the
      // list of channels to subscribe to.
      Ok(s) => channels.push(s),
      // The `EndOfStream` error indicates there is no further data to
      // parse.
      Err(MiniRedisParseError::EndOfStream) => break,
      // All other errors are bubbled up, resulting in the connection
      // being terminated.
      Err(err) => return Err(err),
    }
  }

  Ok(Subscribe { channels })
}

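parse_frames simply collects the names of all channels to subscribe to into a Vec and returns it. At least one channel name is required; if the first string is missing, the parse error is returned.

Next is the apply method: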

pub(crate) async fn apply(mut self,
  db: &Db,
  dst: &mut Connection,
  shutdown: &mut Shutdown,
) -> Result<(), MiniRedisConnectionError> {
  // Each individual channel subscription is handled using a
  // `sync::broadcast` channel. Messages are then fanned out to all
  // clients currently subscribed to the channels.
  //
  // An individual client may subscribe to multiple channels and may
  // dynamically add and remove channels from its subscription set. To
  // handle this, a `StreamMap` is used to track active subscriptions. The
  // `StreamMap` merges messages from individual broadcast channels as
  // they are received.
  let mut subscriptions = StreamMap::new();

  loop {
    // `self.channels` is used to track additional channels to subscribe
    // to. When new `SUBSCRIBE` commands are received during the
    // execution of `apply`, the new channels are pushed onto this vec.
    for channel_name in self.channels.drain(..) {
      Self::subscribe_to_channel(channel_name, &mut subscriptions, db, dst).await?;
    }

    // Wait for one of the following to happen:
    //
    // - Receive a message from one of the subscribed channels.
    // - Receive a subscribe or unsubscribe command from the client.
    // - A server shutdown signal.
    select! {
      // Receive messages from subscribed channels
      Some((channel_name, msg)) = subscriptions.next() => {
        dst.write_frame(&make_message_frame(channel_name, msg)?).await?;
      }
      res = dst.read_frame() => {
        let frame = match res? {
          Some(frame) => frame,
          // This happens if the remote client has disconnected.
          None => {
            warn!("remote subscribe client disconnected");
            return Ok(())
          }
        };

        handle_command(
          frame,
          &mut self.channels,
          &mut subscriptions,
          dst,
        ).await?;
      }
      _ = shutdown.recv() => {
        warn!("server shutdown, stop subscribe");
        return Ok(());
      }
    }
  }
}

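Because each client can dynamically subscribe to several channels at once, a StreamMap is created first to hold the mapping from channel name to Stream.

Then a loop is started, and in each iteration:

  • First, self.channels.drain(..) consumes all channel names pending in the command (IntoIter is not used here because the Vec is needed again later, when new channels are subscribed dynamically; only its values are consumed);
  • For each name, the internal method Self::subscribe_to_channel is called to register the channel in the StreamMap;
  • Then select! waits for one of the following events:
    • A message arrives from the StreamMap: the connection's write_frame is called to forward the channel's message to the client;
    • A new Subscribe or Unsubscribe command arrives from the client: the frame is read and passed to the handle_command function;
    • The shutdown signal is received: the subscription loop stops;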
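
The reason for choosing drain over into_iter can be shown in isolation. The sketch below is a hypothetical stand-in (`subscribe_pending` and the `active` list are not part of the project): `drain(..)` hands out owned Strings while only borrowing the Vec mutably, so the same Vec can be refilled afterwards. Calling `into_iter()` on the field would move it out of `self`, and the later `&mut self.channels` inside the loop would no longer compile.

```rust
/// Hypothetical stand-in for the subscribe step: moves every pending
/// channel name into `active` while leaving `pending` usable.
fn subscribe_pending(pending: &mut Vec<String>, active: &mut Vec<String>) {
    // `drain(..)` yields owned Strings but only borrows the Vec mutably,
    // so `pending` is empty (not moved) once the loop finishes.
    for name in pending.drain(..) {
        active.push(name);
    }
}
```

After the call, `pending` is empty and can receive the channel names of a later SUBSCRIBE, which the next loop iteration drains again.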

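Finally, let's look at subscribe_to_channel, the method that adds a Stream to the StreamMap, and handle_command, the function that handles additional Subscribe or Unsubscribe commands from the client.

The subscribe_to_channel method is implemented as follows: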

async fn subscribe_to_channel(
  channel_name: String,
  subscriptions: &mut StreamMap<String, Messages>,
  db: &Db,
  dst: &mut Connection,
) -> Result<(), MiniRedisConnectionError> {
  let mut rx = db.subscribe(channel_name.clone());

  // Subscribe to the channel.
  let rx = Box::pin(async_stream::stream! {
    loop {
      match rx.recv().await {
        Ok(msg) => yield msg,
        // If we lagged in consuming messages, just resume.
        Err(tokio::sync::broadcast::error::RecvError::Lagged(e)) => {
          warn!("subscribe received lagged: {}", e);
        }
        Err(e) => {
          warn!("subscribe received error: {}", e);
          break
        },
      }
    }
  });

  // Track subscription in this client's subscription set.
  subscriptions.insert(channel_name.clone(), rx);

  debug!("subscribed to channel success: {}", channel_name);

  // Respond with the successful subscription
  let response = make_subscribe_frame(channel_name, subscriptions.len())?;
  dst.write_frame(&response).await?;

  Ok(())
}

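The logic is also straightforward:

  • First, db.subscribe is called to register the subscription in the database and obtain a receiver;
  • Then the stream! macro wraps the receiver in a stream, which is inserted into the StreamMap;
  • Finally, a response is sent back to the client;

Last, let's look at the handle_command function: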

async fn handle_command(
  frame: Frame,
  subscribe_to: &mut Vec<String>,
  subscriptions: &mut StreamMap<String, Messages>,
  dst: &mut Connection,
) -> Result<(), MiniRedisConnectionError> {
  // A command has been received from the client.
  //
  // Only `SUBSCRIBE` and `UNSUBSCRIBE` commands are permitted in this context.
  match Command::from_frame(frame)? {
    Command::Subscribe(subscribe) => {
      // The `apply` method will subscribe to the channels we add to this
      // vector.
      subscribe_to.extend(subscribe.channels.into_iter());
    }
    Command::Unsubscribe(mut unsubscribe) => {
      // If no channels are specified, this requests unsubscribing from
      // **all** channels. To implement this, the `unsubscribe.channels`
      // vec is populated with the list of channels currently subscribed
      // to.
      if unsubscribe.channels.is_empty() {
        unsubscribe.channels = subscriptions
        .keys()
        .map(|channel_name| channel_name.to_string())
        .collect();
      }

      for channel_name in unsubscribe.channels {
        debug!("begin unsubscribed: {}", channel_name);
        subscriptions.remove(&channel_name);

        let response = make_unsubscribe_frame(channel_name, subscriptions.len())?;
        dst.write_frame(&response).await?;
        debug!("unsubscribed success: {}", response);
      }
    }
    command => {
      let cmd = Unknown::new(command.get_name());
      cmd.apply(dst).await?;
    }
  }
  Ok(())
}

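The logic is as follows:

  • Subscribe: subscribe_to.extend(subscribe.channels.into_iter()) appends the new channel names to the current command's channel list (they are consumed by drain on the next loop iteration);
  • Unsubscribe: if no channel names are given, all currently subscribed channels are used; the list of channels to unsubscribe from is then iterated and each one is removed in turn;
  • Any other command: an Unknown command is built from its name and applied, which returns an error to the client;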
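
The Unsubscribe branch can be modeled without any async machinery. In this sketch, a `HashSet` stands in for the `StreamMap`, and the hypothetical `unsubscribe` function returns the (channel name, remaining count) pairs instead of writing frames to the connection.

```rust
use std::collections::HashSet;

/// Hypothetical model of the UNSUBSCRIBE branch. `subscriptions` stands in
/// for the `StreamMap`; each returned pair mirrors one response frame.
fn unsubscribe(
    subscriptions: &mut HashSet<String>,
    mut channels: Vec<String>,
) -> Vec<(String, usize)> {
    // An empty list means "unsubscribe from everything".
    if channels.is_empty() {
        channels = subscriptions.iter().cloned().collect();
    }

    let mut responses = Vec::new();
    for name in channels {
        subscriptions.remove(&name);
        // The count is read after the removal, so the last response reports 0.
        responses.push((name, subscriptions.len()));
    }
    responses
}
```

Because the count is taken after each removal, a client that unsubscribes from everything sees the number fall to zero on the final response.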

Unsubscribe Command

The Unsubscribe command is very simple, so it is not explained in detail here:

src/cmd/unsubscribe.rs

use bytes::Bytes;

use crate::connection::frame::Frame;
use crate::connection::parse::Parse;
use crate::error::MiniRedisParseError;

/// Unsubscribes the client from one or more channels.
///
/// When no channels are specified, the client is unsubscribed from all the
/// previously subscribed channels.
#[derive(Clone, Debug)]
pub struct Unsubscribe {
    pub(crate) channels: Vec<String>,
}

impl Unsubscribe {
    pub(crate) fn new(channels: &[String]) -> Unsubscribe {
        Unsubscribe {
            channels: channels.to_vec(),
        }
    }

    pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Unsubscribe, MiniRedisParseError> {
        // There may be no channels listed, so start with an empty vec.
        let mut channels = vec![];

        // Each entry in the frame must be a string or the frame is malformed.
        // Once all values in the frame have been consumed, the command is fully
        // parsed.
        loop {
            match parse.next_string() {
                // A string has been consumed from the `parse`, push it into the
                // list of channels to unsubscribe from.
                Ok(s) => channels.push(s),
                // The `EndOfStream` error indicates there is no further data to
                // parse.
                Err(MiniRedisParseError::EndOfStream) => break,
                // All other errors are bubbled up, resulting in the connection
                // being terminated.
                Err(err) => return Err(err),
            }
        }

        Ok(Unsubscribe { channels })
    }

    pub(crate) fn into_frame(self) -> Result<Frame, MiniRedisParseError> {
        let mut frame = Frame::array();
        frame.push_bulk(Bytes::from("unsubscribe".as_bytes()))?;

        for channel in self.channels {
            frame.push_bulk(Bytes::from(channel.into_bytes()))?;
        }

        Ok(frame)
    }
}

pub(crate) fn make_unsubscribe_frame(
    channel_name: String,
    num_subs: usize,
) -> Result<Frame, MiniRedisParseError> {
    let mut response = Frame::array();
    response.push_bulk(Bytes::from_static(b"unsubscribe"))?;
    response.push_bulk(Bytes::from(channel_name))?;
    response.push_int(num_subs as u64)?;
    Ok(response)
}

If no channel names are provided, the client is unsubscribed from all channels.
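
To make the response built by make_unsubscribe_frame more concrete, the sketch below encodes an equivalent frame with the RESP wire format. The `Frame` enum here is a minimal model assumed for illustration (the project's real `Frame` type and its encoder live in the connection layer), and `encode` and `unsubscribe_response` are hypothetical names.

```rust
/// Minimal frame model, assumed for illustration only.
enum Frame {
    Bulk(Vec<u8>),
    Integer(u64),
    Array(Vec<Frame>),
}

/// Appends the RESP encoding of `frame` to `out`.
fn encode(frame: &Frame, out: &mut Vec<u8>) {
    match frame {
        // Bulk string: `$<len>\r\n<bytes>\r\n`
        Frame::Bulk(data) => {
            out.extend_from_slice(format!("${}\r\n", data.len()).as_bytes());
            out.extend_from_slice(data);
            out.extend_from_slice(b"\r\n");
        }
        // Integer: `:<n>\r\n`
        Frame::Integer(n) => out.extend_from_slice(format!(":{}\r\n", n).as_bytes()),
        // Array: `*<count>\r\n` followed by each element
        Frame::Array(items) => {
            out.extend_from_slice(format!("*{}\r\n", items.len()).as_bytes());
            for item in items {
                encode(item, out);
            }
        }
    }
}

/// Builds the three-element response: command name, channel, remaining count.
fn unsubscribe_response(channel: &str, num_subs: u64) -> Vec<u8> {
    let frame = Frame::Array(vec![
        Frame::Bulk(b"unsubscribe".to_vec()),
        Frame::Bulk(channel.as_bytes().to_vec()),
        Frame::Integer(num_subs),
    ]);
    let mut out = Vec::new();
    encode(&frame, &mut out);
    out
}
```

Under these assumptions, unsubscribing from the last channel `news` produces the bytes `*3\r\n$11\r\nunsubscribe\r\n$4\r\nnews\r\n:0\r\n`.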

Publish Command

The Publish command is also very simple, so it is not explained in detail here:

src/cmd/publish.rs

#[derive(Debug)]
pub struct Publish {
    channel: String,

    message: Bytes,
}

impl Publish {
    pub(crate) fn new(channel: impl ToString, message: Bytes) -> Self {
        Publish {
            channel: channel.to_string(),
            message,
        }
    }

    pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Publish, MiniRedisParseError> {
        let channel = parse.next_string()?;
        let message = parse.next_bytes()?;
        Ok(Publish { channel, message })
    }

    pub(crate) async fn apply(
        self,
        db: &Db,
        dst: &mut Connection,
    ) -> Result<(), MiniRedisConnectionError> {
        // The shared state contains the `tokio::sync::broadcast::Sender` for
        // all active channels. Calling `db.publish` dispatches the message into
        // the appropriate channel.
        //
        // The number of subscribers currently listening on the channel is
        // returned. This does not mean that `num_subscriber` channels will
        // receive the message. Subscribers may drop before receiving the
        // message. Given this, `num_subscribers` should only be used as a
        // "hint".
        let num_subscribers = db.publish(&self.channel, self.message);

        // The number of subscribers is returned as the response to the publish
        // request.
        let response = Frame::Integer(num_subscribers as u64);
        debug!("apply command applied response: {}", response);

        dst.write_frame(&response).await?;

        Ok(())
    }

    pub(crate) fn into_frame(self) -> Result<Frame, MiniRedisParseError> {
        let mut frame = Frame::array();
        frame.push_bulk(Bytes::from("publish".as_bytes()))?;
        frame.push_bulk(Bytes::from(self.channel.into_bytes()))?;
        frame.push_bulk(self.message)?;

        Ok(frame)
    }
}
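
The publish path above, where a message is fanned out to a channel and the subscriber count is returned, can be approximated with the standard library. This is a sketch under stated assumptions rather than the project's `Db`: `Hub` is a hypothetical type, and each subscriber gets its own `std::sync::mpsc` channel in place of a shared `tokio::sync::broadcast::Sender`.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical pub/sub hub; a std-only stand-in for the broadcast-based Db.
#[derive(Default)]
struct Hub {
    channels: HashMap<String, Vec<Sender<Vec<u8>>>>,
}

impl Hub {
    /// Registers a new subscriber and returns its receiving half.
    fn subscribe(&mut self, name: &str) -> Receiver<Vec<u8>> {
        let (tx, rx) = channel();
        self.channels.entry(name.to_string()).or_default().push(tx);
        rx
    }

    /// Sends `msg` to every live subscriber and returns how many accepted it.
    fn publish(&mut self, name: &str, msg: &[u8]) -> usize {
        match self.channels.get_mut(name) {
            // Nobody ever subscribed to this channel.
            None => 0,
            Some(subs) => {
                // `send` fails once the receiver is dropped; forget those.
                subs.retain(|tx| tx.send(msg.to_vec()).is_ok());
                subs.len()
            }
        }
    }
}
```

As in the comment inside apply, the returned count is only a hint: a subscriber may still disconnect after the message has been queued for it.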

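Unknown Command

If the client sends an unrecognized command, it is parsed as an Unknown command. This command is very simple: it just returns an error.

The implementation is as follows: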

src/cmd/unknown.rs

/// Represents an "unknown" command. This is not a real `Redis` command.
#[derive(Debug)]
pub struct Unknown {
    command_name: String,
}

impl Unknown {
    /// Create a new `Unknown` command which responds to unknown commands
    /// issued by clients
    pub(crate) fn new(key: impl ToString) -> Unknown {
        Unknown {
            command_name: key.to_string(),
        }
    }

    pub(crate) fn get_name(&self) -> &str {
        &self.command_name
    }

    /// Responds to the client, indicating the command is not recognized.
    ///
    /// This usually means the command is not yet implemented by `mini-redis`.
    pub(crate) async fn apply(self, dst: &mut Connection) -> Result<(), MiniRedisConnectionError> {
        let response = Frame::Error(format!("err unknown command '{}'", self.command_name));

        debug!("apply unknown command resp: {:?}", response);

        dst.write_frame(&response).await?;
        Ok(())
    }
}

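The implementation is very simple and needs no further explanation.

Stopping the Server: Shutdown

Shutdown was mentioned earlier when discussing graceful server shutdown; finally, let's look at how it is implemented.

The implementation is very simple: a broadcast channel notifies every TCP Handler, each of which holds a Receiver for the same channel.

The implementation is as follows: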

src/server/shutdown.rs

use tokio::sync::broadcast;

/// Listens for the server shutdown signal.
///
/// Shutdown is signalled using a `broadcast::Receiver`. Only a single value is
/// ever sent. Once a value has been sent via the broadcast channel, the server
/// should shut down.
///
/// The `Shutdown` struct listens for the signal and tracks that the signal has
/// been received. Callers may query for whether the shutdown signal has been
/// received or not.
#[derive(Debug)]
pub(crate) struct Shutdown {
    /// `true` if the shutdown signal has been received
    shutdown: bool,

    /// The receive half of the channel used to listen for shutdown.
    notify: broadcast::Receiver<()>,
}

impl Shutdown {
    /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
    pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown {
        Shutdown {
            shutdown: false,
            notify,
        }
    }

    /// Returns `true` if the shutdown signal has been received.
    pub(crate) fn is_shutdown(&self) -> bool {
        self.shutdown
    }

    /// Receive the shutdown notice, waiting if necessary.
    pub(crate) async fn recv(&mut self) {
        // If the shutdown signal has already been received, then return immediately.
        if self.shutdown {
            return;
        }

        // Cannot receive a "lag error" as only one value is ever sent.
        let _ = self.notify.recv().await;

        // Remember that the signal has been received.
        self.shutdown = true;
    }
}

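Calling the async recv method of Shutdown awaits self.notify.recv(), which suspends the caller until a notification arrives.

Once the message triggered by the shutdown Future is received, shutdown is set to true, indicating that the shutdown phase has begun.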
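
The two properties of recv, that the result is ignored and that repeated calls return immediately, can be checked with a synchronous model. The sketch below swaps the tokio broadcast channel for `std::sync::mpsc`, so recv blocks the thread instead of yielding to the runtime; the struct mirrors the original fields but is otherwise an illustration only.

```rust
use std::sync::mpsc::Receiver;

/// Synchronous stand-in for `Shutdown`, built on `std::sync::mpsc`.
struct Shutdown {
    shutdown: bool,
    notify: Receiver<()>,
}

impl Shutdown {
    fn new(notify: Receiver<()>) -> Shutdown {
        Shutdown { shutdown: false, notify }
    }

    fn is_shutdown(&self) -> bool {
        self.shutdown
    }

    fn recv(&mut self) {
        // Already signalled: return without touching the channel again.
        if self.shutdown {
            return;
        }
        // Both `Ok(())` (a value was sent) and `Err(_)` (the sender was
        // dropped) mean the server is stopping, so the result is ignored.
        let _ = self.notify.recv();
        self.shutdown = true;
    }
}
```

Ignoring the result is what lets a dropped sender act as the shutdown signal: in tokio's broadcast channel, recv likewise completes with an error once every Sender has been dropped.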

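Summary

Starting from the entry point of the server executable, this article walked through the whole server-side implementation of mini-redis, including:

  • Server startup and initialization;
  • The connection Listener;
  • The request Handler;
  • The command execution module Command;
  • Graceful Shutdown;

The next article will implement the client, completing a truly usable mini-redis!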

