回复回调
可以更好地了解a用途的一个方法
基于React流的方法是简化非阻塞IO调用的方法。
本文简要介绍了与同步远程调用相关的代码,并介绍了如何在无阻塞IO中进行分层。 虽然使用资源(特别是线程)非常有效,但我们将了解回调地狱和基于React流的方法如何简化编程模型。
为了创建目标服务客户呼叫,表示城市详细信息的目标服务有两个端点。 如果使用uri类型的/cityIDs调用,将返回城市id列表。 结果的例子如下。
[ 1,2,3,4,5,6,7 ]端点返回特定城市ID的城市详细信息。 例如,如果使用id1“/cities/1”调用:
{ 'country' : 'USA ',' ID' : 1,' name' : 'Portland ',' pop' : 1600000 }客户负责获取城市标识列表,并按城市标识获取城市详细信息
同步通话我正在使用Spring Framework的RestTemplate进行远程呼叫。 Kotlin函数获取的城市ID列表如下所示。
private fun getcityids (3360 liststring { valcityidsentity : responseentityliststring=rest template.exchange (3358 localhocalhonge ET,null,object : parameterizedtypereferenceliststring ({ } ) return cityIdsEntity.body! 呃! }获取城市详细信息:
privatefungetcityforid (id : string ) : city (returnresttemplate.getforobject ) ) http://localhost 3360 $ localserververpococe }指定这两个功能后,可以很容易地组合起来返回城市列表。
valcityids 3360 liststring=getcityids (val cities 3360 list city=cityids.stream ).mapcity ) cityid-getcityforid cityids ollectors.tolist ) ) cities.foreach (city-logger.info ) city.tostring ) }此代码非常容易理解,但涉及八个阻塞呼叫- -
1.7获取城市ID列表,获取各城市详细信息2.7获取城市各城市详细信息
这些调用分别位于不同的线程上。
当无阻塞IO和回调组合在一起时,将使用一个名为async http客户端的库进行无阻塞IO调用。
进行远程调用时,AyncHttpClient返回ListenableFuture类型。
valresponselistenablefuture : listenablefutureresponse=async http client.prepare get (' http://localhost 3360 $ local server pre
responselistenablefuture.addlistener (runnable { val response : response=responselistenablefuture.get () ) ) )。 val response body : string=response.responsebodyvalcityids 3360 list long=object mapper.readvaluelistlong ()
nce<List<Long>>() {}) .... }给定城市ID列表,我想获取城市的详细信息,因此从响应中,我需要进行更多的远程调用,并为每个调用附加一个回调,以沿以下方向获取城市的详细信息:
val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient .prepareGet( " http://localhost: $localServerPort/cityids" ) .execute() responseListenableFuture.addListener(Runnable { val response: Response = responseListenableFuture.get() val responseBody: String = response.responseBody val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody, object : TypeReference<List<Long>>() {}) cityIds.stream().map { cityId -> val cityListenableFuture = asyncHttpClient .prepareGet( " http://localhost: $localServerPort/cities/$cityId" ) .execute() cityListenableFuture.addListener(Runnable { val cityDescResp = cityListenableFuture.get() val cityDesc = cityDescResp.responseBody val city = objectMapper.readValue(cityDesc, City:: class .java) LOGGER.info( "Got city: $city" ) }, executor) }.collect(Collectors.toList()) }, executor)这是一段粗糙的代码,在一个回调中有一组回调,很难对此进行推理和理解,因此被称为回调地狱。
在Java CompletableFuture中使用非阻塞IO通过返回Java的CompletableFuture作为返回类型而不是ListenableFuture,可以对代码进行一些改进。 CompletableFuture提供允许修改返回的返回类型的运算符。
例如,考虑使用函数获取城市ID列表:
private fun getCityIds(): CompletableFuture<List<Long>> { return asyncHttpClient .prepareGet( " http://localhost: $localServerPort/cityids" ) .execute() .toCompletableFuture() .thenApply { response -> val s = response.responseBody val l: List<Long> = objectMapper.readValue(s, object : TypeReference<List<Long>>() {}) l } }在这里,我使用“ thenApply”运算符将“ CompletableFuture <Response>”转换为“ CompletableFuture <List <Long >>”
并类似地获得城市的详细信息:
private fun getCityDetail(cityId: Long): CompletableFuture<City> { return asyncHttpClient.prepareGet( " http://localhost: $localServerPort/cities/$cityId" ) .execute() .toCompletableFuture() .thenApply { response -> val s = response.responseBody LOGGER.info( "Got {}" , s) val city = objectMapper.readValue(s, City:: class .java) city } }这是基于回调方法的改进,但是,CompletableFuture缺少足够的运算符,例如,在这种特定情况下,需要将所有城市详细信息放在一起:
val cityIdsFuture: CompletableFuture<List<Long>> = getCityIds() val citiesCompletableFuture: CompletableFuture<List<City>> = cityIdsFuture .thenCompose { l -> val citiesCompletable: List<CompletableFuture<City>> = l.stream() .map { cityId -> getCityDetail(cityId) }.collect(toList()) val citiesCompletableFutureOfList: CompletableFuture<List<City>> = CompletableFuture.allOf(*citiesCompletable.toTypedArray()) .thenApply { _: Void? -> citiesCompletable .stream() .map { it.join() } .collect(toList()) } citiesCompletableFutureOfList }我使用了一个名为CompletableFuture.allOf的运算符,该运算符返回“ Void”类型,并且必须强制返回所需的“” CompletableFuture <List <City >>类型。
使用Project ReactorProject Reactor是Reactive Streams规范的实现。 它有两种特殊类型,可返回0/1项目流和0 / n项目流–前者是Mono,后者是Flux。
Project Reactor提供了一组非常丰富的运算符,这些运算符允许以多种方式转换数据流。 首先考虑该函数以返回城市ID列表:
private fun getCityIds(): Flux<Long> { return webClient.get() .uri( "/cityids" ) .exchange() .flatMapMany { response -> LOGGER.info( "Received cities.." ) response.bodyToFlux<Long>() } }我正在使用Spring出色的WebClient库进行远程调用,并获得ProjectReact器“ Mono <ClientResponse>”类型的响应,可以使用“ flatMapMany”运算符将其修改为“ Flux <Long>”类型。
在给定城市ID的情况下,按照相同的步骤获取城市的详细信息:
private fun getCityDetail(cityId: Long?): Mono<City> { return webClient.get() .uri( "/cities/{id}" , cityId!!) .exchange() .flatMap { response -> val city: Mono<City> = response.bodyToMono() LOGGER.info( "Received city.." ) city } }在这里,使用“ flatMap”运算符将项目React堆“ Mono <ClientResponse>”类型转换为“ Mono <City>”类型。
以及从中获取城市ID和城市的代码:
val cityIdsFlux: Flux<Long> = getCityIds() val citiesFlux: Flux<City> = cityIdsFlux .flatMap { this .getCityDetail(it) } return citiesFlux这非常具有表现力-对比了基于回调的方法的混乱和基于响应流的方法的简单性。
结论在我看来,这是使用基于响应流的方法的最大原因之一,尤其是在涉及跨越异步边界的场景(例如在这种情况下进行远程调用)的情况下,尤其是Project Reactor。 它清除了各种回调和回调地狱,并提供了使用一组丰富的运算符修改/转换类型的自然方法。
我在这里使用的所有示例的工作版本的存储库位于https://github.com/bijukunjummen/reactive-cities-demo/tree/master/src/test/kotlin/samples/geo/kotlin
翻译自: https://www.javacodegeeks.com/2019/06/callback-hell-reactive-patterns.html
react回调