声明:本系列博客为原创,最先发表在拉勾教育,其中一部分为免费阅读部分。被读者各种搬运至各大网站。所有其他的来源均为抄袭。
《2021年最新版大数据面试题全面开启更新》
重复数据消除是实际业务中经常出现的问题之一。 在大数据领域,重复数据删除有助于减少存储所需的存储容量。 此外,某些业务场景不允许重复数据,例如准确统计站点每天的用户数,或统计事实表中每天发送的快递包裹数据。 在传统的脱机计算中,可以直接在SQL中通过DISTINCT函数,或者在数据量持续增加时使用类似于MR的思想。 那么,实时计算在进行重新计数时需要增量且长期的过程,并且根据场景的不同,由于效率和精度的问题,计划也需要发生变化。
针对上述问题,Flink上常见的重复数据删除方案:
基于状态的后端是基于HyperLogLog的光晕过滤器(BloomFilter )的基于BitMap的外部数据库的基于状态的后端
Flink状态的后端类型之一是RocksDBStateBackend。 他将正在运行的状态数据保存到RocksDB数据库中。 缺省情况下,此数据库将数据保存在任务管理器运行节点的数据目录中。
RocksDB是K-V数据库,可以使用MapState来移除重量。 模拟场景,计算每个商品的SKU访问量。
总体代码如下:
publicclassmapstatedistinctfunctionextendskeyedprocessfunctionstring,Tuple2String,Integer,Tuple2String, integer { privaate r } @ overridepublicvoidopen (configuration parameters ) throwsexception(/我们将ValueState的TTL生命周期设置为24小时失效状态statettlconfigttlconfig=statettlconfig.new builder (org.Apache.flink.API.com mon.time.time.minutes (220 ) )自动清除. setupdatetype (statettlconfig.update type.oncreateandwrite ).setstate visibility (statetlconfig.state vibility ) valuestatedescriptorintegerdescriptor=newvaluestatedescriptorinteger (' SKU num ',Integer.class ); escriptor.enabletimetolive (TTL config ); counts=getRuntimeContext ().getstate (描述符); super.open (参数; } @ overridepublicvoidprocesselement (tuple2string,Integer value,Context ctx,CollectorTuple2String,Integer out ) throw ser }else{ //(如果存在,则为1counts.update(counts.value ) )1); }out.collect(tuple2.of(F0,counts.value ) ); }逻辑:定义继承KeyesProcessFunction的MapStateDistinctFunction类。 的处理逻辑在进程元素中,当某个数据通过时,MapState判断该数据是否已经存在,如果不存在则计数为1,如果存在则对原始计数加1。
注:此处定义了状态的有效期为24小时。 在实际生产中,大量的密钥可以膨胀状态,处理保存的密钥。 例如,使用加密方法将密钥加密为几个字节并存储。
基于hyperlog日志
百度百科:
HyperLogLog是一种估计统计算法,用于统计一个集合中不同数据的个数。 也就是说,我们说的再统计。 HyperLogLog算法是用于基数统计的算法,各Hyper
LogLog 键只需要花费 12 KB 内存,就可以计算接近 2 的 64 方个不同元素的基数。HyperLogLog 适用于大数据量的统计,因为成本相对来说是更低的,最多也就占用 12KB 内存。在不需要100%精确的业务场景喜爱,可以使用这种方法进行统计,新增依赖:
<dependency> <groupId>net.agkn</groupId> <artifactId>hll</artifactId> <version>1.6.0</version></dependency>还是以统计商品SKU业务为例:
public class HyperLogLogDistinct implements AggregateFunction<Tuple2<String,Long>,HLL,Long> { @Override public HLL createAccumulator() { return new HLL(14, 5); } @Override public HLL add(Tuple2<String, Long> value, HLL accumulator) { //value 为访问记录 <商品sku, 用户id> accumulator.addRaw(value.f1); return accumulator; } @Override public Long getResult(HLL accumulator) { long cardinality = accumulator.cardinality(); return cardinality; } @Override public HLL merge(HLL a, HLL b) { a.union(b); return a; }}在上面的代码中,addRaw 方法用于向 HyperLogLog 中插入元素。如果插入的元素非数值型的,则需要 hash 过后才能插入。accumulator.cardinality() 方法用于计算 HyperLogLog 中元素的基数。
需要注意的是,HyperLogLog 并不是精准的去重,如果业务场景追求 100% 正确,那么一定不要使用这种方法。
基于布隆过滤器(BloomFilter)
百度百科:
BloomFilter(布隆过滤器)类似于一个 HashSet,用于快速判断某个元素是否存在于集合中,其典型的应用场景就是能够快速判断一个 key 是否存在于某容器,不存在就直接返回。需要注意的是,和 HyperLogLog 一样,布隆过滤器不能保证 100% 精确。但是它的插入和查询效率都很高。
我们可以在非精确统计的情况下使用这种方法:
public class BloomFilterDistinct extends KeyedProcessFunction<Long, String, Long> { private transient ValueState<BloomFilter> bloomState; private transient ValueState<Long> countState; @Override public void processElement(String value, Context ctx, Collector<Long> out) throws Exception { BloomFilter bloomFilter = bloomState.value(); Long skuCount = countState.value(); if(bloomFilter == null){ BloomFilter.create(Funnels.unencodedCharsFunnel(), 10000000); } if(skuCount == null){ skuCount = 0L; } if(!bloomFilter.mightContain(value)){ bloomFilter.put(value); skuCount = skuCount + 1; } bloomState.update(bloomFilter); countState.update(skuCount); out.collect(countState.value()); }}使用 Guava 自带的 BloomFilter,每当来一条数据时,就检查 state 中的布隆过滤器中是否存在当前的 SKU,如果没有则初始化,如果有则数量加 1。
基于BitMap
HyperLogLog 和 BloomFilter 虽然减少了存储但是丢失了精度, 这在某些业务场景下是无法被接受的。下面的这种方法不仅可以减少存储,而且还可以做到完全准确,那就是使用 BitMap。
百度百科:
在使用 BitMap 算法前,如果你需要去重的对象不是数字,那么需要先转换成数字。例如,用户可以自己创造一个映射器,将需要去重的对象和数字进行映射,最简单的办法是,可以直接使用数据库维度表中自增 ID。
新增依赖
代码:
public class BitMapDistinct implements AggregateFunction<Long, Roaring64NavigableMap,Long> { @Override public Roaring64NavigableMap createAccumulator() { return new Roaring64NavigableMap(); } @Override public Roaring64NavigableMap add(Long value, Roaring64NavigableMap accumulator) { accumulator.add(value); return accumulator; } @Override public Long getResult(Roaring64NavigableMap accumulator) { return accumulator.getLongCardinality(); } @Override public Roaring64NavigableMap merge(Roaring64NavigableMap a, Roaring64NavigableMap b) { return null; }}在上述方法中,我们使用了 Roaring64NavigableMap,其是 BitMap 的一种实现,然后我们的数据是每次被访问的 SKU,把它直接添加到 Roaring64NavigableMap 中,最后通过 accumulator.getLongCardinality() 可以直接获取结果。
基于外部数据库
假如业务场景非常复杂,并且数据量很大。为了防止无限制的状态膨胀,也不想维护庞大的 Flink 状态,可以采用外部存储的方式,比如可以选择使用 Redis 或者 HBase 存储数据,只需要设计好存储的 Key 即可。同时使用外部数据库进行存储,我们不需要关心 Flink 任务重启造成的状态丢失问题,但是有可能会出现因为重启恢复导致的数据多次发送,从而导致结果数据不准的问题。