首页 > 编程知识 正文

双向数据流(基于apache flink的流处理 pdf)

时间:2023-05-04 01:49:44 阅读:66995 作者:218

前言:最近,由于flink版本不完整: CVE—2020—17519#Apache,官方更新了1.12个主版本,“强烈建议所有用户升级到Flink 1.12.1 然后找到了removedeprecateddatastream # split。 本文介绍的split方法被舍弃(删除),官方推荐使用Side Outputs。 但是,我还留着那个部分的代码。 一方面是flink进化的变化,另一方面是写的。

因此,flink 1.12.1目前仅支持过滤器和SideOutPut旁路分流。

Lambda公式的分析与使用

源源代码数据源中的pojo用户信息对象数据在以下博客的“模拟数据对象代码”(UserImage )中进行了说明,因此将省略对其的说明。

Flink的常用操作符和实例代码

1、过滤器是同一个数据流,遍历很多次:

public class flink _ side _ output _ filter { publicstaticvoidmain [ ] args ] throws exception { streamexecutionenvirror }数据singleoutputstreamoperatoruserimagelufei=item.filter ((filterfunctionuserimage ) value - value.getGroupId )==1) singleoutputstreamoperatoruserimageothers=item.filter (filterfunctionuserimage ) value - value.getGroupId )!=1; lufei.printToErr (; others.print (; env.execute (; )缺点)需要分成几次流程,多次遍历原始数据流,浪费集群资源,影响效率。

2、Split分流flink提供Split运算符,在Split运算符中定义OutputSelector,重写其中的select方法,标记不同类型的数据,对最后返回的SplitStream执行select方法

public class flink _ side _ output _ split { publicstaticvoidmain [ ] args ] throws exception { streamexecutionenviroor }数据splitstreamuserimagesplitstream=item.split (newoutputselectoruserimage (@ overridepubliciterablestringselect ) ) useriries if(value.getgroupid(==1) ) list.add ) ' Lufei ); }else{list.add('others ' ); }返回列表; }; ); datastream lufei=split stream.select (lufei ); datastream others=split stream.select (others ); lufei.printToErr (; others.print (; env.execute (; }

请注意,由split操作符分割的流不能进行二次分割。 如果再次调用上述分割的zeroStream和oneStream流的剥离分割,控制台将抛出以下异常:

lufei.split ((输出选择器) value-{ ArrayList string list=new ArrayList ); list.add(black_mustache ); 返回列表; () .打印机); exceptioninthread " main " Java.lang.illegalstateexception 3360 consecutivemultiplesplitsarenotsupported.splitsaredepreprecation

3、SideOutPut分流SideOutPut分流

SideOutPut是Flink框架提供的最新、最推荐的分流方法,使用SideOutPut时,必须执行以下步骤:

1、定义输出标签;

2、调用特定函数分割数据:

processfunctionkeyedprocessfunctioncoprocessfunctionkeyedcoprocessfunctionprocesswindowfunctionprocessalllwindowfunctionsideounsideoutioutioutioutioutionsioutionsionsionded

请注意,OutputTag是匿名对象。

public class flink _ side _ output _ stream { publicstaticvoidmain [ ] args } throws exception { streamexecutionenvirror }数据//分流标记outputtaguserimagelufei=newoutputtaguserimage (' lufei ) { privatestaticfinallongserialversionuid=1l; (; outputtaguserimageothers=newoutputtaguserimage { privatestaticfinallongserialversionuid=1l; (; //singleoutputstreamoperatoruserimageprocessstream=item.process (newprocessfunctionuserimage, 用户图像) { privationatage } @ overridepublicvoidprocesselement (用户图像值,ProcessFunctionUserImage,user image.com collectoruserimageout (throws exception (/todo auto-generatedmethodstubif ) value.getgroupid )==1) CTX.output } elstup }}; //输出datastreamlufeistream=processstream.getsideoutput (lufei ); datastreamothersstream=processstream.getsideoutput (others ); lufeiStream.printToErr (; othersStream.print (; env.execute (“旁路分流”); }

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。