首页 > 编程知识 正文

flink广播变量动态更新,flink 统计一天数据

时间:2023-05-04 21:11:02 阅读:138615 作者:1315

watermark在Flink上介绍了。 watermark是Apache Flink为了处理事件时间窗口计算而提出的机制,本质上是时间戳。 用于处理实时数据中的顺序扰动,通常是结合水位线和窗口实现的。

从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,数据的顺序受到网络延迟、背压等诸多因素的影响。 进行加窗处理时,不能无限期等待延迟数据的到来。 到达特定watermark时,可以触发窗口计算,认为watermark之前的所有数据都已到达。 这个结构是水印。

上图:

w ) 11 ) :表示到11为止的数据已经到达,到11为止的数据已经可以计算了。

w(20 ) :表示到了20个数据,可以计算到20个数据。

watermark的利用生成定时watermark可以在收到数据源的数据后立即生成watermark。 也可以在数据源之后使用地图或过滤器进行操作,然后生成水印。

水位线生产的最佳位置是在尽可能靠近数据源的地方是因为在水位线生成时进行了关于要素顺序的相对时间戳的假设。 由于数据源读取过程是并行的,因此导致Flink数据流分区之间重新分布的所有操作(如并行度更改和keyby )都会打乱元素时间戳的顺序。 但是,如果是初始化的filter、map等不引起要素再分发的操作,可以考虑在生成水位线之前使用。

watermark计算watermark进入flink窗口的最大事件时间(maxEventTime ) -指定的延迟时间(t ) ) ) ) ) ) ) ) ) ) )的延迟)

生成方式之一: With Periodic Watermarks这将周期性地触发Waterrmark的生成和发送。

周期性分配水位线是程序中常用的指示系统按一定时间间隔发出的水位线。

如果将时间设置为事件时间,则该时间间隔默认为200ms,但如果需要调整,可以自行设置。

任务时间类型和

valenv=streamexecutionenvironment.getexecutionenvironment//设定时间事件时间env.setstreamtimecharacteristic (timecharacteristic ) 设置自动周期性生成水印,默认值为200毫秒env.getconfig.setautowatermarkinterval ) 1000 )设置水位线水印的值

//从本地socket端口检索数据val datastream=env.socket text stream (' 127.0.0.1 ), 将10010数据转换为tuple2格式valt upstream=datastream.map (line={ valarr=line.split (' ) arr )0),并将其转换为ARR(1) 设置水位线valwaterdatastream=t upstream.assigntimestampsandwatermarks (//)的最小延迟时间watermark strategy.forboundedoutoforderness () 时间戳. withtimestampassigner (newserializabletimestampassigner [ tuple2[ string,long ()//当前最大值varcurrentmaxnum=0lllettimampassigner overridedefextracttimestamp (t : tuple2[ string,Long],recordTimesstamp: Long ) 3360 long={ valetime=t._ 2c u 2c val watermark=current maxnum-2000; println (数据:(t.tostring )、SDF.format ) etime )、当前watermark : (SDF.format ) Waterma

rk)) eTime } }) ) //对数据进行计算和输出 waterDataStream.keyBy(_._1).timeWindow(Time.seconds(3)).reduce((e1, e2)=>{ (e1._1,e1._2+e2._2) }).print()

输入和输出:

--------------------输入s3 1639100010955s2 1639100009955s1 1639100008955s0 1639100007955s4 1639100011955s5 1639100012955s6 1639100013955s7 1639100016955 --------------------输出 数据:(s3,1639100010955) ,2021-12-10 09:33:30 , 当前watermark: 2021-12-10 09:33:28数据:(s2,1639100009955) ,2021-12-10 09:33:29 , 当前watermark: 2021-12-10 09:33:28数据:(s1,1639100008955) ,2021-12-10 09:33:28 , 当前watermark: 2021-12-10 09:33:28数据:(s0,1639100007955) ,2021-12-10 09:33:27 , 当前watermark: 2021-12-10 09:33:28数据:(s4,1639100011955) ,2021-12-10 09:33:31 , 当前watermark: 2021-12-10 09:33:29数据:(s5,1639100012955) ,2021-12-10 09:33:32 , 当前watermark: 2021-12-10 09:33:30(s2,1639100009955)(s0,1639100007955)(s1,1639100008955)数据:(s6,1639100013955) ,2021-12-10 09:33:33 , 当前watermark: 2021-12-10 09:33:31数据:(s7,1639100016955) ,2021-12-10 09:33:36 , 当前watermark: 2021-12-10 09:33:34(s3,1639100010955)(s5,1639100012955)(s4,1639100011955)

说明:

在使用timeWindow的时候,会根据设置的窗口大小 3,将一分钟内的窗口划分为:
0-2,3-5,6-8,9-11,12-14,15-17,18-20,21-23,24-26,27-29,30-32,33-35…watermark的值是当前输入数据中最大时间戳-去乱序时间。 在watermark前的数据才会被认定是正常的,可供window进行计算的数据。上面程序中输入s3-s4时,watermark为的秒数是28和29,是在 timewindow划分的时间窗口 27-29 中,所以没有触发计算。直到输入s5,此时watermark秒数是30,在另一个窗口 30-32的窗口中,才会触发 27-29窗口的计算,所以才输出 s2,s0,s1的值。同理到s7的时候,又是另一个窗口33-35,所以触发上一个窗口的计算。 第二种: With Punctuated Watermarks

定点水位线(标记水位线)不是太常用,主要为输入流中包含一些用于指示系统进度的特殊元组和标记,方便根据输入元素生成水位线的场景使用的。
由于数据流中每一个递增的EventTime都会产生一个Watermark。
在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> { @Override public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) { if (event.hasWatermarkMarker()) { output.emitWatermark(new Watermark(event.getWatermarkTimestamp())); } } @Override public void onPeriodicEmit(WatermarkOutput output) { // onEvent 中已经实现 }} 延迟数据的处理方式

针对延迟太久的数据有3中处理方案:

丢弃(默认)allowedLateness: 指定允许数据延迟的时间sideOutputLateData: 收集迟到的数据

对于迟到太久的数据默认是丢弃的。 不会触发window。因为输入的数据所在的窗口已经执行过了。Flink对这些迟到数据执行的方案就是丢弃。

如果迟到不久,输入的数据所在的窗口还未执行,是不会丢弃的。 这个要看窗口大小最大允许的数据乱序时间

附上 Flink官方文档地址:

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/event-time/generating_watermarks/

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。