亚洲精品久久久中文字幕-亚洲精品久久片久久-亚洲精品久久青草-亚洲精品久久婷婷爱久久婷婷-亚洲精品久久午夜香蕉

您的位置:首頁技術(shù)文章
文章詳情頁

詳解Python 實(shí)現(xiàn) ZeroMQ 的三種基本工作模式

瀏覽:33日期:2022-08-01 11:53:10

簡介

引用官方說法:ZMQ(以下 ZeroMQ 簡稱 ZMQ)是一個(gè)簡單好用的傳輸層,像框架一樣的一個(gè) socket library,他使得 Socket 編程更加簡單、簡潔和性能更高。

是一個(gè)消息處理隊(duì)列庫,可在多個(gè)線程、內(nèi)核和主機(jī)盒之間彈性伸縮。

ZMQ 的明確目標(biāo)是“成為標(biāo)準(zhǔn)網(wǎng)絡(luò)協(xié)議棧的一部分,之后進(jìn)入 Linux 內(nèi)核”。現(xiàn)在還未看到它們的成功。但是,它無疑是極具前景的、并且是人們更加需要的“傳統(tǒng)” BSD 套接字之上的一 層封裝。ZMQ 讓編寫高性能網(wǎng)絡(luò)應(yīng)用程序極為簡單和有趣。

它跟 RabbitMQ,ActiveMQ 之類有著相當(dāng)本質(zhì)的區(qū)別,ZeroMQ 根本就不是一個(gè)消息隊(duì)列服務(wù)器,更像是一組底層網(wǎng)絡(luò)通訊庫,對原有的 Socket API 加上一層封裝,使我們操作更簡便。

三種工作模式

Request-Reply 模式:

說到“請求-應(yīng)答”模式,不得不說的就是它的消息流動(dòng)模型。消息流動(dòng)模型指的是該模式下,必須嚴(yán)格遵守“一問一答”的方式。

發(fā)出消息后,若沒有收到回復(fù),再發(fā)出第二條消息時(shí)就會(huì)拋出異常。同樣的,對于 Rep 也是,在沒有接收到消息前,不允許發(fā)出消息。

基于此構(gòu)成“一問一答”的響應(yīng)模式。

server:

# -*- coding=utf-8 -*-import zmqcontext = zmq.Context()socket = context.socket(zmq.REP)socket.bind('tcp://*:5555')while True: message = socket.recv() print('Received: %s' % message) socket.send('I am OK!')

client:

# -*- coding=utf-8 -*-import zmqcontext = zmq.Context()socket = context.socket(zmq.REQ)socket.connect('tcp://localhost:5555')socket.send(’Are you OK?’)response = socket.recv()print('response: %s' % response)

Publish-Subscribe 模式:

“發(fā)布-訂閱”模式下,“發(fā)布者”綁定一個(gè)指定的地址,例如“192.168.10.1:5500”,“訂閱者”連接到該地址。該模式下消息流是單向的,只允許從“發(fā)布者”流向“訂閱者”。且“發(fā)布者”只管發(fā)消息,不理會(huì)是否存在“訂閱者”。一個(gè)“發(fā)布者”可以擁有多個(gè)訂閱者,同樣的,一個(gè)“訂閱者”也可訂閱多個(gè)發(fā)布者。

雖然我們知道“發(fā)布者”在發(fā)送消息時(shí)是不關(guān)心“訂閱者”的存在于否,所以先啟動(dòng)“發(fā)布者”,再啟動(dòng)“訂閱者”是很容易導(dǎo)致部分消息丟失的。那么可能會(huì)提出一個(gè)說法“我先啟動(dòng)‘訂閱者’,再啟動(dòng)‘發(fā)布者’,就能解決這個(gè)問題了?”

對于 ZeroMQ 而言,這種做法也并不能保證 100% 的可靠性。在 ZeroMQ 領(lǐng)域中,有一個(gè)叫做“慢木匠”的術(shù)語,就是說即使我是先啟動(dòng)了“訂閱者”,再啟動(dòng)“發(fā)布者”,“訂閱者”總是會(huì)丟失第一批數(shù)據(jù)。因?yàn)樵凇坝嗛喺摺迸c端點(diǎn)建立 TCP 連接時(shí),會(huì)包含幾毫秒的握手時(shí)間,雖然時(shí)間短,但是是存在的。再加上 ZeroMQ 后臺(tái) IO 是以一部方式執(zhí)行的,所以若不在雙方之間施加同步策略,消息丟失是不可避免的。

關(guān)于“發(fā)布-訂閱”模式在 ZeroMQ 中的一些其他特點(diǎn):

公平排隊(duì),一個(gè)“訂閱者”連接到多個(gè)發(fā)布者時(shí),會(huì)均衡的從每個(gè)“發(fā)布者”讀取消息,不會(huì)出現(xiàn)一個(gè)“發(fā)布者”淹沒其他“發(fā)布者”的情況。 ZMQ3.0 以上的版本,過濾規(guī)則發(fā)生在“發(fā)布方”。 ZMQ3.0 以下的版本,過濾規(guī)則發(fā)生在“訂閱方”。其實(shí)也就是處理消息的位置。

server:

# -*- coding=utf-8 -*-import zmqimport timecontext = zmq.Context()socket = context.socket(zmq.PUB)socket.bind('tcp://*:5555')for i in range(10): print(’send message...’ + str(i)) socket.send(’message’ + str(i)) time.sleep(1)

client:

# -*- coding=utf-8 -*-import zmqcontext = zmq.Context()socket = context.socket(zmq.SUB)socket.connect('tcp://localhost:5555')socket.setsockopt(zmq.SUBSCRIBE, ’’)while True: response = socket.recv() print('response: %s' % response)

Parallel Pipeline 模式:

在說明“管道模式”前,需要明確的是在 ZeroMQ 中并沒有絕對的服務(wù)端與客戶端之分,所有的數(shù)據(jù)接收與發(fā)送都是以連接為單位的,只區(qū)分 ZeroMQ 定義的類型。就像套接字綁定地址時(shí),可以使用 bind ,也可以使用 connect ,只是通常我們將理解中的服務(wù)端 bind 到一個(gè)地址,而理解中的客戶端 connec 到該地址。

“管道模式”一般用于任務(wù)分發(fā)與結(jié)果收集,由一個(gè)任務(wù)發(fā)生器來產(chǎn)生任務(wù),“公平”的派發(fā)到其管轄下的所有 worker,完成后再由結(jié)果收集器來回收任務(wù)的執(zhí)行結(jié)果。

整體流程比較好理解,worker 連接到任務(wù)發(fā)生器上,等待任務(wù)的產(chǎn)生,完成后將結(jié)果發(fā)送至結(jié)果收集器。如果要以客戶端服務(wù)端的概念來區(qū)分,這里的任務(wù)發(fā)生器與結(jié)果收集器是服務(wù)端,而 worker 是客戶端。

前面說到了這里任務(wù)的派發(fā)是“公平的”,因?yàn)閮?nèi)部采用了 LRU 的算法來找到最近最久未工作的閑置 worker。但是公平在這里是相對的,當(dāng)任務(wù)發(fā)生器啟動(dòng)后,第一個(gè)連接到它的 worker 會(huì)在一瞬間承受整個(gè)任務(wù)發(fā)生器產(chǎn)生的 tasks。

總結(jié)來說由三部分組成,push 進(jìn)行數(shù)據(jù)推送,work 進(jìn)行數(shù)據(jù)緩存,pull 進(jìn)行數(shù)據(jù)競爭獲取處理。區(qū)別于 Publish-Subscribe 存在一個(gè)數(shù)據(jù)緩存和處理負(fù)載。

當(dāng)連接被斷開,數(shù)據(jù)不會(huì)丟失,重連后數(shù)據(jù)繼續(xù)發(fā)送到對端。

server:

# -*- coding=utf-8 -*-import zmqimport timecontext = zmq.Context()socket = context.socket(zmq.PUSH)socket.bind('tcp://*:5557')for i in range(10): socket.send(’message’ + str(i)) # 沒啟 worker 時(shí)不會(huì)發(fā)消息 print(’send message...’ + str(i)) time.sleep(1)

work:

# -*- coding=utf-8 -*-import zmqcontext = zmq.Context()receive = context.socket(zmq.PULL)receive.connect(’tcp://127.0.0.1:5557’)sender = context.socket(zmq.PUSH)sender.connect(’tcp://127.0.0.1:5558’)while True: data = receive.recv() print(’transform...’ + data) sender.send(data)

client:

# -*- coding=utf-8 -*-import zmqcontext = zmq.Context()socket = context.socket(zmq.PULL)socket.bind('tcp://*:5558')while True: response = socket.recv() print('response: %s' % response)

以上。

參考文檔:

https://www.jb51.net/article/177043.htm

總結(jié)

到此這篇關(guān)于詳解Python 實(shí)現(xiàn) ZeroMQ 的三種基本工作模式的文章就介紹到這了,更多相關(guān)python ZeroMQ工作模式內(nèi)容請搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!

標(biāo)簽: Python 編程
相關(guān)文章:
主站蜘蛛池模板: 91中文字幕在线观看 | 亚洲国产精品久久久久网站 | 黄色毛片小视频 | 九九精品影院 | 免费观看国产网址你懂的 | 天天影视色| 国产麻豆免费 | 在线免费观看日韩视频 | 久久99精品一久久久久久 | 久久国产综合 | 美女扒开胸罩露出奶了无遮挡免费 | 精品久久中文字幕 | 欧美黄色大片视频 | 成年轻人在线看片 | 亚洲欧美色中文字幕 | 久久久青青 | 嫩草视频在线观看视频播放 | 久久久国产亚洲精品 | 欧美日韩一区二区三区免费不卡 | 高清性色生活片免费视频软件 | 四库影库免费永久在线 | 一级黄色斤 | 国产精品久久久久影院色老大 | 男女又黄又刺激黄a大片桃色 | aaaaa毛片 | 久久4| 日产一一到六区麻豆 | 色综合精品久久久久久久 | 成人看的午夜免费毛片 | 中国女人野外做爰视频在线看 | 手机国产精品一区二区 | 国产nv精品你懂得 | 九九精品成人免费国产片 | 99热精品国产三级在线观看 | 欧美日韩在线国产 | 精品国产电影网久久久久婷婷 | 久久免费视频7 | 黄色小网站在线观看 | 日本福利片| 日产一区二区三区精品视频 | yy毛片|