8

时序数据库Influx-IOx源码学习六-2(数据写入)

 3 years ago
source link: https://my.oschina.net/u/3374539/blog/5027429
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
时序数据库Influx-IOx源码学习六-2(数据写入) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。

InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。

接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。


上一章说到数据写入时的分区机制及分区现有的功能。详情见: https://my.oschina.net/u/3374539/blog/5026139

这一章记录一下数据是怎样进行存储的。


上一章没有细节的介绍数据从Line protocol被解析成了什么样子,在开篇先介绍一下数据被封装后的展示。

转换过程的代码可以参见internal_types/src/entry.rs : 157行中的build_table_write_batch方法;

内部数据结构可以查看: generated_types/protos/influxdata/write/v1/entry.fbs

数据是被层层加码组装出来的:

LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry

sharded_entries:[{
  shard_id: None,
  entry: {
      fb: {
        operation_type: write,
        operation: {
          partition_writes:[{
            key:"2019-05-02 16:00:00",
            table_batches:[
            { 
              name:"myMeasurement",
              columns:[
              {
                name:"fieldKey",
                logical_column_type: Field,
                values_type: StringValues,
                values: { values:["123"] },
                null_mask: None 
              },
              {
                name:"tag1",
                logical_column_type: Tag,
                values_type: StringValues,
                values: { values:["value1"]) },
                null_mask: None 
              },
              {
                name:"tag2",
                logical_column_type: Tag,
                values_type: StringValues,
                values: { values:["value2"]) },
                null_mask: None
              }, 
              { 
                name:"time",
                logical_column_type: Time,
                values_type: I64Values,
                values: { values:[1556813561098000000]) },
                null_mask: None
              }]
            }]
          }]
      } 
    }
  }
}]

数据在内存中就会形成如上格式保存,但要注意,内存中使用的 flatbuffer 格式保存,上面只是为了展示内容

继续上节里的内容,结构被拼凑完成之后,就会调用write_sharded_entry方法去进行实际写入工作:

futures_util::future::try_join_all(
            sharded_entries
                .into_iter()
                 //对每个数据进行写入到shard
                .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)),
        )
        .await?;

然后看是怎样写入到shard的,因为shard的写入还没有完成,所以只能关注单机的写入了。具体看代码:

 async fn write_sharded_entry(
        &self,
        db_name: &str,
        db: &Db,
        shards: Arc<HashMap<u32, NodeGroup>>,
        sharded_entry: ShardedEntry,
    ) -> Result<()> {
        //判断shard的id是否为null,如果是null就写入本地
        //否则就写入到具体的shard去
        match sharded_entry.shard_id {
            Some(shard_id) => {
                let node_group = shards.get(&shard_id).context(ShardNotFound { shard_id })?;
                //还没有真正的实现,可以看下面的方法
                self.write_entry_downstream(db_name, node_group, &sharded_entry.entry)
                    .await?
            }
            None => self.write_entry_local(&db, sharded_entry.entry).await?,
        }
        Ok(())
    }


    //可以看到还没有实现远程的写入
    async fn write_entry_downstream(
        &self,
        db_name: &str,
        node_group: &[WriterId],
        _entry: &Entry,
    ) -> Result<()> {
        todo!(
            "perform API call of sharded entry {} to one of the nodes {:?}",
            db_name,
            node_group
        )
    }

  //数据对本地写入
   pub async fn write_entry_local(&self, db: &Db, entry: Entry) -> Result<()> {
        //继续往下跟踪
        db.store_entry(entry).map_err(|e| match e {
            db::Error::HardLimitReached {} => Error::HardLimitReached {},
            _ => Error::UnknownDatabaseError {
                source: Box::new(e),
            },
        })?;

        Ok(())
    }

//方法似乎什么都没做,只是增补了clock_value和write_id
//注释上解释到logical clock是一个用来在数据库内部把entry变为有序的字段
pub fn store_entry(&self, entry: Entry) -> Result<()> {
        //生成一个新的结构SequencedEntry并增补字段
        let sequenced_entry = SequencedEntry::new_from_entry_bytes(
            ClockValue::new(self.next_sequence()),
            self.server_id.get(),
            entry.data(),
        ).context(SequencedEntryError)?;
        //关于读缓存相关的配置和实现,先不用管
        if self.rules.read().wal_buffer_config.is_some() {
            todo!("route to the Write Buffer. TODO: carols10cents #1157")
        }
        //继续调用其他方法
        self.store_sequenced_entry(sequenced_entry)
    }

上面的所有方法完成之后,基本的插入数据格式就准备完成了,接下来就是写入内存存储:

pub fn store_sequenced_entry(&self, sequenced_entry: SequencedEntry) -> Result<()> {
        //读取出数据库对于写入相关的配置信息
        //包括是否可写、是否超过内存限制等等验证
        let rules = self.rules.read();
        let mutable_size_threshold = rules.lifecycle_rules.mutable_size_threshold;
        if rules.lifecycle_rules.immutable {
            return DatabaseNotWriteable {}.fail();
        }
        if let Some(hard_limit) = rules.lifecycle_rules.buffer_size_hard {
            if self.memory_registries.bytes() > hard_limit.get() {
                return HardLimitReached {}.fail();
            }
        }
        //rust语言中的释放变量
        std::mem::drop(rules);

        //因为是批量写入,所以需要循环
        //partition_writes的数据格式可以参见上面的json数据
        if let Some(partitioned_writes) = sequenced_entry.partition_writes() {
            for write in partitioned_writes {
                let partition_key = write.key();
                //根据之前生成的partition_key来得到或者创建一个partition描述
                let partition = self.catalog.get_or_create_partition(partition_key);
                //这里是拿到一个写锁
                let mut partition = partition.write();
                //更新这个partition最后的插入时间
                //记录这个的目的,代码上并没写明白是做什么用的
                partition.update_last_write_at();
                //找到一个打开的chunk
                //不知道为什么每次都要在所有chunk里搜索一次
                //难道是同时可能有很多个chunk都可以写入?
                let chunk = partition.open_chunk().unwrap_or_else(|| {
                   //否则就创建一个新的chunk出来
                   partition.create_open_chunk(self.memory_registries.mutable_buffer.as_ref())
                });
                //获取一个写锁
                let mut chunk = chunk.write();
                //更新当前chunk的第一条、最后一条写入记录
                chunk.record_write();
                //得到chunk的内存区域,称为mutable_buffer
                let chunk_id = chunk.id();
                let mb_chunk = chunk.mutable_buffer().expect("cannot mutate open chunk");
                //真正的写入到内存中
                mb_chunk
                    .write_table_batches(
                        sequenced_entry.clock_value(),
                        sequenced_entry.writer_id(),
                        &write.table_batches(),
                    )
                    .context(WriteEntry {
                        partition_key,
                        chunk_id,
                    })?;

                //如果当前chunk写入数据的大小超过了设置的限制,就关闭
                //关闭的意思就是把状态制为Closing,并更新关闭时间
                let size = mb_chunk.size();
                if let Some(threshold) = mutable_size_threshold {
                    if size > threshold.get() {
                        chunk.set_closing().expect("cannot close open chunk")
                    }
                }
            }
        }

        Ok(())
    }

再深入的就不继续跟踪了,但是思路还是比较清晰了。

1.分区相关

client --> grpc --> 进行分区shard --> 分区partition

2.写入相关

LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry

  • 内存写入空间

catalog -> partition -> table -> column

  • 达到指定大小后标记为关闭

  • 异步 - 后台线程进行内存整理


到这里基本就完成了所有的写入,并返回给客户端成功。

关于后台线程的内存整理再下一篇中继续介绍。

祝玩儿的开心


欢迎关注微信公众号:

或添加微信好友: liutaohua001


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK