26

Phoenix实践 | Phoenix映射HBase时间戳的实现

 4 years ago
source link: https://www.tuicool.com/articles/JFBrMnY
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Phoenix映射HBase时间戳的一种实现

官方实现

Apache Phoenix从4.6版本开始,提供了ROW_TIMESTAMP标签,来映射HBase的原生时间戳。但使用起来有以下限制:

  • 只有主键中的TIME, DATE, TIMESTAMP, BIGINT, UNSIGNED_LONG类型的字段才能设置成ROW_TIMESTAMP

  • 只能有一个主键列能被设置成ROW_TIMESTAMP

  • ROW_TIMESTAMP标志的字段不能为null值

  • 只有在建表的时候,某一列才能被设置成ROW_TIMESTAMP

  • ROW_TIMESTAMP标志的列不能为负数

除了上面使用上的限制,还有应用场景的限制。根据上面的描述,ROW_TIMESTAMP字段有以下几种形式。

  • 业务主键在前

    7BbuimQ.png!web

  • ROW_TIMESTAMP字段在前

    Mvyai2r.png!web

  • 只有ROW_TIMESTAMP字段

    yMjauuA.png!web

我们来看下各个形式的优劣

  • 业务主键在前。无论ROW_TIMESTAMP字段如何取值,都可以通过业务主键1进行单点查询,即在知道业务主键1的情况下是可以通过前缀精确快速的查询的。

  • ROW_TIMESTAMP字段在前。如果不知道某条数据对应的ROW_TIMESTAMP字段值,则无法通过主键查询;如果通过业务主键可以映射ROW_TIMESTAMP字段值,虽然可以通过主键查询,但该字段将无法修改。因为修改就意味着当前记录删除,重新插入。

  • 只有ROW_TIMESTAMP字段。在一些时序数据比较常见,也就是没有业务主键,不会也不便通过主键查询,一般都是范围扫描。

其实官方提供的ROW_TIMESTAMP字段实现,最大的问题就是原有记录不能更新,只能删除、然后插入,这就极大的限制了它的应用场景。

我们的实现

背景

我们用Phoenix存储了所有需要实时查询的表,写Phoenix-SQL查询当前最新的数据。基本架构如下:

VjIfqiE.png!web

问题

正常情况下,实时抽取MySQL的binlog,写入Phoenix;每天会有Hive批量抽取MySQL数据,对Phoenix进行校验、补数。
实时写入时,需要考虑binlog更新的顺序,至少要做到MySQL原数据每行更新的顺序;离线补数时,需要考虑是否会覆盖实时写入的数据。

实时写入

实时写入的顺序,大都由CDC(canal、debezium等)控制。针对每一条数据的更新,CDC都会对“表名+主键”进行Hash,路由到Kafka对应的分区。其实针对某个表某条记录的更新,消费时是有严格的顺序的。但如果后期更改kafka分区个数,就会稍微麻烦点。如果不停服更新,就意味着同一条记录的不同更新,分布在不同的分区,也就不能保证严格的顺序,插入Phoenix表就会出现覆盖的问题。如果停服更新,就需要先停掉CDC,等消费者把数据消费完,然后再调整分区,启动消费者,这样才能避免相互覆盖的问题。

实时写入还有一个潜在的问题,那就是数据丢失。不管是网络抖动,还是组件的健壮性,都会造成数据丢失。一旦发生数据丢失,就需要校验、补数的逻辑。

离线补数

离线补数就是为了防止出现实时数据丢失的问题。离线补数包含校验和补数两个步骤。

  • 校验。拿当前全量或增量数据,与Phoenix表中相同主键的数据进行比对,确定Phoenix是否丢数或丢失更新。

  • 丢数就是Phoenix应该有的数据却没有

  • 丢失更新就是Phoenix的数据不是最新的

  • 补数。根据上一步骤计算的丢失的数据或更新,写入Phoenix

离线补数看似完美,但最大的问题就是,校验和补数是两个步骤,也就是说不在一个事务里面。有可能某条数据在校验阶段,的确是丢失的,但在校验之后、补数之前,该条数据又被写到Phoenix表了,那么在补数之后,该数据又被更新成旧数据了。

解决方案

细心的读者会发现,使用官方提供的ROW_TIMESTAMP是无法很好的解决数据乱序覆盖的问题的。
那么,究竟该怎么办呢?有没有一种方案能完美的解决上面的问题呢?下面是我解决这个问题的思路和具体实现。

思路

熟悉HBase的读者一定知道,HBase插入或更新数据的时候是可以指定时间戳(版本号)的,而且HBase查询时默认显示时间戳最大的数据。那如果Phoenix在根据主键写入数据时,能把该条数据的更新时间写入HBase的时间戳字段,是不是就能解决相互覆盖的问题了呢?

的确能。

其实每一条更新都是数据的一个版本。如果写入时能指定时间戳,就意味着指定了数据

的版本,无论每个更新到达的顺序是怎样的,Phoenix读取时都会读取最新的数据。

如果能实现,那么Kafka重新设定分区个数和离线补数将不再需要考虑覆盖的问题。

但Phoenix目前并没有实现上面逻辑的机制,我们需要对其进行简单的升级。

实现方案

其实在Phoenix官方实现中,有一个CurrentSCN属性,它可以控制每一次DDL、DML、QUERY的时间戳的,也就是在插入或更新时,会根据CurrentSCN的值设定当前数据对应的HBase的时间戳。但很不幸,它只能控制每一次commit的数据,也就是无法精确控制每一条数据的时间戳。当然了,如果每一条数据Upsert时都设置CurrentSCN,然后commit也是可以解决问题的,但这就无法进行批量提交,会一定程度的影响性能。

其实我在实现时也是参考了CurrentSCN属性的原理。
经过分析,我找到了MutationState类的generateMutations方法的下面一段代码。

PRow row = table.newRow(connection.getKeyValueBuilder(), timestampToUse, key, hasOnDupKey);

上面的代码其实是创建了一条数据,后续的upset的数据就是由此而来。根据timestampToUse命名可以猜想,它就是该条数据的时间戳。

/**
  * Creates a new row at the specified timestamp using the key
  * for the PK values (from {@link #newKey(ImmutableBytesWritable, byte[][])}
  * and the optional key values specified using values.
  * @param ts the timestamp that the key value will have when committed
  * @param key the row key of the key value
  * @param hasOnDupKey true if row has an ON DUPLICATE KEY clause and false otherwise.
  * @param values the optional key values
  * @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to
  * generate the Row to send to the HBase server.
  * @throws ConstraintViolationException if row data violates schema
  * constraint
  */
PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values);

由newRow的描述可以确定我们的猜想,timestampToUse就是当前数据的时间戳。

根据调用链,我们找到了timestampToUse赋值最近的地方:UpsertCompiler.setValues方法,里面有一个RowTimestampColInfo类型的rowTsColInfo字段。其实还是找到timestampToUse最初的地方,也就是获取CurrentSCN的代码段,但考虑到不对原有的CurrentSCN功能过多干涉,我们选择优化UpsertCompiler.setValues方法。下面是改造后的代码片段:

for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
            byte[] value = values[j];
            PColumn column = table.getColumns().get(columnIndexes[i]);
            if (SchemaUtil.isPKColumn(column)) {
                pkValues[pkSlotIndex[i]] = value;
                if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) {
                    if (!useServerTimestamp) {
                        PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
                        rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder());
                        if (rowTimestamp < 0) {
                            throw new IllegalDataException("Value of a column designated as ROW_TIMESTAMP cannot be less than zero");
                        }
                        rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
                    } 
                }
            } else {
                columnValues.put(column, value);
                columnValueSize += (column.getEstimatedSize() + value.length);
            }
            if(column.getDataType().getSqlTypeName().equals(PRowts.INSTANCE.getSqlTypeName()) && rowTimestamp == null){
                rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, column.getSortOrder());
                if (rowTimestamp < 0) {
                    throw new IllegalDataException("Value of a column designated as ROW_TS cannot be less than zero");
                }
                rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
            }
        }

在处理每行数据每个字段值的时候,判断当前字段类型是否为PRowts类型,如果是,则根据该值创建RowTimestampColInfo。这样就达到了根据数据改变HBase时间戳的目的。

考虑到快速、简单的实现PRowts类型,我们选择将PRowts设定为Long类型的别名,其实就是根据PLong类创建PRowts,二者的逻辑完全一致。只不过个别参数名称不同。
下面是PRowts的默认构造函数。

private PRowts() {
    super("ROW_TS", 21, Long.class, new PLong.LongCodec(), 48);
}

至此,我们就实现将数据的时间戳映射到HBase的时间戳的功能。简单来说分为两步:

  1. 新增PRowts类型。创建表时,指定某个字段为PRowts,该字段原始类型必须是long;或者修改字段的类型为PRowts。

  2. 根据数据构造HBase的Put命令时,将PRowts的值写入row timestamp

当然,限于篇幅还是有很多细节没有解释的,而且也不一定选择改造UpsertCompiler.setValues,读者可以根据实际情况自行实现。另外也可以扩展PRowts,使其支持其他时间类型的数据,比如TIME、DATE、TIMESTAMP、BIGINT。

“HelloGrape”是我的微信二维码,欢迎大家讨论技术,共同提高。

这是我大表哥,多年基础架构工作经验,深入掌握多种中间件,也做过大数据相关工作。人狠话不多,几乎每篇文章都是精品。

JvA73qf.jpg!web

大家工作学习遇到HBase技术问题,把问题发布到HBase技术社区论坛http://hbase.group,欢迎大家论坛上面提问留言讨论。想了解更多HBase技术关注HBase技术社区公众号(微信号:hbasegroup),非常欢迎大家积极投稿。

3INbInB.jpg!web

本群为HBase+Spark技术交流讨论,整合最优质的专家资源和技术资料会定期开展线下技术沙龙,专家技术直播,专家答疑活动

点击链接钉钉入群:https://dwz.cn/Fvqv066s或扫码进群

7f6n6re.jpg!web

本群为Cassandra技术交流讨论,整合最优质的专家资源和技术资料会定期开展线下技术沙龙,专家技术直播,专家答疑活动

Cassandra 社区钉钉大群:https://c.tb.cn/F3.ZRTY0o

em63Mzv.jpg!web

Cassandra 技术社区微信公众号:

QBbaEfF.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK