
Apache Hudi Index Implementation Analysis (Part 3): HBaseIndex

 4 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzIyMzQ0NjA0MQ%3D%3D&%3Bmid=2247484592&%3Bidx=1&%3Bsn=fecec276c9477a8f365f54649fe26a69

1. Introduction

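The previous articles analyzed the filter-based indexes. This one looks at an index backed by an external storage system: HBaseIndex. It is a useful reference for anyone who wants to implement a custom Index.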

2. Analysis

HBaseIndex is another subclass of HoodieIndex and implements the parent class's two core methods:

// Tag each input record with its location
public abstract JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable);
// Update the location information after a write
public abstract JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable);

During a write, tagLocation is called to tag the input records with their locations. Its core code is as follows:

  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
      HoodieTable<T> hoodieTable) {  
    return recordRDD.mapPartitionsWithIndex(locationTagFunction(hoodieTable.getMetaClient()), true);
  }

This method simply applies the locationTagFunction function to each partition of the input records. Its core code is as follows:

  private Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>> locationTagFunction(
      HoodieTableMetaClient metaClient) {

    return (Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>>) (partitionNum,
        hoodieRecordIterator) -> {
      // Number of Gets sent to HBase per batch
      int multiGetBatchSize = config.getHbaseIndexGetBatchSize();

      // Obtain (or re-create) the shared HBase connection
      synchronized (HBaseIndex.class) {
        if (hbaseConnection == null || hbaseConnection.isClosed()) {
          hbaseConnection = getHBaseConnection();
        }
      }
      List<HoodieRecord<T>> taggedRecords = new ArrayList<>();
      HTable hTable = null;
      try {
        // Open the configured index table
        hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
        List<Get> statements = new ArrayList<>();
        List<HoodieRecord> currentBatchOfRecords = new LinkedList<>();
        // Iterate over the records in this partition
        while (hoodieRecordIterator.hasNext()) {
          HoodieRecord rec = hoodieRecordIterator.next();
          // Build a Get keyed by the recordKey
          statements.add(generateStatement(rec.getRecordKey()));
          currentBatchOfRecords.add(rec);
          // The batch is full, or this is the last record
          if (statements.size() >= multiGetBatchSize || !hoodieRecordIterator.hasNext()) {
            // Issue the batched Gets
            Result[] results = doGet(hTable, statements);
            // Clear the batch so it can be reused and the Gets garbage-collected
            statements.clear();
            for (Result result : results) {
              // Results come back in request order: pop the matching HoodieRecord
              HoodieRecord currentRecord = currentBatchOfRecords.remove(0);
              if (result.getRow() != null) {
                // Read the key, commit time, file ID and partition path
                String keyFromResult = Bytes.toString(result.getRow());
                String commitTs = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
                String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
                String partitionPath = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
                // Check that the commit is valid: present in the completed timeline,
                // or older than the first commit on that timeline
                if (checkIfValidCommit(metaClient, commitTs)) {
                  // Rebuild the HoodieRecord with the partition path stored in the index
                  currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
                      currentRecord.getData());
                  currentRecord.unseal();
                  // Set the location
                  currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
                  currentRecord.seal();
                  taggedRecords.add(currentRecord);
                  // the key from Result and the key being processed should be same
                  assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
                } else { // Invalid commit: keep the record without a location
                  taggedRecords.add(currentRecord);
                }
              } else { // Key not in the index: keep the record without a location (a new insert)
                taggedRecords.add(currentRecord);
              }
            }
          }
        }
      } catch (IOException e) {
        throw new HoodieIndexException("Failed to tag indexed locations because of exception with HBase Client", e);
      } finally {
        if (hTable != null) {
          try {
            hTable.close();
          } catch (IOException e) {
            // Ignore failures while closing the table
          }
        }
      }
      return taggedRecords.iterator();
    };
  }
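The checkIfValidCommit call in the code above keeps stale index entries from being trusted, for example entries left behind by a write that never completed. The plain-Java sketch below models that rule under simplifying assumptions: the completed timeline is just a sorted set of instant-time strings, instant times are fixed-width `yyyyMMddHHmmss` timestamps (so string order matches time order), and a commit is accepted if it is on that timeline or older than its first instant. `CommitValiditySketch` is a hypothetical class, not part of Hudi's API.

```java
import java.util.List;
import java.util.TreeSet;

// Hypothetical, simplified model of the commit-validity rule; not Hudi's actual code.
final class CommitValiditySketch {

  // Completed instant times. Assumes fixed-width "yyyyMMddHHmmss" strings,
  // so lexicographic order equals chronological order.
  private final TreeSet<String> completedInstants;

  CommitValiditySketch(List<String> completedInstants) {
    this.completedInstants = new TreeSet<>(completedInstants);
  }

  boolean isValidCommit(String commitTs) {
    if (completedInstants.isEmpty()) {
      return false; // no completed commit yet: no stored location can be trusted
    }
    // Valid if the commit is on the completed timeline, or predates its first
    // instant (older commits are assumed to have been archived).
    return completedInstants.contains(commitTs)
        || completedInstants.first().compareTo(commitTs) > 0;
  }

  public static void main(String[] args) {
    CommitValiditySketch check = new CommitValiditySketch(
        List.of("20200510000000", "20200511000000", "20200512000000"));
    System.out.println(check.isValidCommit("20200511000000")); // true: on the timeline
    System.out.println(check.isValidCommit("20200101000000")); // true: older than the timeline
    System.out.println(check.isValidCommit("20200511120000")); // false: inside the range but never completed
  }
}
```

Using a `TreeSet` makes both checks cheap: `contains` for membership and `first()` for the oldest completed instant.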

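Fetching locations from HBase is therefore straightforward: iterate over all records in a given partition, group their recordKeys into batches of Gets against the HBase index table (whose name is configurable), and build the location information from the results.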
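The batching pattern can be sketched without HBase. In this minimal sketch an in-memory `Map` stands in for the index table and a hypothetical `multiGet` method stands in for `doGet`, returning one result per key in request order (null when the row is absent), which is the ordering the original loop relies on when it pops `currentBatchOfRecords`. The commit-validity check is omitted for brevity. All class and method names here are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the batched lookup loop; a Map stands in for the HBase table.
final class BatchedTagSketch {

  // Stand-in for the three columns stored per record key.
  record Location(String commitTs, String fileId, String partitionPath) {}

  // Input key plus the location found for it (null = not indexed, i.e. an insert).
  record TaggedRecord(String recordKey, Location location) {}

  private final Map<String, Location> indexTable;
  int roundTrips = 0; // number of simulated multi-get calls

  BatchedTagSketch(Map<String, Location> indexTable) {
    this.indexTable = indexTable;
  }

  // Simulated multi-get: one result per key, in request order, null when absent.
  private List<Location> multiGet(List<String> keys) {
    roundTrips++;
    List<Location> results = new ArrayList<>(keys.size());
    for (String key : keys) {
      results.add(indexTable.get(key));
    }
    return results;
  }

  List<TaggedRecord> tag(Iterator<String> recordKeys, int batchSize) {
    List<TaggedRecord> tagged = new ArrayList<>();
    List<String> batch = new ArrayList<>();
    while (recordKeys.hasNext()) {
      batch.add(recordKeys.next());
      // Flush when the batch is full or the partition is exhausted.
      if (batch.size() >= batchSize || !recordKeys.hasNext()) {
        List<Location> results = multiGet(batch);
        for (int i = 0; i < batch.size(); i++) {
          tagged.add(new TaggedRecord(batch.get(i), results.get(i)));
        }
        batch.clear();
      }
    }
    return tagged;
  }

  public static void main(String[] args) {
    BatchedTagSketch sketch = new BatchedTagSketch(Map.of(
        "key1", new Location("20200510000000", "file-a", "2020/05/10"),
        "key3", new Location("20200511000000", "file-b", "2020/05/11")));
    List<TaggedRecord> out = sketch.tag(List.of("key1", "key2", "key3", "key4", "key5").iterator(), 2);
    out.forEach(System.out::println);
    System.out.println("round trips: " + sketch.roundTrips); // 3 (batches of 2, 2, 1)
  }
}
```

Checking `!recordKeys.hasNext()` inside the loop, as the original code does, flushes the final partial batch without a separate cleanup step after the loop.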

After the data has been written, updateLocation is called to update the records' location information. Its core code is as follows:

  public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
      HoodieTable<T> hoodieTable) {
    // Create the QPS allocator configured by hoodie.index.hbase.qps.allocator.class
    final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
    // Use the allocator to initialize the put batch size
    setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
    // Process each partition with updateLocationFunction
    JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
    // Persist the WriteStatus RDD
    writeStatusJavaRDD = writeStatusJavaRDD.persist(config.getWriteStatusStorageLevel());
    return writeStatusJavaRDD;
  }

The core code of updateLocationFunction is as follows:

  private Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>> updateLocationFunction() {

    return (Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>>) (partition, statusIterator) -> {

      List<WriteStatus> writeStatusList = new ArrayList<>();
      // Obtain (or re-create) the shared HBase connection
      synchronized (HBaseIndex.class) {
        if (hbaseConnection == null || hbaseConnection.isClosed()) {
          hbaseConnection = getHBaseConnection();
        }
      }
      try (BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
        // Iterate over the write statuses
        while (statusIterator.hasNext()) {
          WriteStatus writeStatus = statusIterator.next();
          List<Mutation> mutations = new ArrayList<>();
          try {
            for (HoodieRecord rec : writeStatus.getWrittenRecords()) {
              if (!writeStatus.isErrored(rec.getKey())) {
                // Get the new location
                Option<HoodieRecordLocation> loc = rec.getNewLocation();
                if (loc.isPresent()) { // A new location exists
                  if (rec.getCurrentLocation() != null) { // The record already had a location
                    // This is an update: the key is already indexed, nothing to write
                    continue;
                  }
                  // New insert: build a Put from the HoodieRecord
                  Put put = new Put(Bytes.toBytes(rec.getRecordKey()));
                  put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes(loc.get().getInstantTime()));
                  put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getFileId()));
                  put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(rec.getPartitionPath()));
                  mutations.add(put);
                } else { // No new location
                  // The record was deleted: remove its index entry
                  Delete delete = new Delete(Bytes.toBytes(rec.getRecordKey()));
                  mutations.add(delete);
                }
              }
              if (mutations.size() < multiPutBatchSize) {
                continue;
              }
              // Apply a full batch of mutations
              doMutations(mutator, mutations);
            }
            // Apply the remaining mutations
            doMutations(mutator, mutations);
          } catch (Exception e) {
            // Record the failure on this WriteStatus instead of failing the whole task
            Exception we = new Exception("Error updating index for " + writeStatus, e);
            writeStatus.setGlobalError(we);
          }
          writeStatusList.add(writeStatus);
        }
      } catch (IOException e) {
        throw new HoodieIndexException("Failed to update index locations because of exception with HBase Client", e);
      }
      return writeStatusList.iterator();
    };
  }

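So once the data is written, the index is updated based on the location information of each HoodieRecord in the WriteStatus: updates need no index change, new inserts are written to HBase with a Put, and deletes remove the stored entry from HBase with a Delete. Records whose write failed are skipped.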
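The decision table above (update: skip, insert: Put, delete: Delete, failed write: skip) plus the batched flushing can be sketched in plain Java. This is a minimal sketch under stated assumptions: an in-memory `Map` replaces the HBase table, a hypothetical `Mutation` record with a null location stands in for a Delete, and `doMutations` here just applies and clears the buffer. None of these names are Hudi or HBase APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the index-update decisions; a Map stands in for the HBase table.
final class IndexUpdateSketch {

  record Location(String commitTs, String fileId, String partitionPath) {}

  // currentLocation: what tagging found (null = insert); newLocation: null = deleted.
  record WrittenRecord(String key, Location currentLocation, Location newLocation, boolean errored) {}

  // Stand-in for a Put (location != null) or a Delete (location == null).
  record Mutation(String key, Location location) {}

  final Map<String, Location> indexTable = new HashMap<>();
  int flushes = 0;

  private void doMutations(List<Mutation> mutations) {
    if (mutations.isEmpty()) {
      return;
    }
    for (Mutation m : mutations) {
      if (m.location() == null) {
        indexTable.remove(m.key());
      } else {
        indexTable.put(m.key(), m.location());
      }
    }
    flushes++;
    mutations.clear(); // reuse the buffer for the next batch
  }

  void update(List<WrittenRecord> records, int putBatchSize) {
    List<Mutation> mutations = new ArrayList<>();
    for (WrittenRecord rec : records) {
      if (rec.errored()) {
        continue; // failed writes must not touch the index
      }
      if (rec.newLocation() != null) {
        if (rec.currentLocation() != null) {
          continue; // update: already indexed, nothing to write
        }
        mutations.add(new Mutation(rec.key(), rec.newLocation())); // insert -> Put
      } else {
        mutations.add(new Mutation(rec.key(), null)); // delete -> Delete
      }
      if (mutations.size() >= putBatchSize) {
        doMutations(mutations);
      }
    }
    doMutations(mutations); // flush whatever is left
  }

  public static void main(String[] args) {
    IndexUpdateSketch sketch = new IndexUpdateSketch();
    Location old = new Location("20200510000000", "file-a", "2020/05/10");
    sketch.indexTable.put("k-update", old);
    sketch.indexTable.put("k-delete", old);
    sketch.update(List.of(
        new WrittenRecord("k-insert", null, new Location("20200511000000", "file-b", "2020/05/11"), false),
        new WrittenRecord("k-update", old, new Location("20200511000000", "file-a", "2020/05/10"), false),
        new WrittenRecord("k-delete", old, null, false),
        new WrittenRecord("k-failed", null, new Location("20200511000000", "file-c", "2020/05/11"), true)), 1);
    System.out.println(sketch.indexTable.containsKey("k-insert")); // true
    System.out.println(sketch.indexTable.containsKey("k-delete")); // false
    System.out.println(sketch.indexTable.get("k-update").commitTs()); // 20200510000000 (untouched)
    System.out.println(sketch.flushes); // 2 with a batch size of 1
  }
}
```

Skipping updates keeps HBase write traffic proportional to inserts and deletes only, which matters when the put rate is throttled by a QPS allocator.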

3. Summary

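Hudi ships with a built-in index backed by the external storage system HBase. Users can simply configure the HBase index to store record index information in HBase, and they can also implement other kinds of custom indexes.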
