39

Flink去重第四弹:bitmap精确去重

 4 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzU5MTc1NDUyOA%3D%3D&%3Bmid=2247484148&%3Bidx=1&%3Bsn=6febe29d0a0222fa0f2dd4484f82959f
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

VJrYVzM.gif

点击箭头处 “蓝色字” ,关注我们哦!!

在前面提到的精确去重方案都是会保存全量的数据,但是这种方式是以牺牲存储为代价的,而hyperloglog方式虽然减少了存储但是损失了精度,那么如何能够做到精确去重又能不消耗太多的存储呢,这篇主要讲解如何使用bitmap做精确去重。

ID-mapping

在使用bitmap去重需要将去重的id转换为一串数字,但是我们去重的通常是一串包含字符的字符串例如设备ID,那么第一步需要将字符串转换为数字,首先可能想到对字符串做hash,但是hash是会存在概率冲突的,那么可以使用美团开源的leaf分布式唯一自增ID算法,也可以使用Twitter开源的snowflake分布式唯一ID雪花算法,我们选择了实现相对较为方便的 snowflake 算法( 从网上找的 ),代码如下:

public class SnowFlake {


/**

* 起始的时间戳

*/

private final static long START_STMP = 1480166465631L;


/**

* 每一部分占用的位数

*/

private final static long SEQUENCE_BIT = 12; //序列号占用的位数


private final static long MACHINE_BIT = 5; //机器标识占用的位数


private final static long DATACENTER_BIT = 5;//数据中心占用的位数


/**

* 每一部分的最大值

*/

private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT);


private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);


private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);


/**

* 每一部分向左的位移

*/

private final static long MACHINE_LEFT = SEQUENCE_BIT;


private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;


private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT;


private long datacenterId; //数据中心


private long machineId; //机器标识


private long sequence = 0L; //序列号


private long lastStmp = -1L;//上一次时间戳


public SnowFlake(long datacenterId, long machineId) {

if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) {

throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0");

}

if (machineId > MAX_MACHINE_NUM || machineId < 0) {

throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");

}

this.datacenterId = datacenterId;

this.machineId = machineId;

}


/**

* 产生下一个ID

*

* @return

*/

public synchronized long nextId() {

long currStmp = getNewstmp();

if (currStmp < lastStmp) {

throw new RuntimeException("Clock moved backwards. Refusing to generate id");

}


if (currStmp == lastStmp) {

//相同毫秒内,序列号自增

sequence = (sequence + 1) & MAX_SEQUENCE;

//同一毫秒的序列数已经达到最大

if (sequence == 0L) {

currStmp = getNextMill();

}

} else {

//不同毫秒内,序列号置为0

sequence = 0L;

}


lastStmp = currStmp;


return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分

| datacenterId << DATACENTER_LEFT //数据中心部分

| machineId << MACHINE_LEFT //机器标识部分

| sequence; //序列号部分

}


private long getNextMill() {

long mill = getNewstmp();

while (mill <= lastStmp) {

mill = getNewstmp();

}

return mill;

}


private long getNewstmp() {

return System.currentTimeMillis();

}



}

snowflake算法的实现是与机器码以及时间有关的,为了保证其高可用做了两个机器码不同的对外提供的服务。 那么整个转换流程如下图:

JN7jmam.png!web

首先会从Hbase中查询是否有UID对应的ID,如果有则直接获取,如果没有则会调用ID-Mapping服务,然后将其对应关系存储到Hbase中,最后返回ID至下游处理。

UDF化

为了方便提供业务方使用,同样需要将其封装成为UDF, 由于snowflake算法得到的是一个长整型,因此选择了Roaring64NavgabelMap作为存储对象,由于去重是按照维度来计算,所以使用UDAF,首先定义一个accumulator:

public class PreciseAccumulator{


private Roaring64NavigableMap bitmap;


public PreciseAccumulator(){

bitmap=new Roaring64NavigableMap();

}


public void add(long id){

bitmap.addLong(id);

}


public long getCardinality(){

return bitmap.getLongCardinality();

}

}

udfa实现

public class PreciseDistinct extends AggregateFunction<Long, PreciseAccumulator> {


@Override public PreciseAccumulator createAccumulator() {

return new PreciseAccumulator();

}


public void accumulate(PreciseAccumulator accumulator,long id){

accumulator.add(id);

}


@Override public Long getValue(PreciseAccumulator accumulator) {

return accumulator.getCardinality();

}

}

那么在实际使用中只需要注册udaf即可。

关于去重系列就写到这里,如果您有不同的意见或者看法,欢迎私信。

—END—

36VN3az.jpg!web

关注回复 Flink

获取更多系列

原创不易,好看,就点个"在看"


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK