首页 > 编程知识 正文

mq实时发送接收消息,mq监听端口

时间:2023-05-05 18:36:46 阅读:232890 作者:465

// ===== File 1 of 2: Eventbus.java =====
// NOTE(review): the package declarations use "com.xxx" while the imports use "com.XXX".
// This looks like an anonymisation placeholder; substitute the real package root consistently.
package com.xxx.system.listener;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import com.XXX.plugins.msgqueue.hikvision.cms.api.MQHandler;
import com.XXX.system.service.MsgService;

/**
 * Servlet context listener that, on web-application start-up, caches the
 * plot UUID to plot number mapping returned by {@link MsgService} and then
 * subscribes to the message-queue topic through {@link MQHandler}.
 *
 * <p>On shutdown it releases the JMS connections opened by {@link MQHandler}
 * so they do not outlive the web application.
 */
public class Eventbus implements ServletContextListener {

    /**
     * Cache keyed by the {@code plotUuid} entry of each record, holding that
     * record's {@code plotNo} value. Filled once in
     * {@link #contextInitialized(ServletContextEvent)} and read by the
     * message listener in {@link MQHandler}.
     */
    public static Map<String, Object> PLOT = new HashMap<String, Object>();

    /** Broker address used for the subscription. */
    private static final String BROKER_URL = "failover:(tcp://192.168.10.2:61618)?timeout=2000";

    /** Name of the topic the consumer listens on. */
    private static final String TOPIC = "openapi.pms.topic";

    /**
     * Closes the JMS connections that were opened during start-up.
     *
     * @param arg0 the servlet context event (unused)
     */
    @Override
    public void contextDestroyed(ServletContextEvent arg0) {
        MQHandler.closeAll();
    }

    /**
     * Loads the plot cache and starts the topic subscription. Any failure is
     * logged to the servlet context log and does not abort application
     * start-up; if loading the cache fails, the subscription is not started.
     *
     * @param arg0 the servlet context event, used for logging
     */
    @Override
    public void contextInitialized(ServletContextEvent arg0) {
        try {
            loadPlots(new MsgService());

            // The original source carried a commented-out step here that POSTed a
            // subscription request for eventTypes "524548,524547" (appkey, time,
            // opUserUuid) to InterfaceConfiguration.SUBSCRIBEEVENTSURI before
            // connecting. It was disabled in the original and is left out here.

            MQHandler.getMQ(BROKER_URL, TOPIC);
        } catch (Exception e) {
            arg0.getServletContext().log("Eventbus initialisation failed", e);
        }
    }

    /**
     * Queries all plot records and stores each record's {@code plotNo} in
     * {@link #PLOT} under its {@code plotUuid}. A missing {@code data} or
     * {@code list} entry leaves the cache untouched.
     *
     * @param msgService service used to query the plot status
     * @throws Exception whatever the service call raises; handled by the caller
     */
    private static void loadPlots(MsgService msgService) throws Exception {
        // Query all plot information.
        Map<String, Object> response = msgService.getPlotStatus(null, null, 0, 0, null);
        if (response == null || response.get("data") == null) {
            return;
        }

        // The response is an untyped map; the nested shapes are trusted as in the original code.
        @SuppressWarnings("unchecked")
        Map<String, Object> data = (Map<String, Object>) response.get("data");
        if (data.get("list") == null) {
            return;
        }

        @SuppressWarnings("unchecked")
        List<Map<String, Object>> plots = (List<Map<String, Object>>) data.get("list");
        for (Map<String, Object> plot : plots) {
            PLOT.put((String) plot.get("plotUuid"), plot.get("plotNo"));
        }
    }
}

// ===== File 2 of 2: MQHandler.java =====
package com.xxx.plugins.msgqueue.hikvision.cms.api;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

import com.google.protobuf.ByteString;
import com.XXX.plugins.msgqueue.hikvision.cms.api.eps.beds.EventDis;
import com.XXX.plugins.msgqueue.hikvision.cms.api.eps.beds.PmsEvent;
import com.XXX.system.listener.Eventbus;
import com.XXX.system.web.MsgController;
import com.XXX.utils.Utils;

/**
 * Opens an ActiveMQ topic subscription and forwards each received event to
 * {@link MsgController}.
 */
public class MQHandler {

    private static Logger logger = Logger.getLogger(MQHandler.class);

    /** Connections opened by {@link #getMQ(String, String)} that are still to be closed. */
    private static final List<Connection> OPEN_CONNECTIONS = new CopyOnWriteArrayList<Connection>();

    /**
     * Connects to the broker and registers an asynchronous consumer on the
     * given topic. The connection is kept open so the listener keeps
     * receiving; call {@link #closeAll()} to release it. If any step fails the
     * error is logged and the partially opened connection is closed.
     *
     * @param broker_url broker URL passed to {@link ActiveMQConnectionFactory}
     * @param target     name of the topic to consume from
     */
    public static void getMQ(String broker_url, String target) {
        Connection connection = null;
        try {
            // Create the connection factory and obtain a connection from it.
            // (factory.createConnection(userName, password) is the authenticated variant.)
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker_url);
            connection = factory.createConnection();
            connection.start();

            // First argument: whether the session is transacted; second: acknowledge mode.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(target);
            MessageConsumer consumer = session.createConsumer(topic);

            // The consumer receives the topic's messages asynchronously.
            consumer.setMessageListener(new PlotEventListener());

            OPEN_CONNECTIONS.add(connection);
        } catch (Exception e) {
            logger.error("Failed to subscribe to topic " + target + " on " + broker_url, e);
            // Do not leak a connection that was opened before the failure.
            closeQuietly(connection);
        }
    }

    /**
     * Closes every connection previously opened by
     * {@link #getMQ(String, String)}. Safe to call when none are open.
     */
    public static void closeAll() {
        for (Connection connection : OPEN_CONNECTIONS) {
            // remove() returns true only for the caller that actually removed it,
            // so a connection is closed at most once.
            if (OPEN_CONNECTIONS.remove(connection)) {
                closeQuietly(connection);
            }
        }
    }

    /** Closes the connection if it is non-null, logging rather than propagating a failure. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            logger.warn("Failed to close JMS connection", e);
        }
    }

    /**
     * Decodes each {@link BytesMessage} as an {@code EventDis.CommEventLog}
     * envelope, extracts the place syscode from its extension payload, looks it
     * up in {@link Eventbus#PLOT} and, when a value is found, forwards it with
     * the derived state to {@link MsgController}.
     */
    private static final class PlotEventListener implements MessageListener {

        @Override
        public void onMessage(Message msg) {
            try {
                // Messages sent by the CMS are BytesMessage; per the original author
                // this check is optional. Other message types are ignored.
                if (!(msg instanceof BytesMessage)) {
                    return;
                }
                BytesMessage bytesMessage = (BytesMessage) msg;

                // Copy the message body into a byte array.
                byte[] body = new byte[(int) bytesMessage.getBodyLength()];
                bytesMessage.readBytes(body);

                // Envelope fields: EventDis is generated from event_dis.proto and
                // CommEventLog is the event envelope class.
                EventDis.CommEventLog envelope = EventDis.CommEventLog.parseFrom(body);
                logger.info("1." + envelope.toString());

                // State is 1 for event type 524547 and 0 for any other type.
                // NOTE(review): the meaning of 524547 is not visible here; confirm against the event catalogue.
                int state = envelope.getEventType() == 524547 ? 1 : 0;

                // Extension field: carries the device-reported event content; some
                // events need a second protobuf parse, as done here.
                ByteString extInfo = envelope.getExtInfo();
                PmsEvent.MsgPmsEvent pmsEvent = PmsEvent.MsgPmsEvent.parseFrom(extInfo);
                String placeSyscode = pmsEvent.getPgsEvent().getPlaceSyscode();

                Object plot = Utils.findValue(Eventbus.PLOT, placeSyscode);
                if (!Utils.isEmpty(plot)) {
                    MsgController.getInstance().sendMQ(plot.toString(), state);
                }
            } catch (Exception e) {
                // One bad message must not break the subscription.
                logger.error("Failed to process MQ message", e);
            }
        }
    }
}

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。