前言:最近,由于flink版本不完整: CVE—2020—17519#Apache,官方更新了1.12个主版本,“强烈建议所有用户升级到Flink 1.12.1 然后找到了removedeprecateddatastream # split。 本文介绍的split方法被舍弃(删除),官方推荐使用Side Outputs。 但是,我还留着那个部分的代码。 一方面是flink进化的变化,另一方面是写的。
因此,flink 1.12.1目前仅支持过滤器和SideOutPut旁路分流。
Lambda公式的分析与使用
源源代码数据源中的pojo用户信息对象数据在以下博客的“模拟数据对象代码”(UserImage )中进行了说明,因此将省略对其的说明。
Flink的常用操作符和实例代码
1、过滤器是同一个数据流,遍历很多次:
public class flink _ side _ output _ filter { publicstaticvoidmain [ ] args ] throws exception { streamexecutionenvirror }数据singleoutputstreamoperatoruserimagelufei=item.filter ((filterfunctionuserimage ) value - value.getGroupId )==1) singleoutputstreamoperatoruserimageothers=item.filter (filterfunctionuserimage ) value - value.getGroupId )!=1; lufei.printToErr (; others.print (; env.execute (; )缺点)需要分成几次流程,多次遍历原始数据流,浪费集群资源,影响效率。
2、Split分流flink提供Split运算符,在Split运算符中定义OutputSelector,重写其中的select方法,标记不同类型的数据,对最后返回的SplitStream执行select方法
public class flink _ side _ output _ split { publicstaticvoidmain [ ] args ] throws exception { streamexecutionenviroor }数据splitstreamuserimagesplitstream=item.split (newoutputselectoruserimage (@ overridepubliciterablestringselect ) ) useriries if(value.getgroupid(==1) ) list.add ) ' Lufei ); }else{list.add('others ' ); }返回列表; }; ); datastream lufei=split stream.select (lufei ); datastream others=split stream.select (others ); lufei.printToErr (; others.print (; env.execute (; }
请注意,由split操作符分割的流不能进行二次分割。 如果再次调用上述分割的zeroStream和oneStream流的剥离分割,控制台将抛出以下异常:
lufei.split ((输出选择器) value-{ ArrayList string list=new ArrayList ); list.add(black_mustache ); 返回列表; () .打印机); exceptioninthread " main " Java.lang.illegalstateexception 3360 consecutivemultiplesplitsarenotsupported.splitsaredepreprecation
3、SideOutPut分流SideOutPut分流
SideOutPut是Flink框架提供的最新、最推荐的分流方法,使用SideOutPut时,必须执行以下步骤:
1、定义输出标签;
2、调用特定函数分割数据:
processfunctionkeyedprocessfunctioncoprocessfunctionkeyedcoprocessfunctionprocesswindowfunctionprocessalllwindowfunctionsideounsideoutioutioutioutioutionsioutionsionsionded
请注意,OutputTag是匿名对象。
public class flink _ side _ output _ stream { publicstaticvoidmain [ ] args } throws exception { streamexecutionenvirror }数据//分流标记outputtaguserimagelufei=newoutputtaguserimage (' lufei ) { privatestaticfinallongserialversionuid=1l; (; outputtaguserimageothers=newoutputtaguserimage { privatestaticfinallongserialversionuid=1l; (; //singleoutputstreamoperatoruserimageprocessstream=item.process (newprocessfunctionuserimage, 用户图像) { privationatage } @ overridepublicvoidprocesselement (用户图像值,ProcessFunctionUserImage,user image.com collectoruserimageout (throws exception (/todo auto-generatedmethodstubif ) value.getgroupid )==1) CTX.output } elstup }}; //输出datastreamlufeistream=processstream.getsideoutput (lufei ); datastreamothersstream=processstream.getsideoutput (others ); lufeiStream.printToErr (; othersStream.print (; env.execute (“旁路分流”); }