首页 > 编程知识 正文

springboot消息推送,mqtt开源框架netty

时间:2023-05-05 17:48:14 阅读:156686 作者:1148

Spring Boot 集成 Mosquitto MQTT 服务

实现消息的订阅和发布。

pom 依赖如下:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

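application.yml 配置如下:

spring:
  mqtt:
    username: admin
    password: admin
    url: tcp://10.9.25.203:1883
    client:
      id: 'mqttId1'
    # 以下三项是配置类读取的键(default.topic、topics、qosValues),取值仅为示例,请按实际情况修改
    default:
      topic: topic
    topics: topic,hello
    qosValues: 1,1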

package mqtt;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

import java.util.Arrays;
import java.util.List;

/**
 * Spring Integration wiring for an MQTT broker: one outbound handler for publishing
 * and one inbound adapter for subscribing, both built on the Paho client.
 *
 * <p>All settings are read from {@code spring.mqtt.*} properties.
 */
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {

    // NOTE(review): username/password are injected but not applied to the connection
    // options; the original had those calls commented out. Enable them in
    // getMqttConnectOptions() if the broker requires authentication.
    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    /** Comma-separated list of topics the inbound adapter subscribes to. */
    @Value("#{'${spring.mqtt.topics}'.split(',')}")
    private List<String> topics;

    /** Comma-separated QoS levels applied to the subscribed topics. */
    @Value("#{'${spring.mqtt.qosValues}'.split(',')}")
    private List<Integer> qosValues;

    /**
     * Builds the Paho connection options shared by the inbound and outbound clients.
     *
     * @return connection options pointing at the configured broker URL
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setServerURIs(new String[] {hostUrl});
        mqttConnectOptions.setKeepAliveInterval(2);
        // Connection timeout, in seconds.
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setMaxInflight(100000000);
        return mqttConnectOptions;
    }

    /**
     * Creates the client factory used by both the publisher and the subscriber.
     *
     * @return a Paho client factory configured with {@link #getMqttConnectOptions()}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * Outbound handler: messages sent to {@code mqttOutboundChannel} are published to the
     * broker, falling back to the default topic when the message carries none.
     *
     * @return the asynchronous publishing handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        return messageHandler;
    }

    /** Channel that feeds {@link #mqttOutbound()}. */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /** Channel that receives messages delivered by {@link #inbound()}. */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter: subscribes to the configured topics and forwards every received
     * message to {@code mqttInputChannel}.
     *
     * @return the message-driven adapter
     */
    @Bean
    public MessageProducer inbound() {
        String[] topicArray = topics.toArray(new String[0]);
        int[] qos = qosValues.stream().mapToInt(Integer::intValue).toArray();
        // Print the topic names themselves rather than the array's identity string.
        System.out.println("strings==" + Arrays.toString(topicArray));

        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        clientId + "_inbound", mqttClientFactory(), topicArray);
        // The configured QoS levels were previously computed but never applied.
        // setQos accepts either a single value (used for all topics) or one per topic.
        adapter.setQos(qos);
        adapter.setCompletionTimeout(3000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Consumer for messages arriving on {@code mqttInputChannel}; prints the payload and
     * the topic it was received on.
     *
     * @return the handler bound to the inbound channel
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                // String.valueOf avoids a NullPointerException when the header is absent.
                String topic = String.valueOf(message.getHeaders().get("mqtt_receivedTopic"));
                Object payload = message.getPayload();
                System.out.println("msg==" + payload);
                System.out.println("zhuti topic" + topic);
            }
        };
    }
}

发送消息接口

// ---- MqttGateway.java ----
package mqtt;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

/**
 * Messaging gateway for publishing to MQTT: calls are routed to
 * {@code mqttOutboundChannel}.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Publishes a payload to the given topic.
     *
     * @param data the message payload
     * @param topic the destination topic, passed as the {@link MqttHeaders#TOPIC} header
     */
    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}

// ---- TestWwController.java ----
package mqtt;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * Test endpoints for publishing through {@link MqttGateway}.
 *
 * <p>{@code MqttGateway} lives in this same package, so no import is needed; the
 * previous import of {@code com.ruoyi.framework.mqtt.MqttGateway} pointed at an
 * unrelated project and shadowed the local interface.
 */
@RestController
@RequestMapping("/test")
public class TestWwController {

    @Resource
    private MqttGateway mqttGateway;

    /**
     * Publishes a fixed test payload to the {@code hello} topic.
     *
     * @return {@code "OK"} once the message has been handed to the gateway
     */
    @RequestMapping("/sendMqtt")
    public String sendMqtt() {
        String sendData = "12356";
        System.out.println("消息订阅" + sendData);
        mqttGateway.sendToMqtt(sendData, "hello");
        return "OK";
    }

    /**
     * Connectivity check endpoint.
     *
     * @param sendData accepted but not used
     * @return the constant {@code "testOK"}
     */
    @RequestMapping("/test")
    public String test(String sendData) {
        return "testOK";
    }
}

资源参考   https://www.cnblogs.com/victorbu/p/11978107.html

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。