在IoT场景中,我们业务服务器会有实时获取设备上报的消息,感知设备上下线状态变化的需求,通过配置服务端订阅将数据转到消息服务MNS队列,然后从MNS队列中即可获取这些数据。
阿里云IoT物联网套件配合消息服务MNS来提供服务端订阅设备消息的功能,这种方式优势是MNS可以保证消息的可靠性,避免了服务端不可用时的消息丢失,同时MNS在处理大量消息并发时有削峰填谷的作用,保证服务端不会因为突然的并发压力导致服务不可用。这篇文档主要讲述的就是当设备的数据发送到物联网套件之后,用户的服务端如何获取设备的数据。
1.阿里云IoT控制台配置服务端订阅
阿里云IoT物联网套件开通 https://www.aliyun.com/product/iot
我们在阿里云IoT控制台,创建产品:空气检测,选择基础版。
1.2 在产品下添加设备
我们在阿里云IoT控制台,设备管理里空气检测产品下添加一个具体设备。
1.2 在产品下配置服务端订阅
我们在阿里云IoT控制台,为空气检测产品开通服务端订阅,勾选设备上报消息和设备状态变化通知,开通后会有MNS的区域:cn-shanghai和队列信息:aliyun-iot-a1jnUEKYhw4,如下。
通过阅读阿里云IoT文档,我们了解到队列中消息结构体如下:
{ "payload": "Base64 Encode的数据", "messagetype": "status", "messageid": 996000000000000001, "topic": "具体的设备Topic", "timestamp": 1526450324}messageid:IoT套件生成的消息ID
messagetype:指的是消息类型:设备状态status设备上报消息upload**
topic:表示该消息源自套件中的哪个topic,当messageType=status时,topic为null,当messageType=upload时,topic为具体的设备Topic
payload:数据为Base64 Encode的数据。当messageType=status时,数据是设备状态数据;当messageType=upload时,data即为设备发布到Topic中的原始数据。
timestamp:队列中消息生成时间戳,并非业务的时间戳
2.设备端开发
我们采用nodejs脚本模拟设备,与IoT云端建立连接,上报数据。
package.json中添加npm依赖"aliyun-iot-mqtt": "0.0.4"模块
2.2 编写设备端应用程序代码
我们需要在控制台获取设备身份三元组:productKey,deviceName,deviceSecret,以及产品分区regionId
/*** package.json 添加依赖:"aliyun-iot-mqtt": "0.0.4"*/const mqtt = require('aliyun-iot-mqtt');//设备三元组const options = { productKey: "产品", deviceName: "设备", deviceSecret: "秘钥", regionId: "cn-shanghai"};//设备与云 建立连接,设备上线const client = mqtt.getAliyunIotMqttClient(options);//主题topicconst topic = `${options.productKey}/${options.deviceName}/update`;const data = { temperature: Math.floor((Math.random()*20)+10), humidity: Math.floor((Math.random()*100)+20),};//指定topic发布数据到云端client.publish(topic, JSON.stringify(data));2.3 启动模拟设备脚本 $node iot-mns.js
脚本执行后,我们在IoT云端控制台产品-日志服务里查看设备行为分析日志,根据时间顺序,我们看到
首先在10:53:05时,设备建立连接,上线;
然后在10:53:12时,设备断开连接,下线。
查看设备上行消息分析日志,根据时间顺序,我们看到
首先设备publish message,
然后流转到RuleEngine规则引擎,
最终Transmit MNS的消息队列aliyun-iot-a1jnUEKYhw4
我们切换到MNS控制台,选择华东2区域,可以看到消息队列aliyun-iot-a1jnUEKYhw4有3条活跃消息
3 消费队列中设备消息
3.1 消息服务MNS使用
我们以java开发为例,pom中添加依赖aliyun-sdk-mns,httpasyncclient,fastjson,如下:
<dependencies><dependency><groupId>com.aliyun.mns</groupId><artifactId>aliyun-sdk-mns</artifactId><version>1.1.8</version><classifier>jar-with-dependencies</classifier></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpasyncclient</artifactId><version>4.0.1</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.42</version></dependency></dependencies>我们通过mns的sdk,创建连接,轮询获取队列消息。为了方便阅读,我们对消息的payload数据做base64解码,完整应用程序代码如下:
import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.aliyun.mns.client.CloudAccount;import com.aliyun.mns.client.CloudQueue;import com.aliyun.mns.client.MNSClient;import com.aliyun.mns.model.Message;import org.apache.commons.codec.binary.Base64;public class ComsumerDemo { public static String accessKeyId = "AK"; public static String accessKeySecret = "AK秘钥"; public static String endpoint = "mns公网Endpoint"; public static void main(String[] args) { CloudAccount account = new CloudAccount(accessKeyId,accessKeySecret,endpoint); MNSClient client = account.getMNSClient(); //获取消息队列 CloudQueue queue = client.getQueueRef("aliyun-iot-a1jnUEKYhw4"); while (true) { Message popMsg = queue.popMessage(10); if (popMsg != null) { System.out.println("message id: " + popMsg.getMessageId()); System.out.println("message body: n" + decodeBase64(popMsg.getMessageBodyAsString())); //删除消息 queue.deleteMessage(popMsg.getReceiptHandle()); } } } public static String decodeBase64(String jsonString) { try { JSONObject json = JSON.parseObject(jsonString); String payload = new String(Base64.decodeBase64(json.getString("payload"))); json.put("payload", JSON.parseObject(payload)); return json.toJSONString(); } catch (Exception e) { e.printStackTrace(); } return null; }}3.2 运行程序
控制台输出如下,我们看到
timestamp=1526450324时,设备上线消息,
timestamp=1526450334时,设备上报了数据,通过payload可以看到完整数据
timestamp=1526450353时,设备下线消息
message id: E2CF179AD5686386-2-16367878417-200000009message body: { "payload": { "lastTime": "2018-05-16 13:58:44.413", "clientIp": "42.120.74.246", "time": "2018-05-16 13:58:44.427", "productKey": "a1jnUEKYhw4", "deviceName": "suw8umOqgJ72yCADerZp", "status": "online" }, "messagetype": "status", "messageid": 996631012001751041, "timestamp": 1526450324}message id: "656A4F66B391367-1-1636787AAC0-20000000C"message body: { "payload": { "temperature": 14, "humidity": 116 }, "messagetype": "upload", "topic": "/a1jnUEKYhw4/suw8umOqgJ72yCADerZp/update", "messageid": 996631053735047169, "timestamp": 1526450334}message id: E2CF179AD5686386-2-1636787F5F1-20000000Amessage body: { "payload": { "lastTime": "2018-05-16 13:59:04.381", "time": "2018-05-16 13:59:13.571", "productKey": "a1jnUEKYhw4", "deviceName": "suw8umOqgJ72yCADerZp", "status": "offline" }, "messagetype": "status", "messageid": 996631134240534528, "timestamp": 1526450353}4. 源代码
Github 传送门:https://github.com/iot-blog/aliyun-iot-mns
IoT物联网技术