通常,在业务开发过程中,当遇到业务解耦、流量削峰或异步通信等不同场景时,往往需要使用消息中间件来实现这一目标。然后,让我们初步了解一下每个中间件的一些核心功能的支持情况。本文比较了以下三个产品(正在陆续补充和更新)。
三种产品的比较
今天,我们将重点介绍RocketMQ和QMQ产品对事物消息的支持以及背后的实现原则。
事务消息在RocketMQ中的定义如下:它意味着应用本地事务和发送消息操作可以定义为全局事务,或者成功,或者同时失败。RocketMQ的事务消息提供了类似X/Open XA的分布式事务功能,通过事务消息可以实现分布式事务的最终一致性。
然而,QMQ通过一个具体的业务案例来描述场景中的事务消息:如果消息要在事务等场景中使用,一致性是一个非常关键的要求。也就是说,不发送业务运营成功消息,或者发送消息但业务不成功。比如支付服务用消息通知票务服务,那么支付就不能成功,但是消息不发出,就会引起用户的投诉;但是,不可能有不成功的付款,但消息最终还是发出了,这会导致公司的损失。综上所述,消息传递和业务都需要交易保证。
两种产品的具体实施机制:
(1)火箭MQ
在4.3.0版本中,支持分布式事务消息。这里,RocketMQ采用了2PC的思想提交事务消息,同时加入了补偿逻辑来处理两级超时或失败的消息,如下图所示。
两阶段提交
上图主要分为两个过程:正常交易消息的发送提交和交易消息的补偿过程。
答:正常交易消息的发送和提交流程
1.发消息(半消息),为什么是半消息?其实就是消费者还不能消费,拿不到的信息;
2.服务器通过ack机制响应消息写入结果;
3.根据发送结果执行本地事务(消息发送失败,本地逻辑不执行,半消息对业务不可见);
4.根据本地事务状态,执行Commit或Rollback(Commit操作生成消息索引,消息对消费者可见);
b:补偿过程(用于解决消息提交或回滚超时或失败)
5.对于没有Commit/Rollback的事务消息(处于挂起状态的消息),从服务器发起“回查”,一般情况下,无论本地事务是由于自身原因还是网络原因再次提交还是回滚,生产者都不会向Broker发送消息;
6.6。生产者接收回检消息,并检查回检消息对应的本地事务的状态;
7.根据本地事务状态再次提交或回滚。
(2)质量管理体系
在所有的业务数据库应用和创建过程中,QMQ默认初始化一个Message DB,它对业务是透明的,业务不知道这个库的存在。然后在生产者客户端发送消息,将业务操作放在同一个数据库事务中,通过数据库的事务控制实现事务消息机制。官网给出的案例如下:
在支付场景中,支付成功后,需要插入支付流程,发送支付完成消息通知其他系统。那么在这里插入支付管道和发送消息需要一致,任何不成功的步骤最终都会导致问题。伪代码如下:
@事务性
公共无效工资(订单){ 0
pay transaction t=buildpay transaction(订单);
paydao . append(t);
producer . send message(Build message(t));
}
具体解释如下:
@事务性
公共无效工资(订单){ 0
pay transaction t=buildpay transaction(订单);
paydao . append(t);
//producer . send message(build message(t));
最终消息=build Message(t);
messageDao.insert(消息);
//提交交易后执行。
triggereaftertransactioncommit(()-{ 0
messageClient.send(消息);
messageDao.delete(消息);
});
}
>实际上在producer.sendMessage执行的时候,消息并没有通过网络发送出去,而仅仅是往业务DB同一个实例上的消息库插入了一条记录,然后注册事务的回调,在这个事务真正提交后消息才从网络发送出去,这个时候如果发送到server成功的话消息会被立即删除掉。而如果消息发送失败则消息就留在消息库里,这个时候我们会有一个补偿任务会将这些消息从消息库里捞出然后重新发送,直到发送成功。整个流程就如下图所示:
实际上在producer.sendMessage执行的时候,消息并没有通过网络发送出去,而仅仅是往业务DB同一个实例上的消息库插入了一条记录,然后注册事务的回调,在这个事务真正提交后消息才从网络发送出去,这个时候如果发送到server成功的话消息会被立即删除掉。而如果消息发送失败则消息就留在消息库里,这个时候我们会有一个补偿任务会将这些消息从消息库里捞出然后重新发送,直到发送成功。整个流程就如下图所示:
1.begin tx 开启本地事务;
2.do work 执行业务操作;
3.insert message 向同实例消息库插入消息;
4.end tx 事务提交;
5.send message 网络向server发送消息;
6.reponse server回应消息;
7.delete message 如果server回复成功则删除消息;
8.scan messages 补偿任务扫描未发送消息;
9.send message 补偿任务补偿消息;
10.delete messages 补偿任务删除补偿成功的消息;
补充:上图中的task,可以认为是在MQ后台管理端的一个Schedule,因为任何一个需要发布/订阅消息的业务端,都需要在MQ管理后台进行登记注册,因此通过这个来获取业务库的连接,从而达到扫描失败消息、删除成功消息的目的。
总结:仔细分析下,两款产品对事务消息的支持,有异曲同工之妙。