首页 > 编程知识 正文

mqtt协议应用,基于mqtt协议的数据采集

时间:2023-05-04 01:44:26 阅读:189024 作者:3249

因公司项目需要,最近接入了阿里 MQTT 协议,今天简单写一下如何简单使用:

一、首先是认证,这里的认证方式有动态注册、ID2认证方式和三元组认证,我们使用的是三元组认证;

1、首先根据阿里云官方文档进行配置,这里不详细介绍了:

https://help.aliyun.com/document_detail/96607.html?spm=a2c4g.11186623.6.615.a6922b0aSxaZOK

2、获取四个参数,分别是 productKey、deviceName、deviceSecret、productSecret  ;

// SDK初始化InitManager.init(this, productKey, deviceName, deviceSecret, productSecret, new IDemoCallback() { @Override public void onError(AError aError) { // 初始化失败,初始化失败之后需要用户负责重新初始化 // 如一开始网络不通导致初始化失败,后续网络回复之后需要重新初始化 Log.e("","初始化失败"); } @Override public void onInitDone(Object data) { Log.e("","初始化成功"); // isInitDone = true; }}); public interface IDemoCallback extends ILinkKitConnectListener{}

二、认证成功后,通过 RRPC 进行订阅

String testTopic = "/ext/rrpc/" + productKey + "/" + deviceName + "/user/get";final MqttRrpcRegisterRequest registerRequest = new MqttRrpcRegisterRequest();registerRequest.topic = testTopic;LinkKit.getInstance().subscribeRRPC(registerRequest, new IConnectRrpcListener() { @Override public void onSubscribeSuccess(ARequest aRequest) { Log.e("","订阅成功"); // 订阅成功 request = aRequest; } @Override public void onSubscribeFailed(ARequest aRequest, AError aError) { // 订阅失败 Log.e("","订阅失败"); } @Override public void onReceived(ARequest aRequest, IConnectRrpcHandle iConnectRrpcHandle) { // 收到云端下行 Log.e("","收到下行数据"); // 这里根据个人情况来设置 // 如果不一定是json格式,可以参考如下方式回复 MqttPublishRequest rrpcResponse = new MqttPublishRequest(); if (aRequest instanceof MqttRrpcRequest) { rrpcResponse.topic = ((MqttRrpcRequest) aRequest).topic; } rrpcResponse.payloadObj ="{"id":"123", "code":"200"" + ","data":{} }"; LinkKit.getInstance().publish(rrpcResponse, new IConnectSendListener() { @Override public void onResponse(ARequest aRequest, AResponse aResponse) { ALog.d("", "onResponse() called with: aRequest = [" + aRequest + "], aResponse = [" + aResponse + "]"); } @Override public void onFailure(ARequest aRequest, AError aError) { ALog.d("", "onFailure() called with: aRequest = [" + aRequest + "], aError = [" + aError + "]"); } }); } @Override public void onResponseSuccess(ARequest aRequest) { Log.e("","响应成功"); // RRPC 响应成功 } @Override public void onResponseFailed(ARequest aRequest, AError aError) { Log.e("","响应失败"); // RRPC 响应失败 }});

三、创建消息接收 Notify,并且在onResume()注册,在onPause() 取消;

private IConnectNotifyListener notifyListener = new IConnectNotifyListener() { @Override public void onNotify(String s, String s1, AMessage aMessage) { Log.e("收到下行消息 topic=" ,s1); String downstreamData = new String((byte[]) aMessage.data); Log.e("downstreamData",downstreamData); } @Override public boolean shouldHandle(String s, String s1) { Log.e("",s1); return true; } @Override public void onConnectStateChange(String s, ConnectState connectState) { Log.e("",s); }}; @Overrideprotected void onResume() { super.onResume(); LinkKit.getInstance().registerOnPushListener(notifyListener);} @Overrideprotected void onPause() { super.onPause(); LinkKit.getInstance().unRegisterOnPushListener(notifyListener); }

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。