

时序数据库Influx-IOx源码学习六-1(数据写入之分区)
source link: https://my.oschina.net/u/3374539/blog/5026139
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。
InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。
接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。
上一章说到如何创建一个数据库,并且数据库的描述信息是如何保存的。详情见:https://my.oschina.net/u/3374539/blog/5025128
这一章记录一下,数据是如何写入并保存的,具体会分为两篇来写:
- 一篇介绍分区是如何完成的
- 一篇介绍具体的写入
说到数据写入,必然是需要能够连接到服务器。IOx
项目为提供了多种方式可以于服务器进行交互,分别是Grpc
和Http
基于这两种通信方式,又扩展支持了influxdb2_client
以及influxdb_iox_client
。
基于influxdb_iox_client
我写了一个数据写入及查询的示例来观测接口是如何组织的,代码如下:
#[tokio::main]
async fn main() {
{
let connection = Builder::default()
.build("http://127.0.0.1:8081")
.await
.unwrap();
write::Client::new(connection)
.write("a", r#"myMeasurement,tag1=value1,tag2=value2 fieldKey="123" 1556813561098000000"#)
.await
.expect("failed to write data");
}
let connection = Builder::default()
.build("http://127.0.0.1:8081")
.await
.unwrap();
let mut query = flight::Client::new(connection)
.perform_query("a", "select * from myMeasurement")
.await
.expect("query request should work");
let mut batches = vec![];
while let Some(data) = query.next().await.expect("valid batches") {
batches.push(data);
}
let format1 = format::QueryOutputFormat::Pretty;
println!("{}", format1.format(&batches).unwrap());
}
+------------+--------+--------+-------------------------+
| fieldKey | tag1 | tag2 | time |
+------------+--------+--------+-------------------------+
| 123 | value1 | value2 | 2019-05-02 16:12:41.098 |
| 123 | value1 | value2 | 2019-05-02 16:12:41.098 |
| fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 |
| fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 |
| 123 | value1 | value2 | 2019-05-02 16:12:41.098 |
+------------+--------+--------+-------------------------+
因为我多运行了几次,所以能看到数据被重复插入了。
这里还需要说一下的是写入的语句格式可以参见:
[LineProtocol] https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
write::Client
中的write
方法生成了一个WriteRequest
结构,并使用RPC
调用远程的write
方法。打开src/influxdb_ioxd/rpc/write.rs : 22行
可以看到方法的具体实现。
async fn write(
&self,
request: tonic::Request<WriteRequest>,
) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
let request = request.into_inner();
//得到上面在客户端中写入的数据库名字,在上面的例子中传入的"a"
let db_name = request.db_name;
//这里得到了写入的LineProtocol
let lp_data = request.lp_data;
let lp_chars = lp_data.len();
//解析LineProtocol的内容
//示例中的lp会被解析为:
//measurement: "myMeasurement"
//tag_set: [("tag1", "value1"), ("tag2", "value2")]
//field_set: [("fieldKey", "123")]
//timestamp: 1556813561098000000
let lines = parse_lines(&lp_data)
.collect::<Result<Vec<_>, influxdb_line_protocol::Error>>()
.map_err(|e| FieldViolation {
field: "lp_data".into(),
description: format!("Invalid Line Protocol: {}", e),
})?;
let lp_line_count = lines.len();
debug!(%db_name, %lp_chars, lp_line_count, "Writing lines into database");
//对数据进行保存
self.server
.write_lines(&db_name, &lines)
.await
.map_err(default_server_error_handler)?;
//返回成功
let lines_written = lp_line_count as u64;
Ok(Response::new(WriteResponse { lines_written }))
}
继续看self.server.write_lines
的执行:
pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> {
self.require_id()?;
//验证一下名字,然后拿到之前创建数据库时候在内存中存储的相关信息
let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
let db = self
.config
.db(&db_name)
.context(DatabaseNotFound { db_name: &*db_name })?;
//这里就开始执行分片相关的策略
let (sharded_entries, shards) = {
//读取创建数据库时候配置的分片策略
let rules = db.rules.read();
let shard_config = &rules.shard_config;
//根据数据和shard策略,把逐个数据对应的分区找到
//写入到一个List<分区标识,List<数据>>这样的结构中
//具体的结构信息后面看
let sharded_entries = lines_to_sharded_entries(lines, shard_config.as_ref(), &*rules)
.context(LineConversion)?;
//再把所有分区的配置返回给调用者
let shards = shard_config
.as_ref()
.map(|cfg| Arc::clone(&cfg.shards))
.unwrap_or_default();
(sharded_entries, shards)
};
//根据上面返回的集合进行map方法遍历,写到每个分区中
futures_util::future::try_join_all(
sharded_entries
.into_iter()
.map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)),
)
.await?;
Ok(())
}
这里描述了写入一条数据的主逻辑:数据写入的时候,先把数据划分到具体的分区里(使用List结构存储下所有的分区对应的数据),然后并行的进行数据写入
接下来看,数据是如何进行分区的:
pub fn lines_to_sharded_entries(
lines: &[ParsedLine<'_>],
sharder: Option<&impl Sharder>,
partitioner: &impl Partitioner,
) -> Result<Vec<ShardedEntry>> {
let default_time = Utc::now();
let mut sharded_lines = BTreeMap::new();
//对所有要插入的数据进行遍历
for line in lines {
//先找到符合哪个shard
let shard_id = match &sharder {
Some(s) => Some(s.shard(line).context(GeneratingShardId)?),
None => None,
};
//再判断属于哪个分区
let partition_key = partitioner
.partition_key(line, &default_time)
.context(GeneratingPartitionKey)?;
let table = line.series.measurement.as_str();
//最后存储到一个map中
//shard-> partition -> table -> List<data> 的映射关系
sharded_lines
.entry(shard_id)
.or_insert_with(BTreeMap::new)
.entry(partition_key)
.or_insert_with(BTreeMap::new)
.entry(table)
.or_insert_with(Vec::new)
.push(line);
}
let default_time = Utc::now();
//最后遍历这个map 转换到之前提到的List结构中
let sharded_entries = sharded_lines
.into_iter()
.map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, &default_time))
.collect::<Result<Vec<_>>>()?;
Ok(sharded_entries)
}
这里理解shard
的概念就是一个或者一组机器,称为一个shard
,他们负责真正的存储数据。
partition
理解为一个个文件夹,在shard
上具体的存储路径。
这里看一下是怎样完成shard
的划分的:
impl Sharder for ShardConfig {
fn shard(&self, line: &ParsedLine<'_>) -> Result<ShardId, Error> {
if let Some(specific_targets) = &self.specific_targets {
//如果对数据进行匹配,如果符合规则就返回,可以采用当前的shard
//官方的代码中只实现了根据表名进行shard的策略
//这个配置似乎只能通过grpc来进行设置,这样好处可能是将来有个什么管理界面能动态修改
if specific_targets.matcher.match_line(line) {
return Ok(specific_targets.shard);
}
}
//如果没有配置就使用hash的方式
//对整条数据进行hash,然后比较机器的hash,找到合适的节点
//如果没找到,就放在hashring的第一个节点
//hash算法见后面
if let Some(hash_ring) = &self.hash_ring {
return hash_ring
.shards
.find(LineHasher { line, hash_ring })
.context(NoShardsDefined);
}
NoShardingRuleMatches {
line: line.to_string(),
}
.fail()
}
}
//具体的Hash算法,如果全配置的话分的就会特别散,几乎不同测点都放到了不同的地方
impl<'a, 'b, 'c> Hash for LineHasher<'a, 'b, 'c> {
fn hash<H: Hasher>(&self, state: &mut H) {
//如果配置了使用table名字就在hash中加入tablename
if self.hash_ring.table_name {
self.line.series.measurement.hash(state);
}
//然后按照配置的列的值进行hash
for column in &self.hash_ring.columns {
if let Some(tag_value) = self.line.tag_value(column) {
tag_value.hash(state);
} else if let Some(field_value) = self.line.field_value(column) {
field_value.to_string().hash(state);t
}
state.write_u8(0); // column separator
}
}
}
接下来看默认的partition
分区方式:
impl Partitioner for PartitionTemplate {
fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> {
let parts: Vec<_> = self
.parts
.iter()
//匹配分区策略,或者是单一的,或者是复合的
//目前支持基于表、值、时间
//其余还会支持正则表达式和strftime模式
.map(|p| match p {
TemplatePart::Table => line.series.measurement.to_string(),
TemplatePart::Column(column) => match line.tag_value(&column) {
Some(v) => format!("{}_{}", column, v),
None => match line.field_value(&column) {
Some(v) => format!("{}_{}", column, v),
None => "".to_string(),
},
},
TemplatePart::TimeFormat(format) => match line.timestamp {
Some(t) => Utc.timestamp_nanos(t).format(&format).to_string(),
None => default_time.format(&format).to_string(),
},
_ => unimplemented!(),
})
.collect();
//最后返回一个组合文件名,或者是 a-b-c 或者是一个单一的值
Ok(parts.join("-"))
}
}
到这里分区的工作就完成了,下一篇继续分析是怎样写入的。
祝玩儿的开心
欢迎关注微信公众号:
或添加微信好友: liutaohua001
Recommend
-
10
为什...
-
8
-
9
-
7
时序数据库Influx-IOx源码学习四(Run命令的执行) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区 ...
-
9
时序数据库Influx-IOx源码学习五(创建数据库) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区 仅限深圳|现场揭秘:腾讯云原生数据库...
-
10
时序数据库Influx-IOx源码学习六-2(数据写入) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区 仅限深圳|现场揭秘:腾讯云原生数据库...
-
14
时序数据库Influx-IOx源码学习七(Chunk的生命周期) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区 仅限深圳|现场揭秘:腾讯云原生...
-
5
时序数据库Influx-IOx源码学习八(Chunk持久化) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区 仅限深圳|现场揭秘:腾讯云原生数据...
-
9
时序数据库Influx-IOx源码学习十(查询主流程) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区 用代码...
-
8
时序数据库Influx-IOx源码学习十一(SQL的解析) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区 用代...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK