通過實例解析Python RPC實現原理及方法
單線程同步
使用socket傳輸數據 使用json序列化消息體 struct將消息編碼為二進制字節串,進行網絡傳輸消息協議
// 輸入{ in: 'ping', params: 'ireader 0'}// 輸出{ out: 'pong', result: 'ireader 0'}
客戶端 client.py
# coding: utf-8# client.pyimport jsonimport timeimport structimport socketdef rpc(sock, in_, params): response = json.dumps({'in': in_, 'params': params}) # 請求消息體 length_prefix = struct.pack('I', len(response)) # 請求長度前綴 sock.sendall(length_prefix) sock.sendall(response) length_prefix = sock.recv(4) # 響應長度前綴 length, = struct.unpack('I', length_prefix) body = sock.recv(length) # 響應消息體 response = json.loads(body) return response['out'], response['result'] # 返回響應類型和結果if __name__ == ’__main__’: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(('localhost', 8080)) for i in range(10): # 連續發送10個rpc請求 out, result = rpc(s, 'ping', 'ireader %d' % i) print out, result time.sleep(1) # 休眠1s,便于觀察 s.close() # 關閉連接
服務端 blocking_single.py
# coding: utf8# blocking_single.pyimport jsonimport structimport socketdef handle_conn(conn, addr, handlers): print addr, 'comes' while True: # 循環讀寫 length_prefix = conn.recv(4) # 請求長度前綴 if not length_prefix: # 連接關閉了 print addr, 'bye' conn.close() break # 退出循環,處理下一個連接 length, = struct.unpack('I', length_prefix) body = conn.recv(length) # 請求消息體 request = json.loads(body) in_ = request[’in’] params = request[’params’] print in_, params handler = handlers[in_] # 查找請求處理器 handler(conn, params) # 處理請求def loop(sock, handlers): while True: conn, addr = sock.accept() # 接收連接 handle_conn(conn, addr, handlers) # 處理連接def ping(conn, params): send_result(conn, 'pong', params)def send_result(conn, out, result): response = json.dumps({'out': out, 'result': result}) # 響應消息體 length_prefix = struct.pack('I', len(response)) # 響應長度前綴 conn.sendall(length_prefix) conn.sendall(response)if __name__ == ’__main__’: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 創建一個TCP套接字 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 打開reuse addr選項 sock.bind(('localhost', 8080)) # 綁定端口 sock.listen(1) # 監聽客戶端連接 handlers = { # 注冊請求處理器 'ping': ping } loop(sock, handlers) # 進入服務循環
多線程同步
使用線程庫thread創建原生線程 服務器可并行處理多個客戶端服務端 multithread.py
多進程同步
Python的GIL導致單個進程只能占滿一個CPU核心,多線程無法利用多核優勢 os.fork()會生成子進程 子進程退出后,父進程需使用waitpid系統調用收割子進程,防止其稱為僵尸資源 在子進程中關閉服務器套接字后,在父進程中也要關閉服務器套接字 因為進程fork后,父子進程都有自己的套接字引用指向內核的同一份套接字對象,套接字引用計數為2,對套接字進程close,即將套接字對象的引用計數減1PreForking同步
進程比線程耗費資源,通過PreForking進程池模型對服務器開辟的進程數量進行限制,避免服務器負載過重 如果并行的連接數量超過了prefork進程數量,后來的客戶端請求將會阻塞單進程異步
通過事件輪詢API,查詢相關套接字是否有響應的讀寫事件,有則攜帶事件列表返回,沒有則阻塞 拿到讀寫事件后,可對事件相關的套接字進行讀寫操作 設置讀寫緩沖區 Nginx/Nodejs/Redis都是基于異步模型 異步模型編碼成本高,易出錯,通常在公司業務代碼中采用同步模型,僅在講究高并發高性能的場合才使用異步模型PreForking異步
Tornado/Nginx采用了多進程PreForking異步模型,具有良好的高并發處理能力
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持好吧啦網。
相關文章: