4

时序数据库Influx-IOx源码学习四(Run命令的执行)

 3 years ago
source link: https://my.oschina.net/u/3374539/blog/5021654
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
时序数据库Influx-IOx源码学习四(Run命令的执行) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区

欢迎关注公众号:

上篇介绍到:InfluxDB-IOx的命令行及配置,详情见:https://my.oschina.net/u/3374539/blog/5017858

这章记录一下Run命令的执行过程。

 //根据用户在命令行配置的num_threads参数
 //来选择创建一个多线程的模型,还是current_thread的模型
 //后面有时间深入研究tokio的时候再来分析有什么异同
 let tokio_runtime = get_runtime(config.num_threads)?;
 //block_on会让线程一直等待方法里的future执行完成
 //这是让闭包中的方法占有了io driver 和 timer context
 tokio_runtime.block_on(async move {
        let host = config.host;
        match config.command {
            // 省略其它command ... 

            Command::Run(config) => {
                //具体去子类型里执行,然后await一个结果
                if let Err(e) = commands::run::command(logging_level, *config).await {
                    eprintln!("Server command failed: {}", e);
                    std::process::exit(ReturnCode::Failure as _)
                }
            }
        }
});

influxdb_ioxd::main方法中,忽略一些不太需要重点关注的,分别是初始化log的管理、PanicsTracingCancellationToken等。

    //初始化对象存储
    let object_store = ObjectStore::try_from(&config)?;
    //可以看到,目前已经支持了
    //1.内存(在container环境运行时候使用)
    //2.Google
    //3.S3
    //4.Azure
    //5.File 本地文件,方便开发者调试运行在云上时候的文件变化
    fn try_from(config: &Config) -> Result<Self, Self::Error> {
        match config.object_store {
            Some(ObjStoreOpt::Memory) | None => {
           //创建一个btreemap用来缓存或者搜索
           Ok(Self::new_in_memory(object_store::memory::InMemory::new()))
            }

            Some(ObjStoreOpt::Google) => {
                // 省略
            }

            Some(ObjStoreOpt::S3) => {
               // 省略
            }

            Some(ObjStoreOpt::Azure) => { 
              // 省略
            }

            Some(ObjStoreOpt::File) => match config.database_directory.as_ref() {
                Some(db_dir) => {
                    //去递归创建这个配置路径中的文件夹
                    //context也是使用的snafu来处理错误的
                    fs::create_dir_all(db_dir)
                        .context(CreatingDatabaseDirectory { path: db_dir })?;
                    //都创建完成,并且没出错误,把路径保存起来
                    Ok(Self::new_file(object_store::disk::File::new(&db_dir)))
                }
                // 如果database_directory这个参数没有配置的时候
                //使用snafu这个crate来返回一个错误
                None => MissingObjectStoreConfig {
                    object_store: ObjStoreOpt::File,
                    missing: "data-dir",
                }
                .fail(),
            },
        }
    }

关于错误处理的代码:

 #[snafu(display("Unable to create database directory {:?}: {}", path, source))]
    CreatingDatabaseDirectory {
        path: PathBuf,
        source: std::io::Error,
    },

 #[snafu(display(
        "Specified {} for the object store, required configuration missing for {}",
        object_store,
        missing
    ))]
    MissingObjectStoreConfig {
        object_store: ObjStoreOpt,
        missing: String,
    },

我们来测试一下错误的场景,来看看是否符合代码的预期。

// 不传入路径
 cargo run run --object-store file
    Finished dev [unoptimized + debuginfo] target(s) in 0.42s
     Running `./influxdb_iox run --object-store file`
Apr 15 13:38:34.352  INFO influxdb_iox::influxdb_ioxd: Using File for object storage
Server command failed: Run: Specified File for the object store, required configuration missing for data-dir

//传入一个创建不了的路径
cargo run run --object-store file --data-dir /root/1/1
    Finished dev [unoptimized + debuginfo] target(s) in 0.47s
     Running `./influxdb_iox run --object-store file --data-dir /root/1/1`
Apr 15 13:45:26.664  INFO influxdb_iox::influxdb_ioxd: Using File for object storage
Server command failed: Run: Unable to create database directory "/root/1/1": Read-only file system (os error 30)

可以看到是符合预期的,bingo


//创建一个空的结构体
let connection_manager = ConnectionManager {};
//创建AppServer结构体用来保存基本的信息
//server_config里就是保存的对象存储的信息及线程配置
//如果num_worker_threads没有填写,默认就使用cpu数量
let app_server = Arc::new(AppServer::new(connection_manager, server_config));
//不设置这个writer_id能启动,但是不能做任何操作
if let Some(id) = config.writer_id {
        //compare and set 一个非0的数值,错误就打印一个指定的panic
        app_server.set_id(id).expect("writer id already set");
        //校验所有的配置
        if let Err(e) = app_server.load_database_configs().await {
            error!(
                "unable to load database configurations from object storage: {}",
                e
            )
        }
    } else {
        warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data.");
    }

接下来进入load_database_configs方法看看,

let list_result = self
            .store
            //把write_id和配置的文件路径组合一下,作为一个目录
            //遍历文件夹中的所有东西,用一个BTreeSet存所有子文件夹
            //用Vec存下所有的文件信息,包括路径、修改时间、大小等
            .list_with_delimiter(&self.root_path()?)
            .await
            .context(StoreError)?;
        //拿到配置的server的write_id
        let server_id = self.require_id()?;

        let handles: Vec<_> = list_result
            //配置的文件夹下的所有文件夹
            .common_prefixes
            .into_iter()
            //全部进行map转换
            .map(|mut path| {
                let store = Arc::clone(&self.store);
                let config = Arc::clone(&self.config);
                let exec = Arc::clone(&self.exec);
                //先找database的相关信息文件,名字叫rules.pb
                path.set_file_name(DB_RULES_FILE_NAME);
                //感觉是需要io来读取文件内容,所以开一个异步
                tokio::task::spawn(async move {
                    let mut res = get_store_bytes(&path, &store).await;

                    //省略错误处理。。

                    let res = res.unwrap().freeze();
                    //解析文件内容,根据文件名可以看出是个pb文件。
                    match DatabaseRules::decode(res) {
                        Err(e) => {
                            //省略错误。。
                        }
                        //根据解析出来的文件内容,在内存中恢复回来db的相关信息
                        Ok(rules) => match config.create_db(rules) {
                            Err(e) => error!("error adding database to config: {}", e),
                            //提交一个后台任务,用来不断的检测chunks的状态
                            //比如达到了某个大小,然后写入到存储等
                            Ok(handle) => handle.commit(server_id, store, exec),
                        },
                    }
                })
            })
            .collect();
        //等待所有任务完成
        futures::future::join_all(handles).await;

这里就启动完成了一个基本的服务,创建了存储路径、初始化数据库的基本配置、启动了一个用来刷盘、整理chunk的后台任务。


接下来就是启动连接相关的了。

    //从启动命令行中读取grpc的地址
    let grpc_bind_addr = config.grpc_bind_address;
    //绑定这个地址
    let socket = tokio::net::TcpListener::bind(grpc_bind_addr)
        .await
        .context(StartListeningGrpc { grpc_bind_addr })?;
    //真正的协议启动
    let grpc_server = rpc::serve(socket, Arc::clone(&app_server), frontend_shutdown.clone()).fuse();

    //同样的启动http相关的服务,使用的hyper库
    let bind_addr = config.http_bind_address;
    let addr = AddrIncoming::bind(&bind_addr).context(StartListeningHttp { bind_addr })?;
    let http_server = http::serve(addr, Arc::clone(&app_server), frontend_shutdown.clone()).fuse();

    //省略后面的停止流程。。。

然后看grpc的启动的服务

    //启动起来健康检查的服务
    let stream = TcpListenerStream::new(socket);
    let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
    //标识相对应的服务已经是可以提供服务的状态了
    let services = [
        generated_types::STORAGE_SERVICE,
        generated_types::IOX_TESTING_SERVICE,
        generated_types::ARROW_SERVICE,
    ];
    for service in &services {
        health_reporter
            .set_service_status(service, tonic_health::ServingStatus::Serving)
            .await;
    }

   //增加一堆使用grpc的服务,并启动起来
    tonic::transport::Server::builder()
        .add_service(health_service)
        .add_service(testing::make_server())
        .add_service(storage::make_server(Arc::clone(&server)))
        .add_service(flight::make_server(Arc::clone(&server)))
        .add_service(write::make_server(Arc::clone(&server)))
        .add_service(management::make_server(Arc::clone(&server)))
        .add_service(operations::make_server(server))
        .serve_with_incoming_shutdown(stream, shutdown.cancelled())
        .await

然后是http相关的启动

pub async fn serve<M>(
    addr: AddrIncoming,
    server: Arc<AppServer<M>>,
    shutdown: CancellationToken,
) -> Result<(), hyper::Error>
where
    M: ConnectionManager + Send + Sync + Debug + 'static,
{
    //初始化路由相关的信息
    let router = router(server);
    let service = RouterService::new(router).unwrap();
    //启动服务
    hyper::Server::builder(addr)
        .serve(service)
        .with_graceful_shutdown(shutdown.cancelled())
        .await
}

顺便看一下都提供了哪些地址可以被访问的:

 Router::builder()
        .data(server)
        //写了一个拦截,打印请求参数和返回结果
        .middleware(Middleware::pre(|req| async move {
            debug!(request = ?req, "Processing request");
            Ok(req)
        }))
        .middleware(Middleware::post(|res| async move {
            debug!(response = ?res, "Successfully processed request");
            Ok(res)
        })) // this endpoint is for API backward compatibility with InfluxDB 2.x
        .post("/api/v2/write", write::<M>)
        .get("/health", health)
        .get("/metrics", handle_metrics)
        .get("/iox/api/v1/databases/:name/query", query::<M>)
        .get("/iox/api/v1/databases/:name/wal/meta", get_wal_meta::<M>)
        .get("/api/v1/partitions", list_partitions::<M>)
        .post("/api/v1/snapshot", snapshot_partition::<M>)
        //错误的时候调用的处理拦截
        .err_handler_with_info(error_handler)
        .build()
        .unwrap()

做一个/health的测试:

curl localhost:8080/health
OK%

可以看到成功返回了值。


到这里基本启动就完成了,后面再用到的时候会继续对启动里的细节做研究,比如PanicsLog等等吧,欢迎持续关注。

祝玩儿的开心


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK