首页 > 编程知识 正文

unbounded preceding,flinksql字段数量限制

时间:2023-05-05 10:56:06 阅读:175341 作者:3046

首先,如果列举Flink SQL初学者可能犯的错误,其中之一就是忘记设置空闲状态保留时间导致状态爆炸2021年的第一篇技术文章,时间紧迫,我们来谈谈这个简单的话题吧。

在数据流中执行分组查询时,如何将分组过程生成的结果(不仅仅是聚合结果)保存为中间状态? 随着key组的增加,状态也自然膨胀。 但是,这些状态数据基本上具有时效性,没有必要永久保持。 例如,如果使用Top-N语法进行隔离,则重复数据的出现通常位于特定区间内(例如1小时内或1天内),经过该时间后,不需要对应的状态。 Flink SQL提供的idle state retention time属性可确保当与状态中的key对应的数据未更新的时间达到阈值时,该状态会自动清除。 设定方法如下。

stenv.getconfig (.setidlestateretentiontime (time.hours ),time.hours ) ) 36 )注意了setidlestateretentiontime ) 看看源代码就知道了。

如何实现的idle state retention time特性在下面的层次结构中由o.a.f.table.runtime.functions.cleanupstate界面表示,代码如下所示:

publicinterfacecleanupstate { defaultvoidregisterprocessingcleanuptimer,long currentTime,luestatelongcleanuptimestate, 长当前时间长最大化时间, timerservicetimerservice (throws exception (//lastregisteredtimerlongcurcleanuptime=cleanuptime//checkifacleatime ) thatthecurrentcleanuptimerwon ' tdeletestateweneedtokeepif (curcleanuptime==nu uptime (currentimeminretentiontime ) ) weneedtoregisteranew ) later ) timerlongcleanuptime=currenttimemaxretime//registertimerandrememberclean-uptimetimerserverveservetime///deleteexpiredtimerif (curcleanuptime!=null } { timerservice.deleteprocessingtimetimer (curcleanuptime ); } clean uptimestate.update (clean uptime; }因此,与每个key对应的最近状态清理时间将单独保持在ValueState中。 满足以下两个条件之一时:

ValueState为空。 也就是说,首先显示这个key。 或者,如果当前时间和minRetentionTime超过最近的清理时间,则将当前时间和maxRetentionTime相加以注册新的Timer,并将该时间戳保存在ValueState中,从而触发下一次清理如果有过期的Timer,也将其删除。 这样,如果minRetentionTime和maxRetentionTime间隔的设定过小,则Timer和ValueState的更新频繁发生,Timer的维护成本变大,因此要设置间隔长的清理区间

CleanupState接口的继承关系如下图所示。

可以看到,有很多函数支持空闲清理,但所有基类都是keyedprocessfunctionwithcleanupstate抽象类。 源代码如下所示。

publicabstractclasskeyedprocessfunctionwithcleanupsta

te<K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> implements CleanupState { private static final long serialVersionUID = 2084560869233898457L; private final long minRetentionTime; private final long maxRetentionTime; protected final boolean stateCleaningEnabled; // holds the latest registered cleanup timer private ValueState<Long> cleanupTimeState; public KeyedProcessFunctionWithCleanupState(long minRetentionTime, long maxRetentionTime) { this.minRetentionTime = minRetentionTime; this.maxRetentionTime = maxRetentionTime; this.stateCleaningEnabled = minRetentionTime > 1; } protected void initCleanupTimeState(String stateName) { if (stateCleaningEnabled) { ValueStateDescriptor<Long> inputCntDescriptor = new ValueStateDescriptor<>(stateName, Types.LONG); cleanupTimeState = getRuntimeContext().getState(inputCntDescriptor); } } protected void registerProcessingCleanupTimer(Context ctx, long currentTime) throws Exception { if (stateCleaningEnabled) { registerProcessingCleanupTimer( cleanupTimeState, currentTime, minRetentionTime, maxRetentionTime, ctx.timerService()); } } protected boolean isProcessingTimeTimer(OnTimerContext ctx) { return ctx.timeDomain() == TimeDomain.PROCESSING_TIME; } protected void cleanupState(State... states) { for (State state : states) { state.clear(); } this.cleanupTimeState.clear(); } protected Boolean needToCleanupState(Long timestamp) throws IOException { if (stateCleaningEnabled) { Long cleanupTime = cleanupTimeState.value(); // check that the triggered timer is the last registered processing time timer. return timestamp.equals(cleanupTime); } else { return false; } }}

可以发现,空闲状态保留时间目前(1.12版本)仍然只支持processing time语义,并且minRetentionTime只有设为大于0的值才会生效。

KeyedProcessFunctionWithCleanupState只是提供了一些helper方法,具体发挥作用需要到实现类中去找。以计算Top-N的AppendOnlyTopNFunction为例,它的processElement()方法中会对到来的每个元素注册清理Timer:

@Overridepublic void processElement(RowData input, Context context, Collector<RowData> out) throws Exception { long currentTime = context.timerService().currentProcessingTime(); // register state-cleanup timer registerProcessingCleanupTimer(context, currentTime); // ......}

而一旦Timer触发,在onTimer()方法中调用基类的cleanupState()方法来实际清理:

@Overridepublic void onTimer( long timestamp, OnTimerContext ctx, Collector<RowData> out) throws Exception { if (stateCleaningEnabled) { // cleanup cache kvSortedMap.remove(keyContext.getCurrentKey()); cleanupState(dataState); }}

空闲状态保留的逻辑并不仅应用在上述Function中。在Table/SQL模块中还有一个内置的触发器StateCleaningCountTrigger,它可以对窗口中的元素进行计数,并按照计数阈值或者空闲状态保留的时间阈值来清理(即FIRE_AND_PURGE)。看官可自行参考对应的源码,不再废话了。

The End

今天号称是帝都21世纪以来最冷的一天,趁早洗洗睡吧。

民那晚安。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。