首页 > 编程知识 正文

flink对Kafka 中的数据去重,flink处理离线数据

时间:2023-05-06 02:40:59 阅读:54448 作者:3837

声明:本系列博客为原创,最先发表在拉勾教育,其中一部分为免费阅读部分。被读者各种搬运至各大网站。所有其他的来源均为抄袭。

《2021年最新版大数据面试题全面开启更新》

重复数据消除是实际业务中经常出现的问题之一。 在大数据领域,重复数据删除有助于减少存储所需的存储容量。 此外,某些业务场景不允许重复数据,例如准确统计站点每天的用户数,或统计事实表中每天发送的快递包裹数据。 在传统的脱机计算中,可以直接在SQL中通过DISTINCT函数,或者在数据量持续增加时使用类似于MR的思想。 那么,实时计算在进行重新计数时需要增量且长期的过程,并且根据场景的不同,由于效率和精度的问题,计划也需要发生变化。

针对上述问题,Flink上常见的重复数据删除方案:

基于状态的后端是基于HyperLogLog的光晕过滤器(BloomFilter )的基于BitMap的外部数据库的基于状态的后端

Flink状态的后端类型之一是RocksDBStateBackend。 他将正在运行的状态数据保存到RocksDB数据库中。 缺省情况下,此数据库将数据保存在任务管理器运行节点的数据目录中。

RocksDB是K-V数据库,可以使用MapState来移除重量。 模拟场景,计算每个商品的SKU访问量。

总体代码如下:

publicclassmapstatedistinctfunctionextendskeyedprocessfunctionstring,Tuple2String,Integer,Tuple2String, integer { privaate r } @ overridepublicvoidopen (configuration parameters ) throwsexception(/我们将ValueState的TTL生命周期设置为24小时失效状态statettlconfigttlconfig=statettlconfig.new builder (org.Apache.flink.API.com mon.time.time.minutes (220 ) )自动清除. setupdatetype (statettlconfig.update type.oncreateandwrite ).setstate visibility (statetlconfig.state vibility ) valuestatedescriptorintegerdescriptor=newvaluestatedescriptorinteger (' SKU num ',Integer.class ); escriptor.enabletimetolive (TTL config ); counts=getRuntimeContext ().getstate (描述符); super.open (参数; } @ overridepublicvoidprocesselement (tuple2string,Integer value,Context ctx,CollectorTuple2String,Integer out ) throw ser }else{ //(如果存在,则为1counts.update(counts.value ) )1); }out.collect(tuple2.of(F0,counts.value ) ); }逻辑:定义继承KeyesProcessFunction的MapStateDistinctFunction类。 的处理逻辑在进程元素中,当某个数据通过时,MapState判断该数据是否已经存在,如果不存在则计数为1,如果存在则对原始计数加1。

注:此处定义了状态的有效期为24小时。 在实际生产中,大量的密钥可以膨胀状态,处理保存的密钥。 例如,使用加密方法将密钥加密为几个字节并存储。

基于hyperlog日志

百度百科:

HyperLogLog是一种估计统计算法,用于统计一个集合中不同数据的个数。 也就是说,我们说的再统计。 HyperLogLog算法是用于基数统计的算法,各Hyper

LogLog 键只需要花费 12 KB 内存,就可以计算接近 2 的 64 方个不同元素的基数。HyperLogLog 适用于大数据量的统计,因为成本相对来说是更低的,最多也就占用 12KB 内存。

在不需要100%精确的业务场景喜爱,可以使用这种方法进行统计,新增依赖:

<dependency> <groupId>net.agkn</groupId> <artifactId>hll</artifactId> <version>1.6.0</version></dependency>

还是以统计商品SKU业务为例:

public class HyperLogLogDistinct implements AggregateFunction<Tuple2<String,Long>,HLL,Long> { @Override public HLL createAccumulator() { return new HLL(14, 5); } @Override public HLL add(Tuple2<String, Long> value, HLL accumulator) { //value 为访问记录 <商品sku, 用户id> accumulator.addRaw(value.f1); return accumulator; } @Override public Long getResult(HLL accumulator) { long cardinality = accumulator.cardinality(); return cardinality; } @Override public HLL merge(HLL a, HLL b) { a.union(b); return a; }}

在上面的代码中,addRaw 方法用于向 HyperLogLog 中插入元素。如果插入的元素非数值型的,则需要 hash 过后才能插入。accumulator.cardinality() 方法用于计算 HyperLogLog 中元素的基数。

需要注意的是,HyperLogLog 并不是精准的去重,如果业务场景追求 100% 正确,那么一定不要使用这种方法。

基于布隆过滤器(BloomFilter)

百度百科:

BloomFilter(布隆过滤器)类似于一个 HashSet,用于快速判断某个元素是否存在于集合中,其典型的应用场景就是能够快速判断一个 key 是否存在于某容器,不存在就直接返回。

需要注意的是,和 HyperLogLog 一样,布隆过滤器不能保证 100% 精确。但是它的插入和查询效率都很高。

我们可以在非精确统计的情况下使用这种方法:

public class BloomFilterDistinct extends KeyedProcessFunction<Long, String, Long> { private transient ValueState<BloomFilter> bloomState; private transient ValueState<Long> countState; @Override public void processElement(String value, Context ctx, Collector<Long> out) throws Exception { BloomFilter bloomFilter = bloomState.value(); Long skuCount = countState.value(); if(bloomFilter == null){ BloomFilter.create(Funnels.unencodedCharsFunnel(), 10000000); } if(skuCount == null){ skuCount = 0L; } if(!bloomFilter.mightContain(value)){ bloomFilter.put(value); skuCount = skuCount + 1; } bloomState.update(bloomFilter); countState.update(skuCount); out.collect(countState.value()); }}

使用 Guava 自带的 BloomFilter,每当来一条数据时,就检查 state 中的布隆过滤器中是否存在当前的 SKU,如果没有则初始化,如果有则数量加 1。

基于BitMap

HyperLogLog 和 BloomFilter 虽然减少了存储但是丢失了精度, 这在某些业务场景下是无法被接受的。下面的这种方法不仅可以减少存储,而且还可以做到完全准确,那就是使用 BitMap。
百度百科:

Bit-Map 的基本思想是用一个 bit 位来标记某个元素对应的 Value,而 Key 即是该元素。由于采用了 Bit 为单位来存储数据,因此可以大大节省存储空间。假设有这样一个需求:在 20 亿个随机整数中找出某个数 m 是否存在其中,并假设 32 位操作系统,4G 内存。在 Java 中,int 占 4 字节,1 字节 = 8 位(1 byte = 8 bit)如果每个数字用 int 存储,那就是 20 亿个 int,因而占用的空间约为 (2000000000*4/1024/1024/1024)≈7.45G如果按位存储就不一样了,20 亿个数就是 20 亿位,占用空间约为 (2000000000/8/1024/1024/1024)≈0.233G

在使用 BitMap 算法前,如果你需要去重的对象不是数字,那么需要先转换成数字。例如,用户可以自己创造一个映射器,将需要去重的对象和数字进行映射,最简单的办法是,可以直接使用数据库维度表中自增 ID。
新增依赖

<dependency> <groupId>org.roaringbitmap</groupId> <artifactId>RoaringBitmap</artifactId> <version>0.8.0</version></dependency>

代码:

public class BitMapDistinct implements AggregateFunction<Long, Roaring64NavigableMap,Long> { @Override public Roaring64NavigableMap createAccumulator() { return new Roaring64NavigableMap(); } @Override public Roaring64NavigableMap add(Long value, Roaring64NavigableMap accumulator) { accumulator.add(value); return accumulator; } @Override public Long getResult(Roaring64NavigableMap accumulator) { return accumulator.getLongCardinality(); } @Override public Roaring64NavigableMap merge(Roaring64NavigableMap a, Roaring64NavigableMap b) { return null; }}

在上述方法中,我们使用了 Roaring64NavigableMap,其是 BitMap 的一种实现,然后我们的数据是每次被访问的 SKU,把它直接添加到 Roaring64NavigableMap 中,最后通过 accumulator.getLongCardinality() 可以直接获取结果。

基于外部数据库

     假如业务场景非常复杂,并且数据量很大。为了防止无限制的状态膨胀,也不想维护庞大的 Flink 状态,可以采用外部存储的方式,比如可以选择使用 Redis 或者 HBase 存储数据,只需要设计好存储的 Key 即可。同时使用外部数据库进行存储,我们不需要关心 Flink 任务重启造成的状态丢失问题,但是有可能会出现因为重启恢复导致的数据多次发送,从而导致结果数据不准的问题。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。