首页 > 编程知识 正文

响应式编程 优点,spring响应式编程

时间:2023-05-05 09:44:47 阅读:175517 作者:1501

第九章利用RxJava进行响应编程版权声明:本文由博主自行翻译,转载请注明出处。 3359 blog.csdn.net/eline space/article/details/80675371

相应的代码位于此指南仓库的step-8目录中

目前,我们使用基于回调的API探索了Vert.x技术堆栈的一些部分。 仅仅是起作用,这个编程模型对开发人员来说在很多语言中都很熟悉。 不过,如果dddts组合了几个事件源或处理复杂的数据流,可能会有点麻烦。

这就是RxJava闪耀的地方,Vert.x无缝集成。

9.1启用rx Java API除了基于回调的API外,Vert.x模块还提供了一组“rx ified”API。 要启用它,必须首先将vertx-rx-java模块添加到Maven POM文件中。

ependencygroupidio.vertx/groupidartifactidvertx-rx-Java/artifact id/dependency verticle从IO.vertx.rxJava .继续上一个类扩展了后者,公开了类型io.vertx.rxjava.core.Vertx的属性。

io.vertx.rxjava.core.Vertx定义了其他rxsomething(…)方法,这些方法等效于基于回调的对等方。

让我们看看主版本。 更好地理解在实践中是如何发挥作用的吧。

singlestringdbverticledeployment=vertx.rxdeployverticle (' io.vertx.guides.wiki.database.wikidatabaseverticle ' ) rxDeploy方法没有使用硬件

9.2要完成按顺序部署verticle的MainVerticle重构,必须确保已触发部署操作并按顺序进行。

dverticledeployment.flat map (id-(singlestringhttpverticledeployment=vertx.rxdeployverticle (io.vertx.guides retid ) ) }.subscribe (id-start future.com plete ),startFuture:fail ); flatMap方法将此函数应用于dbVerticleDeployment的结果。 在此,安排http服务器版本的部署。

操作在订阅时开始。 根据结果是成功还是失败,MainVerticle将调用startFuture的complete或fail方法。

如果您按顺序阅读第9.3节“rxi fying”中的HttpServerVerticle指南并按照中的说明编辑代码,则HttpServerVerticle类仍使用基于回调的API。 使用RxJava API执行异步操作之前,必须重建HttpServerVerticle,例如并发

9.3.1部署vert.x类的rx Java io.vertx.rx Java.core.abstract verticle版本; import io.vertx.rx Java.core.http.http server; import io.vertx.rx Java.ext.auth.auth provider; import io.vertx.rx Java.ext.auth.user; import io.vertx.rx Java.ext.auth.jwt.jwt auth; import io.vertx.rx Java.ext.auth.Shiro.Shiro auth; import io.vertx.rx Java.ext.web.router; import io.vertx.rx Java.ext.web.routing context; import io.vertx.rx Java.ext.web.client.web client; import io.vertx.rx Java.ext.web.client.httpresponse; import io.vertx.rx Java.ext.web.codec.body codec; import io.vertx.rx Java.ext.web.handler.*; import io.vertx.rx Java.ext.web.sstore.localsessionstore; import io.vertx.rxjava.ext.w

eb.templ.FreeMarkerTemplateEngine; import org.slf4j.Logger;import org.slf4j.LoggerFactory;import rx.Observable;import rx.Single;

① 我们的backupHandler()方法依旧使用HttpResponse类,因此它必须被导入。事实证明,Vert.x提供的RxJava版本的HttpResponse可以作为这种情况下的替代。在本指南仓库step-8目录中的“Rxified”代码没有导入这个类,因为响应类型是由lambda表达式推断的。

9.3.2 在一个“Rxified” vertx实例上使用委派

dddts有一个io.vertx.rxjava.core.Vertx对象,并希望对io.vertx.core.Vertx实例进行方法调用时,可以调用getDelegate()方法。Verticle的start()方法需要调整,当创建一个WikiDatabaseService时:

@Overridepublic void start(Future<Void> startFuture) throws Exception { String wikiDbQueue = config().getString(CONFIG_WIKIDB_QUEUE, "wikidb.queue"); dbService = io.vertx.guides.wiki.database.WikiDatabaseService.createProxy(vertx.getDelegate(), wikiDbQueue); 9.4 并发执行授权查询

在前面的例子中,我们看到了如何使用RxJava和Rxified Vert.x API按顺序执行异步操作。但是有时候这种保证(译者注:指按顺序执行异步操作)是不需要的,或许你只是出于性能原因需要它们简单的并发运行。

HttpServerVerticle的JWT令牌生成过程是这种情况的一个好例子。为了创建一个令牌,我们需要所有的授权查询结果完成,但是查询是相互独立的:

auth.rxAuthenticate(creds).flatMap(user -> { Single<Boolean> create = user.rxIsAuthorised("create"); ① Single<Boolean> delete = user.rxIsAuthorised("delete"); Single<Boolean> update = user.rxIsAuthorised("update"); return Single.zip(create, delete, update, (canCreate, canDelete, canUpdate) -> { ② return jwtAuth.generateToken( new JsonObject() .put("username", context.request().getHeader("login")) .put("canCreate", canCreate) .put("canDelete", canDelete) .put("canUpdate", canUpdate), new JWTOptions() .setSubject("Wiki API") .setIssuer("Vert.x")); });}).subscribe(token -> { context.response().putHeader("Content-Type", "text/plain").end(token); }, t -> context.fail(401));

① 创建了三个Single对象,表示不同的授权查询。

② 当三个操作成功完成时,执行zip操作的回调方法,使用前面三个操作的结果。

9.5 使用数据库链接

为了从池中获取一个数据库链接,所有你需要做的就是调用JDBCClient上的rxGetConnection方法:

Single<SQLConnection> connection = dbClient.rxGetConnection();

这个方法返回了一个Single,你可以轻易使用flatMap变换来执行SQL查询:

Single<ResultSet> resultSet = dbClient.rxQueryWithParams( sqlQueries.get(SqlQuery.GET_PAGE_BY_ID), new JsonArray().add(id));

但是,如果SQLConnection引用不再可达,我们怎么释放该链接?一个简单而且方便的方法是当Single取消订阅时执行close:

private Single<SQLConnection> getConnection() { return dbClient.rxGetConnection().flatMap(conn -> { Single<SQLConnection> connectionSingle = Single.just(conn); ① return connectionSingle.doOnUnsubscribe(conn::close); ② });}

① 在获取链接之后,我们将其封装为一个Single。

② Single修改为当取消订阅时,调用close。

现在我们可以在数据库Verticle中任何需要执行SQL查询的时候使用getConnection。

9.6 消除回调和RxJava之间的差距

有时,你可能必须混合RxJava代码和基于回调的API。例如,服务代理接口只能定义为回调的方式,但是它的实现使用了Vert.x Rxified API。

这种情况下,io.vertx.rx.java.RxHelper类可以适配Handler

@Overridepublic WikiDatabaseService fetchAllPagesData(Handler<AsyncResult<List<JsonObject>>> resultHandler) { ① dbClient.rxQuery(sqlQueries.get(SqlQuery.ALL_PAGES_DATA)) .map(ResultSet::getRows) .subscribe(RxHelper.toSubscriber(resultHandler)); ② return this;}

① fetchAllPagesData是一个异步服务代理操作,其定义使用了Handler

9.7 数据流

RxJava不仅是合并不同事件来源的伟大工具,它对于数据流也非常有帮助。不像Vert.x future或者JDK future,Observable发出一个事件流,而不仅是一个单独的事件,并且它拥有一个广泛的数据操作运算集。

我们可以使用这些操作中一些来重构数据库Verticle中的fetchAllPages方法:

public WikiDatabaseService fetchAllPages(Handler<AsyncResult<JsonArray>> resultHandler) { dbClient.rxQuery(sqlQueries.get(SqlQuery.ALL_PAGES)) .flatMapObservable(res -> { ① List<JsonArray> results = res.getResults(); return Observable.from(results); ② }) .map(json->json.getString(0)) ③ .sorted() ④ .collect(JsonArray::new, JsonArray::add) ⑤ .subscribe(RxHelper.toSubscriber(resultHandler)); return this; }

① 通过flatMapObservable,我们可以使用Single发出的条目创建一个Observable。

② from将数据库results迭代转换成一个Observable,该Observable发出数据库行条目。

③ 由于我们只需要页面名称,我们可以map每个JsonObject行到首列。

④ 客户端希望数据按照字母表顺序sorted。

⑤ 事件总线服务应答包含在一个单独的JsonArray中。collect方法通过JsonArray::new创建一个新的对象,然后当条目发出时通过JsonArray::add方法添加它们。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。