亚洲精品久久久中文字幕-亚洲精品久久片久久-亚洲精品久久青草-亚洲精品久久婷婷爱久久婷婷-亚洲精品久久午夜香蕉

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

基于python實(shí)現(xiàn)MQTT發(fā)布訂閱過(guò)程原理解析

瀏覽:2日期:2022-07-16 11:31:46

MQTT簡(jiǎn)介

MQTT 全稱為 Message Queuing Telemetry Transport(消息隊(duì)列遙測(cè)傳輸)是一種基于發(fā)布/訂閱范式的“輕量級(jí)”消息協(xié)議。該協(xié)議構(gòu)建于TCP/IP協(xié)議上。

MQTT協(xié)議是輕量、簡(jiǎn)單、開放和易于實(shí)現(xiàn)的,這些特點(diǎn)使它適用范圍非常廣泛。在很多情況下,包括受限的環(huán)境中,如:機(jī)器與機(jī)器(M2M)通信和物聯(lián)網(wǎng)(IoT)。

其在,通過(guò)衛(wèi)星鏈路通信傳感器、偶爾撥號(hào)的醫(yī)療設(shè)備、智能家居、及一些小型化設(shè)備中已廣泛使用。

MQTT特點(diǎn)

1、使用發(fā)布/訂閱消息模式,提供一對(duì)多的消息發(fā)布,解除應(yīng)用程序耦合。該協(xié)議需要客戶端和服務(wù)端,而協(xié)議中主要有三種身份:發(fā)布者(Publisher)、代理(Broker,服務(wù)器)、訂閱者(Subscriber)。其中,消息的發(fā)布者和訂閱者都是客戶端,消息代理是服務(wù)器,而消息發(fā)布者可以同時(shí)是訂閱者,實(shí)現(xiàn)了生產(chǎn)者與消費(fèi)者的脫耦;

2、對(duì)負(fù)載內(nèi)容屏蔽的消息傳輸;

3、使用 TCP/IP 提供網(wǎng)絡(luò)連接;

4、有三種消息發(fā)布服務(wù)質(zhì)量:

“至多一次”,消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)。會(huì)發(fā)生消息丟失或重復(fù)。這一級(jí)別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無(wú)所謂,因?yàn)椴痪煤筮€會(huì)有第二次發(fā)送。 “至少一次”,確保消息到達(dá),但消息重復(fù)可能會(huì)發(fā)生。 “只有一次”,確保消息到達(dá)一次。這一級(jí)別可用于如下情況,在計(jì)費(fèi)系統(tǒng)中,消息重復(fù)或丟失會(huì)導(dǎo)致不正確的結(jié)果。

5、小型傳輸,開銷很小(固定長(zhǎng)度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量;

6、使用 Last Will 和 Testament 特性通知有關(guān)各方客戶端異常中斷的機(jī)制。

原理

基于python實(shí)現(xiàn)MQTT發(fā)布訂閱過(guò)程原理解析

MQTT協(xié)議有三種身份:發(fā)布者、代理、訂閱者,發(fā)布者和訂閱者都為客戶端,代理為服務(wù)器,同時(shí)消息的發(fā)布者也可以是訂閱者(為了節(jié)約內(nèi)存和流量發(fā)布者和訂閱者一般都會(huì)定義在一起)。

MQTT傳輸?shù)南⒎譃橹黝}(Topic,可理解為消息的類型,訂閱者訂閱后,就會(huì)收到該主題的消息內(nèi)容(payload))和負(fù)載(payload,可以理解為消息的內(nèi)容)兩部分。

1.MQTT協(xié)議實(shí)現(xiàn)方式

實(shí)現(xiàn)MQTT協(xié)議需要客戶端和服務(wù)器端通訊完成,在通訊過(guò)程中,MQTT協(xié)議中有三種身份:發(fā)布者(Publish)、代理(Broker)(服務(wù)器)、訂閱者(Subscribe)。其中,消息的發(fā)布者和訂閱者都是客戶端,消息代理是服務(wù)器,消息發(fā)布者可以同時(shí)是訂閱者。

MQTT傳輸?shù)南⒎譃椋褐黝}(Topic)和負(fù)載(payload)兩部分:

(1)Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會(huì)收到該主題的消息內(nèi)容(payload);

(2)payload,可以理解為消息的內(nèi)容,是指訂閱者具體要使用的內(nèi)容。

2.網(wǎng)絡(luò)傳輸與應(yīng)用消息

MQTT會(huì)構(gòu)建底層網(wǎng)絡(luò)傳輸:它將建立客戶端到服務(wù)器的連接,提供兩者之間的一個(gè)有序的、無(wú)損的、基于字節(jié)流的雙向傳輸。

當(dāng)應(yīng)用數(shù)據(jù)通過(guò)MQTT網(wǎng)絡(luò)發(fā)送時(shí),MQTT會(huì)把與之相關(guān)的服務(wù)質(zhì)量(QoS)和主題名(Topic)相關(guān)連。

3.MQTT客戶端

一個(gè)使用MQTT協(xié)議的應(yīng)用程序或者設(shè)備,它總是建立到服務(wù)器的網(wǎng)絡(luò)連接。客戶端可以:

(1)發(fā)布其他客戶端可能會(huì)訂閱的信息;

(2)訂閱其它客戶端發(fā)布的消息;

(3)退訂或刪除應(yīng)用程序的消息;

(4)斷開與服務(wù)器連接。

4.MQTT服務(wù)端

MQTT服務(wù)器以稱為'消息代理'(Broker),可以是一個(gè)應(yīng)用程序或一臺(tái)設(shè)備。它是位于消息發(fā)布者和訂閱者之間,它可以:

(1)接受來(lái)自客戶的網(wǎng)絡(luò)連接;

(2)接受客戶發(fā)布的應(yīng)用信息;

(3)處理來(lái)自客戶端的訂閱和退訂請(qǐng)求;

(4)向訂閱的客戶轉(zhuǎn)發(fā)應(yīng)用程序消息。

5.MQTT協(xié)議中的訂閱、主題、會(huì)話

一、訂閱(Subscription)

訂閱包含主題篩選器(Topic Filter)和最大服務(wù)質(zhì)量(QoS)。訂閱會(huì)與一個(gè)會(huì)話(Session)關(guān)聯(lián)。一個(gè)會(huì)話可以包含多個(gè)訂閱。每一個(gè)會(huì)話中的每個(gè)訂閱都有一個(gè)不同的主題篩選器。

二、會(huì)話(Session)

每個(gè)客戶端與服務(wù)器建立連接后就是一個(gè)會(huì)話,客戶端和服務(wù)器之間有狀態(tài)交互。會(huì)話存在于一個(gè)網(wǎng)絡(luò)之間,也可能在客戶端和服務(wù)器之間跨越多個(gè)連續(xù)的網(wǎng)絡(luò)連接。

三、主題名(Topic Name)

連接到一個(gè)應(yīng)用程序消息的標(biāo)簽,該標(biāo)簽與服務(wù)器的訂閱相匹配。服務(wù)器會(huì)將消息發(fā)送給訂閱所匹配標(biāo)簽的每個(gè)客戶端。

四、主題篩選器(Topic Filter)

一個(gè)對(duì)主題名通配符篩選器,在訂閱表達(dá)式中使用,表示訂閱所匹配到的多個(gè)主題。

五、負(fù)載(Payload)

消息訂閱者所具體接收的內(nèi)容。

6.MQTT協(xié)議中的方法

MQTT協(xié)議中定義了一些方法(也被稱為動(dòng)作),來(lái)于表示對(duì)確定資源所進(jìn)行操作。這個(gè)資源可以代表預(yù)先存在的數(shù)據(jù)或動(dòng)態(tài)生成數(shù)據(jù),這取決于服務(wù)器的實(shí)現(xiàn)。通常來(lái)說(shuō),資源指服務(wù)器上的文件或輸出。主要方法有:

(1)Connect。等待與服務(wù)器建立連接。

(2)Disconnect。等待MQTT客戶端完成所做的工作,并與服務(wù)器斷開TCP/IP會(huì)話。

(3)Subscribe。等待完成訂閱。

(4)UnSubscribe。等待服務(wù)器取消客戶端的一個(gè)或多個(gè)topics訂閱。

(5)Publish。MQTT客戶端發(fā)送消息請(qǐng)求,發(fā)送完成后返回應(yīng)用程序線程。

7.應(yīng)用場(chǎng)景

因?yàn)樗鼈鬏斚⒕哂挟惒叫裕òl(fā)布訂閱模式),同時(shí)該協(xié)議本身的輕量特定,因此可用于輕量級(jí)應(yīng)用,

可作為物聯(lián)網(wǎng)的通信組件使用,例如樹莓派上完全可以搭建一個(gè)mqtt服務(wù)器,當(dāng)未來(lái)智能家居全面普及的時(shí)候,

家居中的消息通訊都可用此實(shí)現(xiàn),如智能冰箱溫度檢測(cè),房間溫度檢測(cè)等信息都能通過(guò)mqtt去實(shí)現(xiàn),遙感數(shù)據(jù)、

汽車檢測(cè)數(shù)據(jù)、智能家居、智慧城市、醫(yī)療醫(yī)護(hù)都具有應(yīng)用場(chǎng)景。

客戶端

#!/usr/bin/env python # encoding: utf-8 ''' @version: v1.0 @author: W_H_J @license: Apache Licence @contact: [email protected] @software: PyCharm @file: clicentMqttTest.py @time: 2019/2/22 14:19 @describe: mqtt客戶端'''import jsonimport sysimport osimport paho.mqtt.client as mqttimport time sys.path.append(os.path.abspath(os.path.dirname(__file__) + ’/’ + ’..’))sys.path.append('..') TASK_TOPIC = ’test’ # 客戶端發(fā)布消息主題 client_id = time.strftime(’%Y%m%d%H%M%S’, time.localtime(time.time()))'''client_id是連接到代理。如果client_id的長(zhǎng)度為零或?yàn)榱悖瑒t行為為由使用的協(xié)議版本定義。如果使用MQTT v3.1.1,那么一個(gè)零長(zhǎng)度的客戶機(jī)id將被發(fā)送到代理,代理將被發(fā)送為客戶端生成一個(gè)隨機(jī)變量。如果使用MQTT v3.1,那么id將是隨機(jī)生成的。在這兩種情況下,clean_session都必須為True。如果這在這種情況下不會(huì)產(chǎn)生ValueError。注意:一般情況下如果客戶端服務(wù)端啟用兩個(gè)監(jiān)聽那么客戶端client_id 不能與服務(wù)器相同,如這里用時(shí)間'20190222142358'作為它的id,如果與服務(wù)器id相同,則無(wú)法接收到消息'''client = mqtt.Client(client_id, transport=’tcp’) client.connect('127.0.0.1', 1883, 60) # 此處端口默認(rèn)為1883,通信端口期keepalive默認(rèn)60client.loop_start() def clicent_main(message: str): ''' 客戶端發(fā)布消息 :param message: 消息主體 :return: ''' time_now = time.strftime(’%Y-%m-%d %H-%M-%S’, time.localtime(time.time())) payload = {'msg': '%s' % message, 'data': '%s' % time_now} # publish(主題:Topic; 消息內(nèi)容) client.publish(TASK_TOPIC, json.dumps(payload, ensure_ascii=False)) print('Successful send message!') return Trueif __name__ == ’__main__’: msg = '我是一條測(cè)試數(shù)據(jù)!' clicent_main(msg)client

服務(wù)端

#!/usr/bin/env python # encoding: utf-8 ''' @version: v1.0 @author: W_H_J @license: Apache Licence @contact: [email protected] @software: PyCharm @file: serverMqttTest.py @time: 2019/2/22 14:35 @describe: mqtt 服務(wù)端'''import jsonimport sysimport osimport timeimport paho.mqtt.client as mqttsys.path.append(os.path.abspath(os.path.dirname(__file__) + ’/’ + ’..’))sys.path.append('..') REPORT_TOPIC = ’test’ # 主題 def on_connect(client, userdata, flags, rc): print(’connected to mqtt with resurt code ’, rc) client.subscribe(REPORT_TOPIC) # 訂閱主題 def on_message(client, userdata, msg): ''' 接收客戶端發(fā)送的消息 :param client: 連接信息 :param userdata: :param msg: 客戶端返回的消息 :return: ''' print('Start server!') payload = json.loads(msg.payload.decode(’utf-8’)) print(payload) def server_conenet(client): client.on_connect = on_connect # 啟用訂閱模式 client.on_message = on_message # 接收消息 client.connect('127.0.0.1', 1883, 60) # 鏈接 # client.loop_start() # 以start方式運(yùn)行,需要啟動(dòng)一個(gè)守護(hù)線程,讓服務(wù)端運(yùn)行,否則會(huì)隨主線程死亡 client.loop_forever() # 以forever方式阻塞運(yùn)行。 def server_stop(client): client.loop_stop() # 停止服務(wù)端 sys.exit(0) def server_main(): client_id = time.strftime(’%Y%m%d%H%M%S’, time.localtime(time.time())) client = mqtt.Client(client_id, transport=’tcp’) server_conenet(client) if __name__ == ’__main__’: # 啟動(dòng)監(jiān)聽 server_main()server

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持好吧啦網(wǎng)。

標(biāo)簽: Python 編程
相關(guān)文章:
主站蜘蛛池模板: 手机看片自拍自拍自拍 | 的九一视频入口在线观看 | 999久久免费高清热精品 | 亚洲一区www | 亚洲 欧美 精品 中文第三 | 成人精品视频 | 久久鸭综合久久国产 | 久久国产精品久久精品国产 | 激情视频网站在线观看 | 香蕉九九 | 一级做a爱过程免费视频日本 | 久久一区二区明星换脸 | 被黑人操视频 | 国产黄色在线看 | 明星换脸高清一区二区 | 青草视频入口 在线观看 | 亚洲国产精品综合久久2007 | 五月婷婷六月天 | 五月天婷婷视频 | 手机看片久久国产免费不卡 | 欧美成人精品不卡视频在线观看 | 国产182ty| 免费中文字幕乱码电影麻豆网 | 国产欧美亚洲精品a | 欧美亚洲精品在线 | 手机看片国产免费现在观看 | 免费看欧美一级a毛片 | 久久久久久久久免费视频 | 亚洲中字幕永久在线观看 | 麻豆精品久久精品色综合 | 日韩最新中文字幕 | 免费区欧美一级毛片 | 国产精品麻豆视频 | 蜜臀αv | 久久亚洲综合 | 美女视频一区二区 | 国产福利在线观看第二区 | 真实国产精品视频国产网 | 华人黄网站大全 | 天干天干夜天干天天爽 | 奇米中文字幕 |