

时序数据库Influx-IOx源码学习七(Chunk的生命周期)
source link: https://my.oschina.net/u/3374539/blog/5029926
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。
InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。
接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。
上一章介绍了数据从客户端写入到服务器端的内存中的整个过程。详情见: https://my.oschina.net/u/3374539/blog/5027429
这一章记录一下数据库中数据管理单元Chunk的生命周期。
在开篇,先介绍一下一个Chunk
拥有的生命周期:
//这里需要注意,这些变体里的Chunk结构都是不相同的
//也就是有内存数据拷贝的工作
pub enum ChunkState {
//内部移动数据时候用的
Invalid,
//可以写入
Open(MBChunk),
//还能继续写入,但很快会被关闭
Closing(MBChunk),
//已经不能写入了,准备移动到readbuffer
Moving(Arc<MBChunk>),
//已经被移动到了read buffer
Moved(Arc<ReadBufferChunk>),
//准备写入持久化存储
WritingToObjectStore(Arc<ReadBufferChunk>),
//写入持久化存储完成
WrittenToObjectStore(Arc<ReadBufferChunk>, Arc<ParquetChunk>),
}
在第五章中有提到,在Create Database之后,会启动一个后台线程。
该后台线程完成了部分对Chunk
的管理功能,通过理解这个后台线程,能够基本理解Chunk
的所有生命周期。
//后台线程的方法入口,在创建完成数据库后,就会调用到这个方法
pub async fn background_worker(
self: &Arc<Self>,
shutdown: tokio_util::sync::CancellationToken,
) {
//创建一个定时器,周期性的执行
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
let mut lifecycle_manager = LifecycleManager::new(Arc::clone(&self));
//没有收到停止服务器时候的信号就一直执行,1秒一次
while !shutdown.is_cancelled() {
//记录执行的次数,每次加1,Ordering::Relaxed代表的单线程里的原子操作
self.worker_iterations.fetch_add(1, Ordering::Relaxed);
//进入生命周期的管理
lifecycle_manager.check_for_work();
//收到不同信号之后的处理方法
tokio::select! {
_ = interval.tick() => {},
_ = shutdown.cancelled() => break
}
}
info!("finished background worker");
}
前方高能,请注意:
fn check_for_work(&mut self, now: DateTime<Utc>) {
//获取创建数据库的时候,对于Chunk的相关配置
let rules = self.rules();
//根据配置的排序规则,获取出内存里所有的chunk
let chunks = self.chunks(&rules.sort_order);
let mut buffer_size = 0;
//判断是不是有其他的任务正在执行,move我理解针对于read buffer,write对于持久化
let mut move_active = self.is_move_active();
let mut write_active = self.is_write_active();
//遍历所有块,检查哪些块可以被持久化
for chunk in &chunks {
//获取当前chunk的锁
let chunk_guard = chunk.upgradable_read();
//获取chunk占用的内存大小
buffer_size += Self::chunk_size(&*chunk_guard);
//没有移动任务并且Chunk里最后的写入时间比较老
let would_move = !move_active && can_move(&rules, &*chunk_guard, now);
//没有写出任务,并且开启了持久化
let would_write = !write_active && rules.persist;
//判断chunk的生命周期
match chunk_guard.state() {
//属于open状态,并且是需要移动的(上面的逻辑里有展示什么是需要移动的)
//这里我理解就是相当于实时写入时候的一个补充方案
//试想,如果一个chunk一直不写入数据,可能有一年了,查询都不再用这些数据了,内存却被一直占用
ChunkState::Open(_) if would_move => {
let mut chunk_guard = RwLockUpgradableReadGuard::upgrade(chunk_guard);
//切换状态到closing
chunk_guard.set_closing().expect("cannot close open chunk");
let partition_key = chunk_guard.key().to_string();
let chunk_id = chunk_guard.id();
std::mem::drop(chunk_guard);
move_active = true;
//移动到read_buffer,变为不可写入状态(启动了一个异步的线程,后面看)
self.move_to_read_buffer(partition_key, chunk_id);
}
//这里有几种情况,同样会在别处触发为closing
//例如:chunk大小超过了设置的可变内存大小的时候
ChunkState::Closing(_) if would_move => {
let partition_key = chunk_guard.key().to_string();
let chunk_id = chunk_guard.id();
std::mem::drop(chunk_guard);
move_active = true;
//移动到read_buffer
self.move_to_read_buffer(partition_key, chunk_id);
}
//已经被挪动到readbuffer中的
ChunkState::Moved(_) if would_write => {
let partition_key = chunk_guard.key().to_string();
let chunk_id = chunk_guard.id();
std::mem::drop(chunk_guard);
write_active = true;
//写入到对象存储
self.write_to_object_store(partition_key, chunk_id);
}
_ => {}
}
}
//这里是主要检查内存限制的逻辑,当所有chunk的大小超过限制的时候就要清理Chunk
if let Some(soft_limit) = rules.buffer_size_soft {
let mut chunks = chunks.iter();
while buffer_size > soft_limit.get() {
match chunks.next() {
Some(chunk) => {
//获取读锁
let chunk_guard = chunk.read();
//如果配置了可以清理未持久化数据,那么处在read_buffer里的数据也会被清理
//一定会清理已经被持久化到对象存储上的数据
if (rules.drop_non_persisted
&& matches!(chunk_guard.state(), ChunkState::Moved(_)))
|| matches!(chunk_guard.state(), ChunkState::WrittenToObjectStore(_, _))
{
let partition_key = chunk_guard.key().to_string();
let chunk_id = chunk_guard.id();
buffer_size =
buffer_size.saturating_sub(Self::chunk_size(&*chunk_guard));
std::mem::drop(chunk_guard);
//真真正正的删除逻辑后面看
self.drop_chunk(partition_key, chunk_id)
}
}
//没有什么可以释放的了
None => {
warn!(db_name=self.db_name(), soft_limit, buffer_size,
"soft limited exceeded, but no chunks found that can be evicted. Check lifecycle rules");
break;
}
}
}
}
}
这里基本看清楚了Chunk
的周期:
- 在写入时候,如果没有
Chunk
就会open
一个,并处在open
状态。 - 如果写入超过了一些限制,就会被标记为
closing
;如果数据时间超过了配置的时间,也会被标记为closing
。标记为closing
的会添加一个后台进程,准备将Chunk
移动到read_buffer
中。 - 后台任务启动后,会标记为
moving
状态,此时禁止Chunk
再写入任何数据。 - 一旦移动完成,会被标记为
moved
。 - 程序会对
moved
状态下的Chunk
开始进行持久化。 - 扫描任务会不断判断内存使用是否超过了限制,如果超过限制,会清理已经持久化的
Chunk
。如果配置了drop_non_persisted
,会把read_buffer
中未持久化的也删除掉。
然后继续看程序是怎样将一个chunk
移动到read_buffer
的,因为篇幅的影响,将会在下一篇介绍数据是怎样真正写入到持久化存储当中的。
pub async fn load_chunk_to_read_buffer(
&self,
partition_key: &str,
chunk_id: u32,
) -> Result<Arc<DbChunk>> {
//根据partition_key及chunk_id获取内存中存储的Chunk
let chunk = {
let partition = self
.catalog
.valid_partition(partition_key)
.context(LoadingChunk {
partition_key,
chunk_id,
})?;
let partition = partition.read();
partition.chunk(chunk_id).context(LoadingChunk {
partition_key,
chunk_id,
})?
};
//设置当前的Chunk为Moving状态
let mb_chunk = {
let mut chunk = chunk.write();
chunk.set_moving().context(LoadingChunk {
partition_key,
chunk_id,
})?
};
info!(%partition_key, %chunk_id, "chunk marked MOVING, loading tables into read buffer");
let mut batches = Vec::new();
//这里是拿到Chunk中每个Cloumn的统计信息,分别是min,max,count
let table_stats = mb_chunk.table_summaries();
//从新创建一个ReadBufferChunk,后面准备把所有数据都拷贝到这里
//还需要告诉内存管理这里新申请了多少空间
let rb_chunk =
ReadBufferChunk::new_with_memory_tracker(chunk_id, &self.memory_registries.read_buffer);
for stats in table_stats {
//把内存中的数据,全部重新拷贝一次,转换为arrow格式
mb_chunk
.table_to_arrow(&mut batches, &stats.name, Selection::All)
//这里应该是还没有写完,如果出现错误,这个Chunk该怎么处理?
.expect("Loading chunk to mutable buffer");
//循环拷贝
for batch in batches.drain(..) {
rb_chunk.upsert_table(&stats.name, batch)
}
}
let mut chunk = chunk.write();
//更新写入缓存里的Chunk为Moved状态,同时Chunk内容修改为了ReadBuffer的Chunk
//对于Chunk的结构后面看
chunk.set_moved(Arc::new(rb_chunk)).context(LoadingChunk {
partition_key,
chunk_id,
})?;
//工作全部都完成了,调用做快照的方法,方法里什么都没做,返回新Chunk的一个Arc指针
Ok(DbChunk::snapshot(&chunk))
}
到这里基本清楚了整个Chunk
的工作方式,因为Chunk
这个名字被代码中重复使用到了,所以特意在文章末尾说一下都有什么Chunk
。
//主要是存储一个数据块的描述信息,名字、最后写入时间等
Server::db::catalog::chunk
//数据从客户端直接写入的内存块
mutable_buffer::chunk
//在moving时候拷贝的新数据块,arrow结构
read_buffer::chunk
//parquet对应的chunk
parquet_file::chunk
//query模块下对PartitionChunk重新命名了一下
//对于相同的partition key的数据抽象的行为
query -> type Chunk: PartitionChunk;
//实现PartitionChunk定义的方法,对不同位置下的chunk的操作
//如ParquetFile、MutableBuffer等
server::db::chunk
好了就到这里,希望你也学到了很多
祝玩儿的开心
欢迎关注微信公众号:
或添加微信好友: liutaohua001
Recommend
-
10
为什...
-
8
-
9
-
7
时序数据库Influx-IOx源码学习四(Run命令的执行) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区 ...
-
4
时序数据库Influx-IOx源码学习六-1(数据写入之分区) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区 仅限深圳|现场揭秘:腾讯云原生...
-
9
时序数据库Influx-IOx源码学习五(创建数据库) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区 仅限深圳|现场揭秘:腾讯云原生数据库...
-
10
时序数据库Influx-IOx源码学习六-2(数据写入) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区 仅限深圳|现场揭秘:腾讯云原生数据库...
-
5
时序数据库Influx-IOx源码学习八(Chunk持久化) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区 仅限深圳|现场揭秘:腾讯云原生数据...
-
9
时序数据库Influx-IOx源码学习十(查询主流程) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区 用代码...
-
8
时序数据库Influx-IOx源码学习十一(SQL的解析) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区 用代...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK