序
本文主要研究一下rocketmq的ProducerImpl
ProducerImpl
io/openmessaging/rocketmq/producer/ProducerImpl.java
publicclassproducerimplextendsabstractomsproducerimplementsproducer {
publicproducerimpl (金融价值属性) {
超级(属性);
}
@Override
公共密钥值属性(
返回属性;
}
@Override
publicsendresultsend (金融讯息) {
返回(消息,this.rocketmqproducer.getsendmsgtimeout );
}
@Override
公共密钥值属性(最终消息,最终密钥值属性)
长时间输出=属性.容器密钥(属性密钥.操作_时间输出)。
? 属性. getint (属性密钥.操作_时间输出) : this.rocketmqproducer.getsendmsgtimeout );
返回(消息,时间输出);
}
私有消息传输(金融消息传输,长时间输出)。
检查消息类型(消息;
org.Apache.rocket MQ.com mon.message.messagermqmessage=msgconvert () message );
特里
org.Apache.rocket MQ.client.producer.sendresultrmqresult=this.rocketmqproducer.send (rmqmessage,时间输出);
if (! rmqResult.getSendStatus ().equals )发送状态.发送_确定) )
log.error (字符串格式,%s ',消息);
thrownewomsruntimeexception; '-1 ',' sendmessagetorocketmqbrokerfailed.';
}
消息头().put )消息头.消息id,rmqResult.getMsgId );
returnomsutil.sendresultconvert (rmqresult;
缓存(执行) {
log.error (字符串格式,%s ',消息,e );
throwcheckproducerexception (rmqmessage.get主题(,消息头).getstring )消息头.消息id ) )
}
}
@Override
publicpromisesendresultsendasync (金融消息同步)。
returnsendasync (消息,this.rocketmqproducer.getsendmsgtimeout );
}
@Override
publicpromisesendresultsendasync (金融密钥值属性) {
长时间输出=属性.容器密钥(属性密钥.操作_时间输出)。
? 属性. getint (属性密钥.操作_时间输出) : this.rocketmqproducer.getsendmsgtimeout );
return sendAsync(message, timeout);
}
private Promise<SendResult> sendAsync(final Message message, long timeout) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
final Promise<SendResult> promise = new DefaultPromise<>();
try {
this.rocketmqProducer.send(rmqMessage, new SendCallback() {
@Override
public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
promise.set(OMSUtil.sendResultConvert(rmqResult));
}
@Override
public void onException(final Throwable e) {
promise.setFailure(e);
}
}, timeout);
} catch (Exception e) {
promise.setFailure(e);
}
return promise;
}
@Override
public void sendOneway(final Message message) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
try {
this.rocketmqProducer.sendOneway(rmqMessage);
} catch (Exception ignore) { //Ignore the oneway exception.
}
}
@Override
public void sendOneway(final Message message, final KeyValue properties) {
sendOneway(message);
}
}
发送消息的方法主要是代理给rocketmqProducer;另外调用OMSUtil.msgConvert将api的BytesMessage转换为org.apache.rocketmq.common.message.Message;对于异步采用的是DefaultPromise,其callback为SendCallback。
OMSUtil.msgConvert
io/openmessaging/rocketmq/utils/OMSUtil.java
/**
 * Builds a RocketMQ message from an OpenMessaging {@code BytesMessage}.
 *
 * <p>The body is copied as-is. The RocketMQ topic is taken from the {@code TOPIC} header when
 * that header exists and from the {@code QUEUE} header otherwise, and the chosen kind is
 * recorded under {@code NonStandardKeys.MESSAGE_DESTINATION}. All OMS properties are copied
 * first and all OMS headers second, so a header overrides a property with the same key.
 *
 * @param omsMessage the OpenMessaging message to convert
 * @return the newly created RocketMQ message
 */
public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
    final org.apache.rocketmq.common.message.Message converted =
        new org.apache.rocketmq.common.message.Message();
    converted.setBody(omsMessage.getBody());

    final KeyValue omsHeaders = omsMessage.headers();
    final KeyValue omsProperties = omsMessage.properties();

    // RocketMQ only knows topics, so a queue destination is mapped onto a topic as well.
    final boolean topicDestination = omsHeaders.containsKey(MessageHeader.TOPIC);
    final String destinationHeader = topicDestination ? MessageHeader.TOPIC : MessageHeader.QUEUE;
    final String destinationKind = topicDestination ? "TOPIC" : "QUEUE";
    converted.setTopic(omsHeaders.getString(destinationHeader));
    converted.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, destinationKind);

    // Properties go in first ...
    for (String propertyKey : omsProperties.keySet()) {
        MessageAccessor.putProperty(converted, propertyKey, omsProperties.getString(propertyKey));
    }

    // ... and headers last, so that headers win on key collisions.
    for (String headerKey : omsHeaders.keySet()) {
        MessageAccessor.putProperty(converted, headerKey, omsHeaders.getString(headerKey));
    }

    return converted;
}
这里主要是转换header及topic信息。
SendCallback
org/apache/rocketmq/client/producer/SendCallback.java
/**
 * Callback pair handed to an asynchronous send: one method receives the result of a
 * successful send, the other receives the failure cause.
 */
public interface SendCallback {

    /**
     * Receives the outcome of a send that succeeded.
     *
     * @param sendResult the result produced by the send
     */
    void onSuccess(SendResult sendResult);

    /**
     * Receives the cause of a send that failed.
     *
     * @param e the failure cause
     */
    void onException(Throwable e);
}
对于成功,将SendResult传递过来;对于异常则传递Throwable。
DefaultPromise
io/openmessaging/rocketmq/promise/DefaultPromise.java
/**
 * Lock-based {@code Promise} implementation that is completed once, either with a value via
 * {@link #set(Object)} or with a cause via {@link #setFailure(Throwable)}, and then notifies
 * the registered listeners.
 *
 * <p>The outcome is stored inside the same synchronized section that flips the state, so a
 * call arriving after completion returns {@code false} without touching the stored outcome.
 *
 * @param <V> type of the value the promise is completed with
 */
public class DefaultPromise<V> implements Promise<V> {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class);
    /** Monitor guarding the state transition; waiters on it are woken on completion. */
    private final Object lock = new Object();
    private volatile FutureState state = FutureState.DOING;
    private V result = null;
    // Not read in the visible code; presumably milliseconds, used by the elided sections (confirm).
    private long timeout;
    // Creation time from System.currentTimeMillis(); not read in the visible code.
    private long createTime;
    private Throwable exception = null;
    private List<PromiseListener<V>> promiseListenerList;

    /** Creates a pending promise with an empty listener list and a timeout value of 5000. */
    public DefaultPromise() {
        createTime = System.currentTimeMillis();
        promiseListenerList = new ArrayList<>();
        timeout = 5000;
    }

    //......

    /**
     * Completes the promise with a value.
     *
     * @param value the value to store; {@code null} is rejected
     * @return {@code true} if this call completed the promise, {@code false} if {@code value}
     *         is {@code null} or the promise was no longer in progress
     */
    @Override
    public boolean set(final V value) {
        if (value == null) {
            return false;
        }
        return done(value, null);
    }

    /**
     * Completes the promise with a failure.
     *
     * @param cause the failure cause to store; {@code null} is rejected
     * @return {@code true} if this call completed the promise, {@code false} if {@code cause}
     *         is {@code null} or the promise was no longer in progress
     */
    @Override
    public boolean setFailure(final Throwable cause) {
        if (cause == null) {
            return false;
        }
        return done(null, cause);
    }

    /**
     * Completes the promise with whatever outcome the fields currently hold.
     * NOTE(review): not called by the visible code; kept in case the elided sections of this
     * class call it - confirm and remove if unused.
     */
    private boolean done() {
        return done(result, exception);
    }

    /**
     * Atomically records the outcome and moves the promise to {@code DONE}.
     *
     * @param value value to record, or {@code null} to leave the stored result unchanged
     * @param cause cause to record, or {@code null} to leave the stored exception unchanged
     * @return {@code true} if this call performed the transition, {@code false} if the promise
     *         was no longer in progress (nothing is modified in that case)
     */
    private boolean done(final V value, final Throwable cause) {
        synchronized (lock) {
            if (!isDoing()) {
                return false;
            }
            // Record the outcome only after winning the transition, so a late or duplicate
            // completion cannot overwrite the outcome of the first one.
            if (value != null) {
                this.result = value;
            }
            if (cause != null) {
                this.exception = cause;
            }
            state = FutureState.DONE;
            lock.notifyAll();
        }
        // Listeners are invoked outside the lock.
        notifyListeners();
        return true;
    }

    /** Invokes every listener in {@code promiseListenerList}, if the list exists. */
    private void notifyListeners() {
        if (promiseListenerList != null) {
            for (PromiseListener<V> listener : promiseListenerList) {
                notifyListener(listener);
            }
        }
    }

    /**
     * Calls {@code operationFailed} when an exception is stored and {@code operationCompleted}
     * otherwise. Anything thrown by the listener is logged and not propagated.
     *
     * @param listener the listener to invoke
     */
    private void notifyListener(final PromiseListener<V> listener) {
        try {
            if (exception != null) {
                listener.operationFailed(this);
            } else {
                listener.operationCompleted(this);
            }
        } catch (Throwable t) {
            LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
        }
    }

    //......
}
set或者setFailure方法都会调用done方法;done方法会调用notifyListeners,回调listener的operationCompleted或者operationFailed。
小结
ProducerImpl主要是为rocketmq自身的rocketmqProducer适配open-messaging的api接口;异步采用自定义的SendCallback回调和DefaultPromise。
doc
ProducerImpl