7

时序数据库Influx-IOx源码学习五(创建数据库)

 3 years ago
source link: https://my.oschina.net/u/3374539/blog/5025128
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
时序数据库Influx-IOx源码学习五(创建数据库) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。

InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。

接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。


上篇介绍到:InfluxDB-IOx的Run命令启动过程,详情见:https://my.oschina.net/u/3374539/blog/5021654

这章记录一下Database create命令的执行过程。


在第三章命令行中介绍了,所有的子命令都有一个独立的参数或配置称为subcommand

enum Command {
    Convert { // 省略 ...},
    Meta {// 省略 ...},
    Database(commands::database::Config),
    Run(Box<commands::run::Config>),
    Stats(commands::stats::Config),
    Server(commands::server::Config),
    Writer(commands::writer::Config),
    Operation(commands::operations::Config),
}

这章我们打开看一眼commands::database下的config包含了什么。

pub struct Config {
    #[structopt(subcommand)]
    command: Command,
}
//见名知意,基本猜测一下就行了,慢慢使用到再回来看
enum Command {
    Create(Create),
    List(List),
    Get(Get),
    Write(Write),
    Query(Query),
    Chunk(chunk::Config),
    Partition(partition::Config),
}

先来看一下create命令的执行。

Command::Create(command) => {
            //创建一个grpc的client
            let mut client = management::Client::new(connection);
            //设置基本的配置项
            let rules = DatabaseRules {
                //数据库名字
                name: command.name,
                //内存的各种配置,包含缓存大小,时间等等
                lifecycle_rules: Some(LifecycleRules {
                    //省略。。
                }),
                //设置分区的策略
                partition_template: Some(PartitionTemplate {
                    //省略。。
                }),

                 //其它都填充default
                ..Default::default()
            };
            //使用配置信息创建数据库,这里是生成了一个CreateDatabaseRequest去调用了远程服务器的方法
            client.create_database(rules).await?;

            println!("Ok");
        }

在上一章中提到了grpc的启动,这里就涉及到了之前提到的grpc的框架tonic,在tonic中使用#[tonic::async_trait]了标记一个服务器端的实现开始。我在ide中搜索,可以在src/influxdb_ioxd/rpc/management.rs:50行中找到ManagementService相关的实现。

有关tonic更多的资料请阅读:https://github.com/hyperium/tonic

#[tonic::async_trait]
impl<M> management_service_server::ManagementService for ManagementService<M>
where
    M: ConnectionManager + Send + Sync + Debug + 'static,
{
    //省略其它方法。。。

 async fn create_database(
        &self,
        //这里就是接收CreateDatabaseRequest的请求
        request: Request<CreateDatabaseRequest>,
    ) -> Result<Response<CreateDatabaseResponse>, Status> {

         //对数据进行一下校验,然后获得在上面配置的rules规则
        let rules: DatabaseRules = request
            .into_inner()
            .rules
            .ok_or_else(|| FieldViolation::required(""))
            .and_then(TryInto::try_into)
            .map_err(|e| e.scope("rules"))?;

        //这里就是在第三章中提到的server_id,如果没配置就会报错了
        let server_id = match self.server.require_id().ok() {
            Some(id) => id,
            None => return Err(NotFound::default().into()),
        };
        //这里就是真正的去创建,在下面继续跟踪
        match self.server.create_database(rules, server_id).await {
            Ok(_) => Ok(Response::new(CreateDatabaseResponse {})),
            Err(Error::DatabaseAlreadyExists { db_name }) => {
                return Err(AlreadyExists {
                    resource_type: "database".to_string(),
                    resource_name: db_name,
                    ..Default::default()
                }
                .into())
            }
            Err(e) => Err(default_server_error_handler(e)),
        }
    }
}

接下来要继续查看数据库真正的被创建出来,我读到这里存在一个问题,文件格式是什么样子的?

pub async fn create_database(&self, rules: DatabaseRules, server_id: NonZeroU32) -> Result<()> {
        //检查server_id
        self.require_id()?;
        //把数据库名字存储到内存中,最终保存到一个btreemap中
        let db_reservation = self.config.create_db(rules)?;
        //对数据进行持久化保存
        self.persist_database_rules(db_reservation.rules().clone())
            .await?;
        //启动数据库后台线程,在内存中写入数据库状态
        db_reservation.commit(server_id, Arc::clone(&self.store), Arc::clone(&self.exec));

        Ok(())
    }

来解答上面的疑问,文件是怎样持久化、格式是什么样子的。

pub async fn persist_database_rules<'a>(&self, rules: DatabaseRules) -> Result<()> {
        //生成一个新的数据库路径
        let location = object_store_path_for_database_config(&self.root_path()?, &rules.name);
        //序列化DatabaseRules这个pb到byte流
        let mut data = BytesMut::new();
        rules.encode(&mut data).context(ErrorSerializing)?;
        let len = data.len();
        let stream_data = std::io::Result::Ok(data.freeze());
        //将pb的内容进行存储
        self.store
            .put(
                &location,
                futures::stream::once(async move { stream_data }),
                Some(len),
            )
            .await
            .context(StoreError)?;
        Ok(())
    }

这里调用了rules.encode()转换到pb的格式,这里是rust语言的一个方法,实现了From特性的,就得到了一个into的方法,如:impl From<DatabaseRules> for management::DatabaseRules.

到这里数据库的一个描述文件rules.pb就被写入到磁盘中了,路径是启动命令中指定的--data-dir参数路径 + --writer-id + 数据库名字。

例如,我的启动和创建命令为:

./influxdb_iox run --writer-id 1 --object-store file --data-dir ~/influxtest/
./influxdb_iox database create test

那么得到的路径就为:~/influxtest/1/test/rules.pb. 之后可以运行一个pb的脚本来反查rules.pb中的数据内容,如下:

$ ./scripts/prototxt decode influxdata.iox.management.v1.DatabaseRules \
    < ~/influxtest/1/test/rules.pb

influxdata/iox/management/v1/service.proto:6:1: warning: Import google/protobuf/field_mask.proto is unused.
name: "test"
partition_template {
  parts {
    time: "%Y-%m-%d %H:00:00"
  }
}
lifecycle_rules {
  mutable_linger_seconds: 300
  mutable_size_threshold: 10485760
  buffer_size_soft: 52428800
  buffer_size_hard: 104857600
  sort_order {
    order: ORDER_ASC
    created_at_time {
    }
  }
}

看到这里已经知道整个生成过程及文件内容。

祝玩儿的开心。


欢迎关注微信公众号:

或添加微信好友: liutaohua001


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK