Async I/O是阿里巴巴为社区做出贡献的呼声非常高的特性,在1.2版中被引入。 主要目的是解决与外部系统交互时的网络延迟成为系统瓶颈的问题。
关于实时处理,需要使用外部存储数据进行染色时,需要慎重对待,以免与外部系统的交互延迟和对流处理的整个进度产生决定性的影响。
通过mapfunction等操作符访问外部存储器,实际上该交互过程是同步的。 例如,当请求a发送到数据库时,mapfunction将继续等待响应。 在许多情况下,此等待过程非常浪费函数的时间。
与数据库异步交互意味着一个函数实例可以同时处理许多请求并同时接收响应。 在这种情况下,通过发送其他请求或接收其他响应,可以重用并节省等待时间。 至少,等待时间会在多项请求中摊销。 这将提高许多使用情形的吞吐量。
注意:虽然也可以通过增加映射函数的并行度来提高吞吐量,但这意味着资源开销很高。 更多的MapFunction实例意味着有更多的task、线程、flink内部网络连接、到数据库的链接、高速缓存和内部状态开销。
1 .前提
要成功实现的flink异步IO功能,连接的数据库必须支持异步客户端。 幸运的是,许多常用的数据库都支持这样的客户端。
如果没有异步客户端,还可以创建多个同步客户端并将其放入线程池中,然后使用线程池执行异步功能。 当然,与异步客户端相比,这种方式效率较低。
2 .异步IO API
flink异步IO的API支持用户在data stream中使用异步请求客户端。 API自身的处理和数据流集成、消息顺序、时间、容错等。
如果目标数据库具有异步客户端,则要使用异步IO,必须执行以下三个步骤:
实施异步功能。 此函数实现请求分发的功能。
检索操作结果并将其传递给ResultFuture的callback回调。
对DataStream使用异步IO操作。
以下代码生成了基本模板:
//thisexampleimplementstheasynchronousrequestandcallbackwithfuturesthavethe
//interfaceofjava8' s futures (whichisthesameonefollowedbyflink ' s future )。
//*
* animplementationofthe ' async function ' thatsendsrequestsandsetsthecallback。
*/
classasyncdatabaserequestextendsrichasyncfunction {
/* thedatabasespecificclientthatcanissueconcurrentrequestswithcallbacks * /
rivatetransientdatabaseclientclient;
@Override
公共语音开放(configuration parameters ) throws Exception {。
client=newdatabaseclient(host,post,credentials );
}
@Override
公共void close () throws Exception { )。
client.close (;
}
@Override
publicvoidasyncinvoke (string key,final ResultFuture resultFuture ) throws Exception {
//issue the asynchronous request,receive a future for result
finalfutureresult=client.query (key;
//setthecallbacktobeexecutedoncetherequestbytheclientiscomplete
//thecallbacksimplyforwardstheresulttotheresultfuture
completablefuture.supply async (new supplier () ) )。
@Override
公共字符串获取() {
try {
return result.get (;
} catch (封装扩展) )
//Normally handled explic
itly.return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
// create the original stream
DataStream stream = ...;
// apply the async I/O transformation
DataStream> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
重要提示:
第一次调用 ResultFuture.complete的时候 ResultFuture就会完成。所有后续的complete调用都会被忽略。
下面也有两个参数需要注意一下:
Timeout
异步IO请求被视为失败的超时时间,超过该时间异步请求就算失败。该参数主要是为了剔除死掉或者失败的请求。
Capacity
该参数定义了同时最多有多少个异步请求在处理。即使异步IO的方式会导致更高的吞吐量,但是对于实时应用来说该操作也是一个瓶颈。限制并发请求数,算子不会积压过多的未处理请求,但是一旦超过容量的显示会触发背压。
3. 超时处理
当一个异步IO请求多次超时,默认情况下会抛出一个异常,然后重启job。如果想处理超时,可以覆盖AsyncFunction#timeout方法。
4. 结果的顺序
AsyncFunction发起的并发请求完成的顺序是不可预期的。为了控制结果发送的顺序,flink提供了两种模式:
1). Unordered
结果记录在异步请求结束后立刻发送。流中的数据在经过该异步IO操作后顺序就和以前不一样了。当使用处理时间作为基础时间特性的时候,该方式具有极低的延迟和极低的负载。调用方式
AsyncDataStream.unorderedWait(...)
(处理时间无序图)
2). Ordered
该种方式流的顺序会被保留。结果记录发送的顺序和异步请求被触发的顺序一样,该顺序就是愿意流中事件的顺序。为了实现该目标,操作算子会在该结果记录之前的记录为发送之前缓存该记录。这往往会引入额外的延迟和一些Checkpoint负载,因为相比于无序模式结果记录会保存在Checkpoint状态内部较长的时间。调用方式
AsyncDataStream.orderedWait(...)
5. 事件时间
当使用事件时间的时候,异步IO操作也会正确的处理watermark机制。这就意味着两种order模式的具体操作如下:
1). Unordered
watermark不会超过记录,反之亦然,意味着watermark建立了一个order边界。记录仅会在两个watermark之间无序发射。watermark之后的记录仅会在watermark发送之后发送。watermark也仅会在该watermark之前的所有记录发射完成之后发送。
这就意味着在存在watermark的情况下,无序模式引入了一些与有序模式相同的延迟和管理开销。开销的大小取决于watermark的频率。
(事件时间无序图)
2). Ordered
watermark的顺序就如记录的顺序一样被保存。与处理时间相比,开销没有显著变化。
请记住,注入时间 Ingestion Time是基于源处理时间自动生成的watermark事件时间的特殊情况。
6. 容错担保
异步IO操作提供了仅一次处理的容错担保。它会将在传出的异步IO请求保存于Checkpoint,然后故障恢复的时候从Checkpoint中恢复这些请求。
详细的案例公众号回复 异步 即可得到详细的案例。