RocketMQ Secondary Development (RocketMQ Explained)

Date: 2023-05-06 01:54:54 Author: 827

This article examines RocketMQ's ProducerImpl.

ProducerImpl

io/openmessaging/rocketmq/producer/ProducerImpl.java

public class ProducerImpl extends AbstractOMSProducer implements Producer {

    public ProducerImpl(final KeyValue properties) {
        super(properties);
    }

    @Override
    public KeyValue properties() {
        return properties;
    }

    @Override
    public SendResult send(final Message message) {
        return send(message, this.rocketmqProducer.getSendMsgTimeout());
    }

    @Override
    public SendResult send(final Message message, final KeyValue properties) {
        // A per-call OPERATION_TIMEOUT overrides the producer's default send timeout.
        long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
            ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
        return send(message, timeout);
    }

    private SendResult send(final Message message, long timeout) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
        try {
            org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
            if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
                log.error(String.format("Send message to RocketMQ failed, %s", message));
                throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
            }
            // Write the broker-assigned message id back into the OMS message headers.
            message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
            return OMSUtil.sendResultConvert(rmqResult);
        } catch (Exception e) {
            log.error(String.format("Send message to RocketMQ failed, %s", message), e);
            throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
        }
    }

    @Override
    public Promise<SendResult> sendAsync(final Message message) {
        return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
    }

    @Override
    public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
        long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
            ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
        return sendAsync(message, timeout);
    }

    private Promise<SendResult> sendAsync(final Message message, long timeout) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
        final Promise<SendResult> promise = new DefaultPromise<>();
        try {
            // The callback completes the promise once the broker responds or the send fails.
            this.rocketmqProducer.send(rmqMessage, new SendCallback() {
                @Override
                public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
                    message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
                    promise.set(OMSUtil.sendResultConvert(rmqResult));
                }

                @Override
                public void onException(final Throwable e) {
                    promise.setFailure(e);
                }
            }, timeout);
        } catch (Exception e) {
            promise.setFailure(e);
        }
        return promise;
    }

    @Override
    public void sendOneway(final Message message) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
        try {
            this.rocketmqProducer.sendOneway(rmqMessage);
        } catch (Exception ignore) { //Ignore the oneway exception.
        }
    }

    @Override
    public void sendOneway(final Message message, final KeyValue properties) {
        sendOneway(message);
    }
}

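The send methods mainly delegate to rocketmqProducer. Before delegating, they call OMSUtil.msgConvert to convert the API's BytesMessage into an org.apache.rocketmq.common.message.Message. The asynchronous path returns a DefaultPromise, which is completed from a SendCallback.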
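The timeout selection shared by send and sendAsync can be looked at in isolation. The following is only a minimal sketch, not the RocketMQ code: it uses a plain Map in place of the OMS KeyValue, and the class TimeoutResolution, the method resolveTimeout, and the key string "OPERATION_TIMEOUT" are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the timeout selection in ProducerImpl.send / sendAsync.
class TimeoutResolution {
    // Assumed key string for illustration only; the real constant is PropertyKeys.OPERATION_TIMEOUT.
    static final String OPERATION_TIMEOUT = "OPERATION_TIMEOUT";

    // Returns the per-call timeout if the caller supplied one, otherwise the producer default.
    static long resolveTimeout(Map<String, Integer> properties, long defaultTimeoutMillis) {
        return properties.containsKey(OPERATION_TIMEOUT)
            ? properties.get(OPERATION_TIMEOUT)
            : defaultTimeoutMillis;
    }

    public static void main(String[] args) {
        Map<String, Integer> perCall = new HashMap<>();
        // No override present: the default is used.
        System.out.println(resolveTimeout(perCall, 3000L)); // prints 3000
        // Override present: the per-call value wins.
        perCall.put(OPERATION_TIMEOUT, 500);
        System.out.println(resolveTimeout(perCall, 3000L)); // prints 500
    }
}
```

Keeping this choice in the public overloads lets the private send(message, timeout) and sendAsync(message, timeout) methods deal with a single, already-resolved timeout value.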

OMSUtil.msgConvert

io/openmessaging/rocketmq/utils/OMSUtil.java

public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
    org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
    rmqMessage.setBody(omsMessage.getBody());

    KeyValue headers = omsMessage.headers();
    KeyValue properties = omsMessage.properties();

    //All destinations in RocketMQ use Topic
    if (headers.containsKey(MessageHeader.TOPIC)) {
        rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
        rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
    } else {
        rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
        rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE");
    }

    for (String key : properties.keySet()) {
        MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
    }

    //Headers has a high priority
    for (String key : headers.keySet()) {
        MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
    }

    return rmqMessage;
}

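The conversion mainly carries over the header and topic information: it copies the body, maps the OMS TOPIC or QUEUE header to a RocketMQ topic, and then copies the properties followed by the headers, so headers take priority when a key appears in both.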
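The "headers have priority" behaviour follows purely from write order. Here is a minimal sketch of that idea, assuming MessageAccessor.putProperty overwrites an existing key the way a map put does; the class HeaderPriority, the method merge, and the sample keys are hypothetical and plain Maps stand in for KeyValue.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the copy order used in msgConvert.
class HeaderPriority {
    // Copies properties first, then headers, so a header overwrites a property with the same key.
    static Map<String, String> merge(Map<String, String> properties, Map<String, String> headers) {
        Map<String, String> merged = new LinkedHashMap<>(properties);
        merged.putAll(headers);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("TRACE_ID", "from-property");
        properties.put("color", "red");

        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("TRACE_ID", "from-header");

        // The header value replaces the property value for the shared key.
        System.out.println(merge(properties, headers)); // prints {TRACE_ID=from-header, color=red}
    }
}
```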

SendCallback

org/apache/rocketmq/client/producer/SendCallback.java

public interface SendCallback {
    void onSuccess(final SendResult sendResult);

    void onException(final Throwable e);
}

On success the callback receives the SendResult; on failure it receives the Throwable.
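A two-method callback like this is usually bridged to a future-style object so callers can wait on or chain the result. The sketch below shows the pattern with the JDK's CompletableFuture swapped in for DefaultPromise; the Callback interface, fakeSend, and sendAsync are hypothetical stand-ins and do not talk to a broker.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical bridge from a success/exception callback to a future.
class CallbackBridge {
    // Local stand-in with the same two-method shape as SendCallback.
    interface Callback<T> {
        void onSuccess(T result);
        void onException(Throwable e);
    }

    // Pretend send: reports its outcome through the callback instead of a return value.
    static void fakeSend(String body, Callback<String> callback) {
        if (body == null || body.isEmpty()) {
            callback.onException(new IllegalArgumentException("empty body"));
        } else {
            callback.onSuccess("msg-id-for-" + body);
        }
    }

    // Completes the future from whichever callback method fires.
    static CompletableFuture<String> sendAsync(String body) {
        CompletableFuture<String> future = new CompletableFuture<>();
        fakeSend(body, new Callback<String>() {
            @Override
            public void onSuccess(String result) {
                future.complete(result);
            }

            @Override
            public void onException(Throwable e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        System.out.println(sendAsync("hello").join());              // prints msg-id-for-hello
        System.out.println(sendAsync("").isCompletedExceptionally()); // prints true
    }
}
```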

DefaultPromise

io/openmessaging/rocketmq/promise/DefaultPromise.java

public class DefaultPromise<V> implements Promise<V> {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class);
    private final Object lock = new Object();
    private volatile FutureState state = FutureState.DOING;
    private V result = null;
    private long timeout;
    private long createTime;
    private Throwable exception = null;
    private List<PromiseListener<V>> promiseListenerList;

    public DefaultPromise() {
        createTime = System.currentTimeMillis();
        promiseListenerList = new ArrayList<>();
        timeout = 5000;
    }

    //......

    @Override
    public boolean set(final V value) {
        if (value == null)
            return false;
        this.result = value;
        return done();
    }

    @Override
    public boolean setFailure(final Throwable cause) {
        if (cause == null)
            return false;
        this.exception = cause;
        return done();
    }

    private boolean done() {
        synchronized (lock) {
            if (!isDoing()) {
                return false;
            }
            state = FutureState.DONE;
            lock.notifyAll();
        }
        notifyListeners();
        return true;
    }

    private void notifyListeners() {
        if (promiseListenerList != null) {
            for (PromiseListener<V> listener : promiseListenerList) {
                notifyListener(listener);
            }
        }
    }

    private void notifyListener(final PromiseListener<V> listener) {
        try {
            if (exception != null)
                listener.operationFailed(this);
            else
                listener.operationCompleted(this);
        } catch (Throwable t) {
            LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
        }
    }

    //......
}

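Both set and setFailure call done. done switches the state to DONE under the lock, wakes any waiting threads, and then calls notifyListeners, which invokes each listener's operationFailed if an exception was recorded and operationCompleted otherwise.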
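The complete-once-then-notify flow can be tried out with a stripped-down promise. This is a sketch under stated assumptions rather than the OpenMessaging class: MiniPromise and its Listener interface are hypothetical, listeners are assumed to be registered before completion, and, unlike the listing above, the value is stored inside the lock so a late second call cannot overwrite it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified promise: completes at most once, then notifies listeners.
class MiniPromise<V> {
    interface Listener<T> {
        void operationCompleted(MiniPromise<T> promise);
        void operationFailed(MiniPromise<T> promise);
    }

    private final Object lock = new Object();
    private final List<Listener<V>> listeners = new ArrayList<>();
    private boolean done = false;
    private V result;
    private Throwable exception;

    void addListener(Listener<V> listener) {
        listeners.add(listener);
    }

    boolean set(V value) {
        return value != null && complete(value, null);
    }

    boolean setFailure(Throwable cause) {
        return cause != null && complete(null, cause);
    }

    // Only the first caller wins; later calls return false and notify nobody.
    private boolean complete(V value, Throwable cause) {
        synchronized (lock) {
            if (done) {
                return false;
            }
            done = true;
            result = value;
            exception = cause;
            lock.notifyAll(); // wake threads blocked waiting for completion
        }
        for (Listener<V> listener : listeners) {
            if (exception != null) {
                listener.operationFailed(this);
            } else {
                listener.operationCompleted(this);
            }
        }
        return true;
    }

    V get() {
        return result;
    }

    public static void main(String[] args) {
        MiniPromise<String> promise = new MiniPromise<>();
        List<String> events = new ArrayList<>();
        promise.addListener(new Listener<String>() {
            @Override
            public void operationCompleted(MiniPromise<String> p) {
                events.add("completed:" + p.get());
            }

            @Override
            public void operationFailed(MiniPromise<String> p) {
                events.add("failed");
            }
        });
        System.out.println(promise.set("ok"));    // prints true
        System.out.println(promise.set("again")); // prints false
        System.out.println(events);               // prints [completed:ok]
    }
}
```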

Summary

ProducerImpl mainly adapts RocketMQ's own rocketmqProducer to the OpenMessaging API. Asynchronous sends combine a custom SendCallback with a DefaultPromise.

doc

ProducerImpl
