5

时序数据库Influx-IOx源码学习十(查询主流程)

 3 years ago
source link: https://my.oschina.net/u/3374539/blog/5034513
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
时序数据库Influx-IOx源码学习十(查询主流程) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。

InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。

接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。


上一篇粗略的总结了写入的基本流程,详情见:

https://my.oschina.net/u/3374539/blog/5033469

这一篇记录一下查询的主要流程。


在第六章中,写了一个查询示例,如下:

 let mut query = flight::Client::new(connection)
        .perform_query("databaseName", "select * from myMeasurement")
        .await
        .expect("query request should work");

其中connection,代表的建立了一个Grpc的连接。perform_query代表执行查询,其中第一个参数是数据库名字,第二个参数是要执行查询的sql语句。这个perform_query是封装了一下调用协议,然后调用了服务器端的do_get方法,do_get方法在服务器的src/influxdb_ioxd/rpc/flight.rs:139行可以找到,如下:

async fn do_get(
        &self,
        //这个Ticket里就是保存的perform_query方法中封装的json数据
        request: Request<Ticket>,
    ) -> Result<Response<Self::DoGetStream>, tonic::Status> {
        //这里就是把json还原回来
        let ticket = request.into_inner();
        let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicket {
            ticket: ticket.ticket,
        })?;
        //反序列化成了ReadInfo结构
        let read_info: ReadInfo =
            serde_json::from_str(&json_str).context(InvalidQuery { query: &json_str })?;
        //拿到客户端设置的数据库名字
        let database = DatabaseName::new(&read_info.database_name).context(InvalidDatabaseName)?;
        //从内存中查找是否存在这个database名字,如果不存在就会报DatabaseNotFound错误回去
        //这里就是创建数据库的时候写入到内存里的
        //同时还应该记得iox的数据库必须一个节点创建一次。。hhhhha
        let db = self.server.db(&database).context(DatabaseNotFound {
            database_name: &read_info.database_name,
        })?;
        //这个是拿到之前创建数据库时候设置的线程池,可以回去参考第五章
        let executor = db.executor();
        //这里是创建出sql语句对应的physical_plan,后面再看
        let physical_plan = Planner::new(Arc::clone(&executor))
            .sql(db, &read_info.sql_query)
            .await
            .context(Planning)?;

        //使用线程异步的执行查询
        let results = executor
             //复制一下执行时候需要用到的信息
            .new_context()
             //真正的去执行
            .collect(Arc::clone(&physical_plan))
            .await
            .map_err(|e| Box::new(e) as _)
            .context(Query {
                database_name: &read_info.database_name,
            })?;

        //在写入的章节里应该知道了在RBChunk里面存储的是Arrow格式的。
        //在这个方法中就是调用arrow_flight工具包的方法,先把schema序列化到flight_buffer中
        let options = arrow::ipc::writer::IpcWriteOptions::default();
        let schema = physical_plan.schema();
        let schema_flight_data =
            arrow_flight::utils::flight_data_from_arrow_schema(schema.as_ref(), &options);

        let mut flights: Vec<Result<FlightData, tonic::Status>> = vec![Ok(schema_flight_data)];
        //上面得到的结果集,这里进行遍历,封装为要返回的数据结构
        let mut batches: Vec<Result<FlightData, tonic::Status>> = results
            .iter()
            //这个是为了给下面flight_data_from_arrow_batch这个方法打补丁用的
            //因为这个方法即便对于切片类型的batch也是盲目的序列化所有数据
            .map(optimize_record_batch)
            .collect::<Result<Vec<_>, Error>>()?
            .iter()
            //这里就是一条一条的把数据序列化到缓冲区里
            .flat_map(|batch| {
                let (flight_dictionaries, flight_batch) =
                    arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
                //把数据包装在Result中
                flight_dictionaries
                    .into_iter()
                    .chain(std::iter::once(flight_batch))
                    .map(Ok)
            })
            .collect();

        //前面是schema,后面是数据
        flights.append(&mut batches);
        //返回一个数据的异步stream,有可能调用一次next就会释放一次cpu?
        let output = futures::stream::iter(flights);
        //数据以flight形式发送到了客户端,客户端先读取schema再读取数据。
        Ok(Response::new(Box::pin(output) as Self::DoGetStream))
    }

这里基本上是整个查询的主逻辑:

  • 异步的将sql转换为plan。
  • 异步的去执行plan并返回结果和结果所对应的schema信息。
  • 将返回的arrow数据封装到flights格式中。
  • 通过Grpc返回

这一篇就到这里吧,下几章准备记录一下:

  1. sql是怎么被执行的
  2. 查询中都经历了什么
  3. 等等。。。

祝玩儿的开心。


欢迎关注微信公众号:

或添加微信好友: liutaohua001


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK