2

2022-06: Iteration 6 汇报

 2 years ago
source link: https://xuanwo.io/reports/2022-06/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

2022-06: Iteration 6 汇报

Iteration 6 包括了完整的春节,大多数时间都在玩手机打游戏,做的事情不多。这一次的汇报主要讲讲自己最近取得的一些工作进展和新入手的好玩意儿。

Databend DAL2 is online

经过长达两个月的战斗之后,DAL2 终于彻底取代了旧有的 DAL 实现,成为了 Databend 与存储交互的新基座,完整的贡献历史可以参见 Tracking issue for Vision of Databend DAL。在这个周期中,我主要做了两件事:增加了 Layer 抽象和 DAL 替换。

首先是 Layer 抽象。过去 DAL 提供了 DataAccessorInterceptor 用来对 DAL 做一些拦截操作,比如更新统计数据中的 read_byteswrite_bytes 等。但是他跟业务耦合的过于紧密,让他未来的扩展和维护受到了局限:

/// A interceptor for data accessor.
pub struct DataAccessorInterceptor {
    ctx: Arc<DalContext>,
    inner: Arc<dyn DataAccessor>,
}

impl DataAccessorInterceptor {
    pub fn new(ctx: Arc<DalContext>, inner: Arc<dyn DataAccessor>) -> Self {
        Self { ctx, inner }
    }
}

#[async_trait::async_trait]
impl DataAccessor for DataAccessorInterceptor {
    fn get_input_stream(
        &self,
        path: &str,
        stream_len: Option<u64>,
    ) -> common_exception::Result<InputStream> {
        self.inner
            .get_input_stream(path, stream_len)
            .map(|input_stream| {
                let r = InputStreamInterceptor::new(self.ctx.clone(), input_stream);
                Box::new(r) as Box<dyn AsyncSeekableReader + Unpin + Send>
            })
    }

    async fn put(&self, path: &str, content: Vec<u8>) -> common_exception::Result<()> {
        let len = content.len();
        self.inner
            .put(path, content)
            .await
            .map(|_| self.ctx.inc_write_bytes(len as usize))
    }

    async fn put_stream(
        &self,
        path: &str,
        input_stream: Box<
            dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Send + Unpin + 'static,
        >,
        stream_len: usize,
    ) -> common_exception::Result<()> {
        self.inner
            .put_stream(path, input_stream, stream_len)
            .await
            .map(|_| self.ctx.inc_write_bytes(stream_len as usize))
    }

    async fn remove(&self, path: &str) -> common_exception::Result<()> {
        self.inner.remove(path).await
    }
}

所以我模仿 tower 的实现,增加了一个 Layer 抽象,将 DalContext 从 dal 的逻辑中彻底剥离了出去:

pub trait Layer {
    fn layer(&self, inner: Arc<dyn Accessor>) -> Arc<dyn Accessor>;
}

impl<T: Layer> Layer for Arc<T> {
    fn layer(&self, inner: Arc<dyn Accessor>) -> Arc<dyn Accessor> {
        self.as_ref().layer(inner)
    }
}

我增加了一个新的 crate dal-context 来实现这些业务逻辑:

pub struct DalContext {
    inner: Option<Arc<dyn Accessor>>,
    metrics: Arc<RwLock<DalMetrics>>,
}

impl Layer for DalContext {
    fn layer(&self, inner: Arc<dyn Accessor>) -> Arc<dyn Accessor> {
        Arc::new(DalContext {
            inner: Some(inner),
            metrics: self.metrics.clone(),
        })
    }
}

#[async_trait]
impl Accessor for DalContext {
    async fn read(&self, args: &OpRead) -> DalResult<Reader> {
        let metrics = self.metrics.clone();

        // TODO(xuanwo): Maybe it's better to move into metrics.
        self.inner.as_ref().unwrap().read(args).await.map(|reader| {
            let r = CallbackReader::new(reader, move |n| {
                let mut metrics = metrics.write();
                metrics.read_bytes += n;
            });

            Box::new(r) as Reader
        })
    }
    async fn write(&self, r: Reader, args: &OpWrite) -> DalResult<usize> {
        self.inner.as_ref().unwrap().write(r, args).await.map(|n| {
            self.inc_write_bytes(n);
            n
        })
    }
    async fn stat(&self, args: &OpStat) -> DalResult<Object> {
        self.inner.as_ref().unwrap().stat(args).await
    }
    async fn delete(&self, args: &OpDelete) -> DalResult<()> {
        self.inner.as_ref().unwrap().delete(args).await
    }
}

看起来舒服多了!

dal-context 就绪之后,终于可以开始替换 query 中的 dal 实现了: query: Replace dal with dal2, let’s rock!。过去的 DAL 跟业务逻辑耦合的非常紧密,所以这个 PR 中我计划专注于让它先工作起来,能够通过编译,Clippy 能开心,所有的测试能够通过,暂时不考虑业务逻辑的重构和性能优化等问题。在测试的时候遇到了一些难搞的问题:

其一跟业务逻辑有关。由于 DAL 目标是做一个通用的存储模块,所以设计了自己的独立 Error 类型,没有复用 Databend 本身的 ErrorCode 逻辑。为了简化迁移操作,在当前 PR 中我只是把所有的 DAL Error 都转换成了 ErrorCode 中的 Transport Error。但是某个模块中恰恰依赖一个特定的 ErrorCode 做检查,导致相关的测试全都报错了。最后在 @dantengsky 的帮助下定位到了问题并给予了临时的 fix,后面计划会将 DAL 的错误正确的导出到 ErrorCode 中。

其二是跟 block_on 使用的有关问题。Async Rust 当前的设计使得 sync/async 之间的代码交互非常痛苦,一个无奈的选择就是使用 block_on。但是从实践上来看,block_on 经常会导致整个 runtime 没有线程可以调度从而死锁,所以最好能回避这种用法。最后是把上下游的函数都改造成了 async 解决了。

DAL2 被合并了之后还有不少事情需要做,增加更多的单元测试,实现专门的集成测试,优化整体的性能,增加更多的服务支持,欢迎大家参与!

即热饮水机: 早就该买了!

我们家的喝水方案一直都比较简陋:买 5L 装的矿泉水,想喝热的就只能用电水壶现烧然后再放凉。期间多次考虑过买一个饮水机,但是不少缺点让我们迟迟没有下决定:

  • 很多饮水机都太大,不适合我们这样的小家小户
  • 饮水机需要买桶装水,而我们家附近的桶装水质量和服务感觉都不太行
  • 饮水机的工作原理决定了它不是特别卫生

直到有一天我逛京东的时候看到了一个新产品:即热饮水机。它跟传统的饮水机不同之处在于它是用户需要的时候即时加热水,不存储热水,而且我选中的这一款饮水机提供了聪明座,能够兼容 5L 装的农夫山泉。除此以外,它还提供了如下功能:

  • 支持 25 到 99 度之间的逐级调温,此外还提供了常用的 25,55,85,99 等温度档位
  • 支持小杯(200ml),中杯(350ml),大杯(500ml)的出水量选择
  • 过热保护,缺水警告等安全功能

有了它之后我们家的喝水方案彻底升级了,覆盖了我们各种各样的用水需求:

  • 方便面:99 度 + 大杯
  • 咖啡:85 度 + 小杯
  • 绿茶:85 度 + 中杯
  • 蜂蜜水:55 度 + 中杯
  • 凉水:25 度

在有了即热饮水机之后,我每天的饮水量获得大幅度提升,感觉最近身体的状态好了很多,真的是早就该买了!

这次体验也启发了我这样一个观点:在线处理的用户体验比离线处理好太多了,我们要大搞特搞在线实时分析(



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK