首页 > 编程知识 正文

mqtt消息订阅有延迟,mqtt服务器处理自定义数据

时间:2023-05-03 14:23:09 阅读:260169 作者:3495

Retained消息

参考:https://www.emqx.com/zh/blog/mqtt5-features-retain-message

nrdyl为一个topic设置了Retained消息,那么这个消息会保留在Broker里面,当有新的订阅者订阅到此topic之后,就会接收到这个Retained消息(也就是payload)

保留消息虽然存储在服务端中,但它并不属于会话的一部分。也就是说,即便发布这个保留消息的会话终结,保留消息也不会被删除。

一个Topic只能有一个Retained消息,后设置的会覆盖前面设置的。

在python中对应的方法是:

matt.publish("test", payload="online", qos=0, retain=True)

删除Retained的方法是

client.publish("test", payload=None, qos=0, retain=True)

MQTT 遗嘱消息也称为Last Will and Testament

在python中设置遗嘱消息的方法是:

mqtt_client.will_set('test', "offline", qos=0, retain=True) # 即是遗嘱也是Retained

mqtt_client.will_set('test', "offline", qos=0, retain=False) # 只是遗嘱

如果设置遗嘱,retain为False的时候,新的client订阅是不会收到遗嘱消息的。

如果retain设置为True,他既是遗嘱也是Retained消息,意味着心的client连接也能收到消息。

如果设置了retain为False的遗嘱消息,清除可以使用下面的方式清除

mqtt_client.will_clear()

retain为True的时候

matt.publish("test", payload=None, qos=0, retain=True)

通过Retained报文和遗嘱消息,我们可以解决实际开发中其中一个问题,客户端上线下线状态维护显示。

第一步:设置遗嘱消息为设备下线的对应报文,并且设置retain=True来保证新的客户端也能每次订阅接收到此报文。

当然,单纯设置retain=True的遗嘱消息还不行,会导致在线都会发送遗嘱消息报文。

第二步:在客户端连接Broker成功之后,更改遗Retained消息为上线的报文

注意:遗嘱报文是连接的时候就会帮你自动发送给Broker,当下线时,由Broker自动往该主题发送。

下面是实现上线下线遗嘱+Retained实现。

import paho.mqtt.client as mqtt

# Retained消息的能够让其他订阅者在一订阅的时候收到这条消息

def on_connect(client, userdata, flags, rc):

if rc == 0:

client.publish("test", payload='{"status": "online"}', qos=0, retain=True) 清除retained报文

# client.publish("test", payload=None, qos=0, retain=True)

pass

else:

print("connection failed ", rc)

mqtt_client = mqtt.Client(client_id="demo_mqtt_pub")

# 遗嘱消息能在本客户端断开的时候,给对应的订阅了次主题的订阅者发送遗嘱消息

mqtt_client.will_set('test', payload='{"status": "offline"}', qos=0, retain=False)

mqtt_client.on_connect = on_connect

# mqtt_client.will_clear()

mqtt_client.connect("192.168.1.55", 1883)

mqtt_client.loop_forever()

下面代码retain为False,当客户端断开连接会发送遗嘱,已有的订阅者可以接收到,但是新的订阅不会收到遗嘱

import paho.mqtt.client as mqtt

# Retained消息的能够让其他订阅者在一订阅的时候收到这条消息

def on_connect(client, userdata, flags, rc):

if rc == 0:

# client.publish("test", payload='{"status": "online"}', qos=0, retain=True) 清除retained报文

# client.publish("test", payload=None, qos=0, retain=True)

pass

else:

print("connection failed ", rc)

mqtt_client = mqtt.Client(client_id="demo_mqtt_pub")

# 遗嘱消息能在本客户端断开的时候,给对应的订阅了次主题的订阅者发送遗嘱消息

mqtt_client.will_set('test', payload='{"status": "offline"}', qos=0, retain=False)

mqtt_client.on_connect = on_connect

# mqtt_client.will_clear()

mqtt_client.connect("192.168.1.55", 1883)

mqtt_client.loop_forever()

QOS

QoS是Sender和Receiver之间的协议,而不是Publisher和Subscriber之间的协议。换句话说,Publisher发布了一条QoS1的消息,只能保证Broker能至少收到一次这个消息;而对于Subscriber能否至少收到一次这个消息,还要取决于Subscriber在Subscibe的时候和Broker协商的QoS等级。

QoS 0:消息最多传递一次,如果当时客户端不可用,则会丢失该消息。

QoS 1:消息传递至少 1 次。

QoS 2:消息仅传送一次。

QOS实际是客户端和Broker之间的一个服务可靠等级。这个可靠等级最终计算也是取决于订阅和发布双方。

实际的client和broker的QOS是MIN(Publish QoS, Subscribe QoS)。假设现在发布方的主题是2,但是订阅方的主题是0,那么你订阅方最终和Broker之间的QOS也是0。假设发布方的主题是0,订阅方订阅了设置为了1,那最终订阅方和Broker之间也是0.

当A向主题test发送了一个QOS为1的报文,那么A客户端(sdk写好此实现)就需要保证Broker收到了这条消息之后至少一次再丢弃发送报文,如果一直没有收到Broker的成功回复,就一直发送。

下面我们通过代码说明此QOS

import paho.mqtt.client as mqtt

# Retained消息的能够让其他订阅者在一订阅的时候收到这条消息

def on_connect(client, userdata, flags, rc):

if rc == 0:

client.publish("test", payload='online', qos=0, retain=True)

pass

else:

print("connection failed ", rc)

mqtt_client = mqtt.Client(client_id="demo_mqtt_pub")

# 遗嘱消息能在本客户端断开的时候,给对应的订阅了次主题的订阅者发送遗嘱消息

# mqtt_client.will_set('test', None, qos=0, retain=True)

mqtt_client.on_connect = on_connect

mqtt_client.will_clear()

mqtt_client.connect("192.168.1.55", 1883)

mqtt_client.loop_forever()

client.publish("test", payload='online', qos=0, retain=True)发布了一个QOS等级为0的。

然后我们以QOS2的方式订阅,看收到的消息是QOS几:

image.png

如果我们改为2 client.publish("test", payload='online', qos=2, retain=True)

收到的消息就是QOS2:

image.png

publish为2, sub为1, 订阅方实际的qos得到的是1

image.png

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。