背景:目前在一家电商公司,对报表的实时性要求很高。实时性要求较高的场景,比如:
1.集团各个分公司对商品配送过程中生成的各个单据的对账实时性很高。
2.采购部依赖商品的平均进价对客户进行报价,所以对商品的进价数据的实时性也有较高的要求。
之前数据量小,都是直接在后台多表join取数,随着数据量越来越大,用户查询越来越慢。为此,我们使用阿里的Flink提前进行数据预计算,然后将数据打平到一张宽表里。这样,后台查询的时候只需要查这一张宽表就可以拿到所有需要的字段了。构建宽表的好处有:
1.减少了在后台的join操作和跨列计算,用户查询响应更快 2.数据做平台迁移更容易
下面是我们宽表构建的简化流程:
1.梳理目前所有销售报表用到的表和字段
2.监听 销售单头表和 明细表(事实表)数据变化的id,发送到kafka(作用是消峰填谷)
3.Flink消费datahub的id:
1)将单头表和明细表的id分别打上不同的标识
2)根据标识区分不同类型的id,然后用这些id查数据库 取到对应的 明细id(都转换为delivery_dtl_id),去重
3)根据明细id多线程查单头表和明细表的数据。然后拿到和各维表关联的字段集合
4)采用多线程的方式,根据3)中的维表关联字段集合 分批(比如2000条) 查各个维度表的数据
ps:维度表的数据查询:先查guavaCache缓存中的数据(过期时间1分钟),如果缓存没有就查数据库(mysql),然后重新设置缓存
4)使用mapStruct将各维表的数据merge到步骤2)的数据中
5)将数据sink到ADB
基本流程就是这样。具体细节后面再慢慢完善。有做Flink实时处理的可以一起交流一下。
20211209有两个问题补充一下:
1.Flink是监听到一条数据就会执行查一次数据库,对数据库的并发压力太大,也会影响Flink的吞吐量
解决方法是:1)Flink采集id的时候开窗(例如5秒钟),根据不同类型的id分组,每组取前1000条id。实现分批次查数据2)查询数据库使用线程池
2.维表的数据可能发生变化,Flink读取的缓存中的维表数据可能是旧的数据。
解决方法:预留刷数据的topic通道。定时比较宽表字段和维表的字段发现不同,取宽表明细id往通道里刷数据即可
3.为什么取维表的数据是从mysql取,而不是ADB取?
ADB的并发度(只有30)不高