1

时序数据库Influx-IOx源码学习六-1(数据写入之分区)

 3 years ago
source link: https://my.oschina.net/u/3374539/blog/5026139
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
时序数据库Influx-IOx源码学习六-1(数据写入之分区) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。

InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。

接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。


上一章说到如何创建一个数据库,并且数据库的描述信息是如何保存的。详情见:https://my.oschina.net/u/3374539/blog/5025128

这一章记录一下,数据是如何写入并保存的,具体会分为两篇来写:

  • 一篇介绍分区是如何完成的
  • 一篇介绍具体的写入

说到数据写入,必然是需要能够连接到服务器。IOx项目为提供了多种方式可以于服务器进行交互,分别是GrpcHttp基于这两种通信方式,又扩展支持了influxdb2_client以及influxdb_iox_client

基于influxdb_iox_client我写了一个数据写入及查询的示例来观测接口是如何组织的,代码如下:

#[tokio::main]
async fn main() {
    {
        let connection = Builder::default()
            .build("http://127.0.0.1:8081")
            .await
            .unwrap();
        write::Client::new(connection)
            .write("a", r#"myMeasurement,tag1=value1,tag2=value2 fieldKey="123" 1556813561098000000"#)
            .await
            .expect("failed to write data");
    }

    let connection = Builder::default()
        .build("http://127.0.0.1:8081")
        .await
        .unwrap();

    let mut query = flight::Client::new(connection)
        .perform_query("a", "select * from myMeasurement")
        .await
        .expect("query request should work");

    let mut batches = vec![];

    while let Some(data) = query.next().await.expect("valid batches") {
        batches.push(data);
    }

    let format1 = format::QueryOutputFormat::Pretty;
    println!("{}", format1.format(&batches).unwrap());
}


+------------+--------+--------+-------------------------+
| fieldKey   | tag1   | tag2   | time                    |
+------------+--------+--------+-------------------------+
| 123        | value1 | value2 | 2019-05-02 16:12:41.098 |
| 123        | value1 | value2 | 2019-05-02 16:12:41.098 |
| fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 |
| fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 |
| 123        | value1 | value2 | 2019-05-02 16:12:41.098 |
+------------+--------+--------+-------------------------+

因为我多运行了几次,所以能看到数据被重复插入了。

这里还需要说一下的是写入的语句格式可以参见:

[LineProtocol] https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format


write::Client中的write方法生成了一个WriteRequest结构,并使用RPC调用远程的write方法。打开src/influxdb_ioxd/rpc/write.rs : 22行可以看到方法的具体实现。

async fn write(
        &self,
        request: tonic::Request<WriteRequest>,
    ) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
        let request = request.into_inner();
        //得到上面在客户端中写入的数据库名字,在上面的例子中传入的"a"
        let db_name = request.db_name;
        //这里得到了写入的LineProtocol
        let lp_data = request.lp_data;
        let lp_chars = lp_data.len();
        //解析LineProtocol的内容
        //示例中的lp会被解析为:
        //measurement: "myMeasurement"
        //tag_set: [("tag1", "value1"), ("tag2", "value2")]
        //field_set: [("fieldKey", "123")]
        //timestamp: 1556813561098000000
        let lines = parse_lines(&lp_data)
            .collect::<Result<Vec<_>, influxdb_line_protocol::Error>>()
            .map_err(|e| FieldViolation {
                field: "lp_data".into(),
                description: format!("Invalid Line Protocol: {}", e),
            })?;

        let lp_line_count = lines.len();
        debug!(%db_name, %lp_chars, lp_line_count, "Writing lines into database");
        //对数据进行保存
        self.server
            .write_lines(&db_name, &lines)
            .await
            .map_err(default_server_error_handler)?;
        //返回成功
        let lines_written = lp_line_count as u64;
        Ok(Response::new(WriteResponse { lines_written }))
    }

继续看self.server.write_lines的执行:

 pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> {
        self.require_id()?;
        //验证一下名字,然后拿到之前创建数据库时候在内存中存储的相关信息
        let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
        let db = self
            .config
            .db(&db_name)
            .context(DatabaseNotFound { db_name: &*db_name })?;
        //这里就开始执行分片相关的策略
        let (sharded_entries, shards) = {
            //读取创建数据库时候配置的分片策略
            let rules = db.rules.read();
            let shard_config = &rules.shard_config;
            //根据数据和shard策略,把逐个数据对应的分区找到
            //写入到一个List<分区标识,List<数据>>这样的结构中
            //具体的结构信息后面看
            let sharded_entries = lines_to_sharded_entries(lines, shard_config.as_ref(), &*rules)
                .context(LineConversion)?;
            //再把所有分区的配置返回给调用者
            let shards = shard_config
                .as_ref()
                .map(|cfg| Arc::clone(&cfg.shards))
                .unwrap_or_default();

            (sharded_entries, shards)
        };

        //根据上面返回的集合进行map方法遍历,写到每个分区中
        futures_util::future::try_join_all(
            sharded_entries
                .into_iter()
                .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)),
        )
        .await?;

        Ok(())
    }

这里描述了写入一条数据的主逻辑:数据写入的时候,先把数据划分到具体的分区里(使用List结构存储下所有的分区对应的数据),然后并行的进行数据写入

接下来看,数据是如何进行分区的:

pub fn lines_to_sharded_entries(
    lines: &[ParsedLine<'_>],
    sharder: Option<&impl Sharder>,
    partitioner: &impl Partitioner,
) -> Result<Vec<ShardedEntry>> {
    let default_time = Utc::now();
    let mut sharded_lines = BTreeMap::new();

    //对所有要插入的数据进行遍历
    for line in lines {
        //先找到符合哪个shard
        let shard_id = match &sharder {
            Some(s) => Some(s.shard(line).context(GeneratingShardId)?),
            None => None,
        };
        //再判断属于哪个分区
        let partition_key = partitioner
            .partition_key(line, &default_time)
            .context(GeneratingPartitionKey)?;
        let table = line.series.measurement.as_str();
        //最后存储到一个map中
        //shard-> partition -> table -> List<data> 的映射关系
        sharded_lines
            .entry(shard_id)
            .or_insert_with(BTreeMap::new)
            .entry(partition_key)
            .or_insert_with(BTreeMap::new)
            .entry(table)
            .or_insert_with(Vec::new)
            .push(line);
    }

    let default_time = Utc::now();
    //最后遍历这个map 转换到之前提到的List结构中
    let sharded_entries = sharded_lines
        .into_iter()
        .map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, &default_time))
        .collect::<Result<Vec<_>>>()?;
    Ok(sharded_entries)
}

这里理解shard的概念就是一个或者一组机器,称为一个shard,他们负责真正的存储数据。

partition理解为一个个文件夹,在shard上具体的存储路径。

这里看一下是怎样完成shard的划分的:

impl Sharder for ShardConfig {
    fn shard(&self, line: &ParsedLine<'_>) -> Result<ShardId, Error> {
        if let Some(specific_targets) = &self.specific_targets {
            //如果对数据进行匹配,如果符合规则就返回,可以采用当前的shard
            //官方的代码中只实现了根据表名进行shard的策略
            //这个配置似乎只能通过grpc来进行设置,这样好处可能是将来有个什么管理界面能动态修改
            if specific_targets.matcher.match_line(line) {
                return Ok(specific_targets.shard);
            }
        }
        //如果没有配置就使用hash的方式
        //对整条数据进行hash,然后比较机器的hash,找到合适的节点
        //如果没找到,就放在hashring的第一个节点
        //hash算法见后面
        if let Some(hash_ring) = &self.hash_ring {
            return hash_ring
                .shards
                .find(LineHasher { line, hash_ring })
                .context(NoShardsDefined);
        }

        NoShardingRuleMatches {
            line: line.to_string(),
        }
        .fail()
    }
}
//具体的Hash算法,如果全配置的话分的就会特别散,几乎不同测点都放到了不同的地方
impl<'a, 'b, 'c> Hash for LineHasher<'a, 'b, 'c> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        //如果配置了使用table名字就在hash中加入tablename
        if self.hash_ring.table_name {
            self.line.series.measurement.hash(state);
        }
        //然后按照配置的列的值进行hash
        for column in &self.hash_ring.columns {
            if let Some(tag_value) = self.line.tag_value(column) {
                tag_value.hash(state);
            } else if let Some(field_value) = self.line.field_value(column) {
                field_value.to_string().hash(state);t
            }
            state.write_u8(0); // column separator
        }
    }
}

接下来看默认的partition分区方式:

impl Partitioner for PartitionTemplate {
    fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> {
        let parts: Vec<_> = self
            .parts
            .iter()
             //匹配分区策略,或者是单一的,或者是复合的
             //目前支持基于表、值、时间
             //其余还会支持正则表达式和strftime模式
            .map(|p| match p {
                TemplatePart::Table => line.series.measurement.to_string(),
                TemplatePart::Column(column) => match line.tag_value(&column) {
                    Some(v) => format!("{}_{}", column, v),
                    None => match line.field_value(&column) {
                        Some(v) => format!("{}_{}", column, v),
                        None => "".to_string(),
                    },
                },
                TemplatePart::TimeFormat(format) => match line.timestamp {
                    Some(t) => Utc.timestamp_nanos(t).format(&format).to_string(),
                    None => default_time.format(&format).to_string(),
                },
                _ => unimplemented!(),
            })
            .collect();
        //最后返回一个组合文件名,或者是 a-b-c 或者是一个单一的值
        Ok(parts.join("-"))
    }
}

到这里分区的工作就完成了,下一篇继续分析是怎样写入的。

祝玩儿的开心


欢迎关注微信公众号:

或添加微信好友: liutaohua001


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK