2

Apache IoTDB C# SDK 介绍 - 张善友

 1 year ago
source link: https://www.cnblogs.com/shanyou/p/17064424.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Apache IoTDB C# SDK 介绍

TsFile 是 IoTDB 的底层数据文件,一种专门为时间序列数据设计的列式文件格式。IoTDB TsFile数据读写主要是下面两个结构:

  • IoTDB 提供了一个TSRecord工具,TSRecord记录了一个设备在一个时间戳下的若干测点信息。在c# 客户端里被抽象成了Row Record
  • IoTDB 提供了一个Tablet工具,Tablet记录了一个设备的多个测点的信息,按照一种表格的形式表示,这些测点具有相同的时间戳序列,因此可以应用在测点具有相同时间戳序列(每个时间戳下各个测点都具有值)的设备中。

IoTDB C# SDK  叫做 Apache-IoTDB-Client-CSharp,Github:https://github.com/eedalong/Apache-IoTDB-Client-CSharp ,Nuget 包有两个:

Apache.IoTDB和 Apache.IoTDB.Data。 其中 Apache.IoTDB.Data 是对ADO .NET支持,以.NET 读取数据库的方式方便不同使用习惯的用户, C#客户端也及时更新支持最新的Apache IoTDB的特性,如对齐序列插入、SchemaTemplate操纵接口的 支持、支持插入空值的Tablet结构等。

最近刚刚发布了对IoTDB 1.0版本的支持的1.0.0.1预览版已经发布,欢迎各位试用并提issue~: https://www.nuget.org/packages/Apache.IoTDB/1.0.0.1-alpha

使用示例

// 参数定义
string host = "localhost";
int port = 6667;
int pool_size = 2;

// 初始化session
var session_pool = new SessionPool(host, port, pool_size);

// 开启session
await session_pool.Open(false);

// 创建时间序列
await session_pool.CreateTimeSeries("root.test_group.test_device.ts1", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.UNCOMPRESSED);
await session_pool.CreateTimeSeries("root.test_group.test_device.ts2", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.UNCOMPRESSED);
await session_pool.CreateTimeSeries("root.test_group.test_device.ts3", TSDataType.INT32, TSEncoding.PLAIN, Compressor.UNCOMPRESSED);

// 插入record
var measures = new List<string>{"ts1", "ts2", "ts3"};
var values = new List<object> { "test_text", true, (int)123 };
var timestamp = 1;
var rowRecord = new RowRecord(timestamp, values, measures);
await session_pool.InsertRecordAsync("root.test_group.test_device", rowRecord);

// 插入Tablet
var timestamp_lst = new List<long>{ timestamp + 1 };
var value_lst = new List<object> {new() {"iotdb", true, (int) 12}};
var tablet = new Tablet("root.test_group.test_device", measures, value_lst, timestamp_ls);
await session_pool.InsertTabletAsync(tablet);

// 关闭Session
await session_pool.Close();

详细接口信息可以参考接口文档

C#客户端暴露的所有接口均为异步接口。使用C#客户端从首先建立一个SessionPool开始,建立SessionPool时需要指定服务器的IP 、Port 以及 SessionPool的大小,SessionPool的大小代表本地与服务器建立的连接的数目。为了实现并发客户端请求,客户端提供了针对原生接口的连接池(SessionPool),由于SessionPool本身为Session的超集,当SessionPoolpool_size参数设置为1时,退化为原来的Session

客户端 使用ConcurrentQueue数据结构封装了一个客户端队列,以维护与服务端的多个连接,当调用Open()接口时,会在该队列中创建指定个数的客户端,同时通过System.Threading.Monitor类实现对队列的同步访问。

当请求发生时,会尝试从连接池中寻找一个空闲的客户端连接,如果没有空闲连接,那么程序将需要等待直到有空闲连接

当一个连接被用完后,他会自动返回池中等待下次被使用

在使用连接池后,客户端的并发性能提升明显,这篇文档展示了使用线程池比起单线程所带来的性能提升

ByteBuffer

在传入RPC接口参数时,需要对Record和Tablet两种数据结构进行序列化,我们主要通过封装的ByteBuffer类实现

在封装字节序列的基础上,我们进行了内存预申请与内存倍增的优化,减少了序列化过程中内存的申请和释放,在一个拥有20000行的Tablet上进行序列化测试时,速度比起原生的数组动态增长具有35倍的性能加速,详见以下两篇文档:

在库里 有一个 IoTDB C#客⼾端性能分析报告:https://github.com/eedalong/Apache-IoTDB-Client-CSharp/blob/main/docs/time_profile_zh.pdf ,建议大家看一看,这里只说结论:

  • 在插⼊与该⽤⼾类似的结构化较强、没有空值、规整、每⾏的column固定的的数据时,建议使⽤insert_tablet接⼝,经过改善后的insert_tablet接⼝具备较好的性能,能满⾜该⽤⼾的需求
  • 数据量较⼤,但数据整体不规整或者有空值,每⾏数据的column数不定时建议使⽤insert_records接⼝,该接⼝对record数据的插⼊速度较为可观
  • 数据量⼩,需要对原有数据做出⼀定的修正 时,使⽤insert_record接⼝

参考文章:


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK