2

2022-22: 为 Databend 实现压缩支持

 1 year ago
source link: https://xuanwo.io/reports/2022-22/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

2022-22: 为 Databend 实现压缩支持

Iteration 14 从 5/21 开始到 6/4 结束,为期两周。这个周期主要在做 Databend 的压缩支持,具体的来说是支持如下功能:

支持读取 Stage/Location 中的压缩文件

copy into ontime200 from '@s1' FILES = ('ontime_200.csv.gz') 
    FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = 'gzip'  record_delimiter = '\n' skip_header = 1);

支持流式上传压缩文件:

curl -H "insert_sql:insert into ontime_streaming_load format Csv" \
    -H "skip_header:1" \
    -H "compression:zstd" \
    -F  "upload=@/tmp/ontime_200.csv.zst" \
    -u root: \
    -XPUT \
    "http://localhost:8000/v1/streaming_load"

Rust 目前压缩算法的库已经比较齐全,所以主要的工作在于如何将解压缩的逻辑与 Databend 现有的逻辑整合到一起。


解压缩工作流程

大部分解压缩算法都可以抽象为这样的状态机:

decompress.png

其中最复杂的是 Decode 流程:

  • 当数据消费完毕后需要获取更多数据
  • 当数据还没有消费完成时需要再次调用 Decode

有的压缩算法会支持多个对象,因此在 Flush 状态时也有可能进行 reinit,重新开始一轮新的解压缩,此处不再赘述。

OpenDAL 中就运用这样的抽象:

pub enum DecompressState {
    Reading,
    Decoding,
    Flushing,
    Done,
}

对外暴露了 DecompressDecoder

impl DecompressDecoder {
    /// Get decompress state
    pub fn state(&self) -> DecompressState {}
    /// Fetch more data from underlying reader.
    pub fn fill(&mut self, bs: &[u8]) -> usize {}
    /// Decode data into output.
    pub fn decode(&mut self, output: &mut [u8]) -> Result<usize> {}
    /// Finish a decompress press, flushing remaining data into output.
    pub fn finish(&mut self, output: &mut [u8]) -> Result<usize> {}
}

为了方便用户使用,OpenDAL 在 DecompressDecoder 基础上实现了 DecompressReader

impl<R: BytesRead> futures::io::AsyncRead for DecompressReader<R> {}

对接 async-compression

async-compression 是由 @Nemo157 开发的异步压缩库,支持了绝大多数常用的压缩算法。OpenDAL 就是基于 async-compression 实现的:

其内部使用了 Decode trait:

pub trait Decode {
    /// Reinitializes this decoder ready to decode a new member/frame of data.
    fn reinit(&mut self) -> Result<()>;

    /// Returns whether the end of the stream has been read
    fn decode(
        &mut self,
        input: &mut PartialBuffer<impl AsRef<[u8]>>,
        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
    ) -> Result<bool>;

    /// Returns whether the internal buffers are flushed
    fn flush(&mut self, output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>)
        -> Result<bool>;

    /// Returns whether the internal buffers are flushed
    fn finish(
        &mut self,
        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
    ) -> Result<bool>;
}

async-compression 为了避免将内部实现的细节暴露给用户,codec 相关的模块都是 private 的,用户只能通过 bufread 等方式来调用。

但这种方式在 Databend 中是行不通的:Databend 为了能够更充分的利用机器资源,对异步和同步任务进行了严格的划分:

  • CPU Bound 的任务必须要在同步的 Runtime 中进行
  • IO Bound 的任务必须要在异步的 Runtime 中进行

如果在异步 Runtime 中进行解压缩,可能会 block runtime,降低整体的吞吐性能。

想要自己掌控解压缩发生在哪个 runtime,我们就必须要能直接操作底层的 Decode,为此我提交了一份: proposal: Export codec and Encode/Decode trait。作者最近刚好也在从事相关的工作,想把底层的一些 Codec 暴露出来。于是我在 proposal 中详细地介绍了 Databend 的 Use Case,并分享了自己的临时 Workaround。实际上 async-compression 的代码组织的很好,只需要将内部的 Decode trait 及其实现都 public 出来即可。

由于发布到 crate 的包必须使用 tagged version,所以我把这个 wordaround 也发布成了一个 crate: async-compression-issue-150-workaround

这里用到了一个 Cargo.toml 的小技巧:为 package 指定一个 alias,让它能够在不修改 crate name 的情况下使用自己指定的另一个包。

# Temp workaround, should come back to tagged version after https://github.com/Nemo157/async-compression/issues/150 resolved.
async-compression = { package = "async-compression-issue-150-workaround", version = "0.3.15-issue-150", features = [
  "futures-io",
  "all-algorithms",
], optional = true }

对接 Databend

完成了 OpenDAL 测的开发,Databend 这边主要是对接和测试工作了。在对接的时候意外的发现 Databend 目前处理 Streaming Loading 和 COPY FROM STAGE 走的是两条完全不同的路径,所以解压缩的处理也不得实现了两遍,未来希望能将他们统一起来。

目前为止,Databend 已经能够支持 GZIPBZ2BROTLIZSTDDEFLATERAW_DEFLATE 等压缩算法,未来根据需要还可以进一步扩展,欢迎大家来试用解压缩功能!

下一步计划

  • 重构流式加载和从 STAGE 加载的逻辑,尽可能复用相同的逻辑
  • 解压缩性能测试及其优化
  • 支持 LZO,SNAPPY 等压缩算法
  • 支持 ZIP,TAR 等归档格式

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK