Python gevent協程切換實現詳解
一、背景
大家都知道gevent的機制是單線程+協程機制,當遇到可能會阻塞的操作時,就切換到可運行的協程中繼續運行,以此來實現提交系統運行效率的目標,但是具體是怎么實現的呢?讓我們直接從代碼中看一下吧。
二、切換機制
讓我們從socket的send、recv方法入手:
def recv(self, *args): while 1: try: return self._sock.recv(*args) except error as ex: if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:raise # QQQ without clearing exc_info test__refcount.test_clean_exit fails sys.exc_clear() self._wait(self._read_event)
這里會開啟一個死循環,在循環中調用self._sock.recv()方法,并捕獲異常,當錯誤是EWOULDBLOCK時,則調用self._wait(self._read_event)方法,該方法其實是:_wait = _wait_on_socket,_wait_on_socket方法的定義在文件:_hub_primitives.py中,如下:
# Suitable to be bound as an instance methoddef wait_on_socket(socket, watcher, timeout_exc=None): if socket is None or watcher is None: # test__hub TestCloseSocketWhilePolling, on Python 2; Python 3 # catches the EBADF differently. raise ConcurrentObjectUseError('The socket has already been closed by another greenlet') _primitive_wait(watcher, socket.timeout, timeout_exc if timeout_exc is not None else _NONE, socket.hub)
該方法其實是調用了函數:_primitive_wait(),其仍然在文件:_hub_primitives.py中定義,如下:
def _primitive_wait(watcher, timeout, timeout_exc, hub): if watcher.callback is not None: raise ConcurrentObjectUseError(’This socket is already used by another greenlet: %r’ % (watcher.callback, )) if hub is None: hub = get_hub() if timeout is None: hub.wait(watcher) return timeout = Timeout._start_new_or_dummy( timeout, (timeout_exc if timeout_exc is not _NONE or timeout is None else _timeout_error(’timed out’))) with timeout: hub.wait(watcher)
這里其實是調用了hub.wait()函數,該函數的定義在文件_hub.py中,如下:
class WaitOperationsGreenlet(SwitchOutGreenletWithLoop): # pylint:disable=undefined-variable def wait(self, watcher): ''' Wait until the *watcher* (which must not be started) is ready. The current greenlet will be unscheduled during this time. ''' waiter = Waiter(self) # pylint:disable=undefined-variable watcher.start(waiter.switch, waiter) try: result = waiter.get() if result is not waiter:raise InvalidSwitchError( ’Invalid switch into %s: got %r (expected %r; waiting on %r with %r)’ % ( getcurrent(), # pylint:disable=undefined-variable result, waiter, self, watcher )) finally: watcher.stop()
watcher.stop()
該類WaitOperationsGreenlet是Hub的基類,其方法wait中的邏輯是:生成一個Waiter對象,并調用watcher.start(waiter.switch, waiter)方法,watcher是最開始recv方法中使用的self._read_event,watcher是gevent的底層事件框架libev中的概念;同時還有一個waiter對象,它類似與python中的future概念,該對象有一個switch()方法以及get()方法,當沒有得到結果沒有準備好時,調用waiter.get()方法回導致協程被掛起;get()函數的定義如下:
def get(self): '''If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called.''' if self._exception is not _NONE: if self._exception is None: return self.value getcurrent().throw(*self._exception) # pylint:disable=undefined-variable else: if self.greenlet is not None: raise ConcurrentObjectUseError(’This Waiter is already used by %r’ % (self.greenlet, )) self.greenlet = getcurrent() # pylint:disable=undefined-variable try: return self.hub.switch() finally: self.greenlet = None
在get()中最關鍵的是self.hub.switch()函數,該函數將執行權轉移到hub,并繼續運行,至此已經分析完了當在worker協程中從網絡獲取數據遇到阻塞時,如何避免阻塞并切換到hub中的實現,至于何時再切換會worker協程,我們后續再繼續分析。
總結
要記得gevent中一個重要的概念,協程切換不是調用而是執行權的轉移,從可能會阻塞的協程切換到hub,并由hub在合適的時機切換到另一個可以繼續運行的協程繼續執行;gevent通過這種形式實現了提高io密集型應用吞吐率的目標。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持好吧啦網。
相關文章: