Python3標準庫之threading進程中管理并發操作方法
threading模塊提供了管理多個線程執行的API,允許程序在同一個進程空間并發的運行多個操作。
1.1 Thread對象要使用Thread,最簡單的方法就是用一個目標函數實例化一個Thread對象,并調用start()讓它開始工作。
import threadingdef worker(): '''thread worker function''' print(’Worker’)threads = []for i in range(5): t = threading.Thread(target=worker) threads.append(t) t.start()
輸出有5行,每一行都是'Worker'。
如果能夠創建一個線程,并向它傳遞參數告訴它要完成什么工作,那么這會很有用。任何類型的對象都可以作為參數傳遞到線程。下面的例子傳遞了一個數,線程將打印出這個數。
import threadingdef worker(num): '''thread worker function''' print(’Worker: %s’ % num)threads = []for i in range(5): t = threading.Thread(target=worker, args=(i,)) threads.append(t) t.start()
現在這個整數參數會包含在各線程打印的消息中。
使用參數來標識或命名線程很麻煩,也沒有必要。每個Thread實例都有一個帶有默認值的名,該默認值可以在創建線程時改變。如果服務器進程中有多個服務線程處理不同的操作,那么在這樣的服務器進程中,對線程命名就很有用。
import threadingimport timedef worker(): print(threading.current_thread().getName(), ’Starting’) time.sleep(0.2) print(threading.current_thread().getName(), ’Exiting’)def my_service(): print(threading.current_thread().getName(), ’Starting’) time.sleep(0.3) print(threading.current_thread().getName(), ’Exiting’)t = threading.Thread(name=’my_service’, target=my_service)w = threading.Thread(name=’worker’, target=worker)w2 = threading.Thread(target=worker) # use default namew.start()w2.start()t.start()
調試輸出的每一行中包含有當前線程的名。線程名列中有'Thread-1'的行對應未命名的線程w2。
大多數程序并不使用print來進行調試。logging模塊支持將線程名嵌入到各個日志消息中(使用格式化代碼%(threadName)s)。通過把線程名包含在日志消息中,就能跟蹤這些消息的來源。
import loggingimport threadingimport timedef worker(): logging.debug(’Starting’) time.sleep(0.2) logging.debug(’Exiting’)def my_service(): logging.debug(’Starting’) time.sleep(0.3) logging.debug(’Exiting’)logging.basicConfig( level=logging.DEBUG, format=’[%(levelname)s] (%(threadName)-10s) %(message)s’,)t = threading.Thread(name=’my_service’, target=my_service)w = threading.Thread(name=’worker’, target=worker)w2 = threading.Thread(target=worker) # use default namew.start()w2.start()t.start()
而且logging是線程安全的,所以來自不同線程的消息在輸出中會有所區分。
到目前為止,示例程序都在隱式地等待所有線程完成工作之后才退出。不過,程序有時會創建一個線程作為守護線程(daemon),這個線程可以一直運行而不阻塞主程序退出。
如果一個服務不能很容易地中斷線程,或者即使讓線程工作到一半時中止也不會造成數據損失或破壞(例如,為一個服務監控工具生成“心跳”的線程),那么對于這些服務,使用守護線程就很有用。要標志一個線程為守護線程,構造線程時便要傳入daemon=True或者要調用它的setDaemon()方法并提供參數True。默認情況下線程不作為守護線程。
import threadingimport timeimport loggingdef daemon(): logging.debug(’Starting’) time.sleep(0.2) logging.debug(’Exiting’)def non_daemon(): logging.debug(’Starting’) logging.debug(’Exiting’)logging.basicConfig( level=logging.DEBUG, format=’(%(threadName)-10s) %(message)s’,)d = threading.Thread(name=’daemon’, target=daemon, daemon=True)t = threading.Thread(name=’non-daemon’, target=non_daemon)d.start()t.start()
這個代碼的輸出中不包含守護線程的“Exiting“消息,因為在從sleep()調用喚醒守護線程之前,所有非守護線程(包括主線程)已經退出。
要等待一個守護線程完成工作,需要使用join()方法。
import threadingimport timeimport loggingdef daemon(): logging.debug(’Starting’) time.sleep(0.2) logging.debug(’Exiting’)def non_daemon(): logging.debug(’Starting’) logging.debug(’Exiting’)logging.basicConfig( level=logging.DEBUG, format=’(%(threadName)-10s) %(message)s’,)d = threading.Thread(name=’daemon’, target=daemon, daemon=True)t = threading.Thread(name=’non-daemon’, target=non_daemon)d.start()t.start()d.join()t.join()
使用join()等待守護線程退出意味著它有機會生成它的'Exiting'消息。
默認地,join()會無限阻塞。或者,還可以傳入一個浮點值,表示等待線程在多長時間(秒數)后變為不活動。即使線程在這個時間段內未完成,join()也會返回。
import threadingimport timeimport loggingdef daemon(): logging.debug(’Starting’) time.sleep(0.2) logging.debug(’Exiting’)def non_daemon(): logging.debug(’Starting’) logging.debug(’Exiting’)logging.basicConfig( level=logging.DEBUG, format=’(%(threadName)-10s) %(message)s’,)d = threading.Thread(name=’daemon’, target=daemon, daemon=True)t = threading.Thread(name=’non-daemon’, target=non_daemon)d.start()t.start()d.join(0.1)print(’d.isAlive()’, d.isAlive())t.join()
由于傳人的超時時間小于守護線程睡眠的時間,所以join()返回之后這個線程仍是'活著'。
沒有必要為所有守護線程維護一個顯示句柄來確保它們在退出主進程之前已經完成。
enumerate()會返回活動 Thread實例的一個列表。這個列表也包括當前線程,由于等待當前線程終止(join)會引入一種死鎖情況,所以必須跳過。
import randomimport threadingimport timeimport loggingdef worker(): '''thread worker function''' pause = random.randint(1, 5) / 10 logging.debug(’sleeping %0.2f’, pause) time.sleep(pause) logging.debug(’ending’)logging.basicConfig( level=logging.DEBUG, format=’(%(threadName)-10s) %(message)s’,)for i in range(3): t = threading.Thread(target=worker, daemon=True) t.start()main_thread = threading.main_thread()for t in threading.enumerate(): if t is main_thread: continue logging.debug(’joining %s’, t.getName()) t.join()
由于工作線程睡眠的時間量是隨機的,所以這個程序的輸出可能有變化。
開始時,Thread要完成一些基本初始化,然后調用其run()方法,這會調用傳遞到構造函數的目標函數。要創建Thread的一個子類,需要覆蓋run()來完成所需的工作。
import threadingimport loggingclass MyThread(threading.Thread): def run(self): logging.debug(’running’)logging.basicConfig( level=logging.DEBUG, format=’(%(threadName)-10s) %(message)s’,)for i in range(5): t = MyThread() t.start()
run()的返回值將被忽略。
由于傳遞到Thread構造函數的args和kwargs值保存在私有變量中(這些變量名都有前綴),所以不能很容易地從子類訪問這些值。要向一個定制的線程類型傳遞參數,需要重新定義構造函數,將這些值保存在子類可見的一個實例屬性中。
import threadingimport loggingclass MyThreadWithArgs(threading.Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None): super().__init__(group=group, target=target, name=name, daemon=daemon) self.args = args self.kwargs = kwargs def run(self): logging.debug(’running with %s and %s’, self.args, self.kwargs)logging.basicConfig( level=logging.DEBUG, format=’(%(threadName)-10s) %(message)s’,)for i in range(5): t = MyThreadWithArgs(args=(i,), kwargs={’a’: ’A’, ’b’: ’B’}) t.start()
MyThreadwithArgs使用的API與Thread相同,不過類似于其他定制類,這個類可以輕松地修改構造函數方法,以取得更多參數或者與線程用途更直接相關的不同參數。
有時出于某種原因需要派生Thread,Timer就是這樣一個例子,Timer也包含在threading中。Timer在一個延遲之后開始工作,而且可以在這個延遲期間內的任意時刻被取消。
import threadingimport timeimport loggingdef delayed(): logging.debug(’worker running’)logging.basicConfig( level=logging.DEBUG, format=’(%(threadName)-10s) %(message)s’,)t1 = threading.Timer(0.3, delayed)t1.setName(’t1’)t2 = threading.Timer(0.3, delayed)t2.setName(’t2’)logging.debug(’starting timers’)t1.start()t2.start()logging.debug(’waiting before canceling %s’, t2.getName())time.sleep(0.2)logging.debug(’canceling %s’, t2.getName())t2.cancel()logging.debug(’done’)
這個例子中,第二個定時器永遠不會運行,看起來第一個定時器在主程序的其余部分完成之后還會運行。由于這不是一個守護線程,所以在主線程完成時其會隱式退出。
盡管使用多線程的目的是并發地運行單獨的操作,但有時也需要在兩個或多個線程中同步操作。事件對象是實現線程間安全通信的一種簡單方法。Event管理一個內部標志,調用者可以用set()和clear()方法控制這個標志。其他線程可以使用wait()暫停,直到這個標志被設置,可有效地阻塞進程直至允許這些線程繼續。
import loggingimport threadingimport timedef wait_for_event(e): '''Wait for the event to be set before doing anything''' logging.debug(’wait_for_event starting’) event_is_set = e.wait() logging.debug(’event set: %s’, event_is_set)def wait_for_event_timeout(e, t): '''Wait t seconds and then timeout''' while not e.is_set(): logging.debug(’wait_for_event_timeout starting’) event_is_set = e.wait(t) logging.debug(’event set: %s’, event_is_set) if event_is_set: logging.debug(’processing event’) else: logging.debug(’doing other work’)logging.basicConfig( level=logging.DEBUG, format=’(%(threadName)-10s) %(message)s’,)e = threading.Event()t1 = threading.Thread( name=’block’, target=wait_for_event, args=(e,),)t1.start()t2 = threading.Thread( name=’nonblock’, target=wait_for_event_timeout, args=(e, 2),)t2.start()logging.debug(’Waiting before calling Event.set()’)time.sleep(0.3)e.set()logging.debug(’Event is set’)
wait()方法取一個參數,表示等待事件的時間(秒數),達到這個時間后就超時。它會返回一個布爾值,指示事件是否已設置,使調用者知道wait()為什么返回。可以對事件單獨地使用is_set()方法而不必擔心阻塞。
在這個例子中,wait_for_event_timeout()將檢查事件狀態而不會無限阻塞。wait_for_event()在wait()調用的位置阻塞,事件狀態改變之前它不會返回。
除了同步線程操作,還有一點很重要,要能夠控制對共享資源的訪問,從而避免破壞或丟失數據。Python的內置數據結構(列表、字典等)是線程安全的,這是Python使用原子字節碼來管理這些數據結構的一個副作用(更新過程中不會釋放保護Python內部數據結構的全局解釋器鎖GIL(Global Interpreter Lock))。Python中實現的其他數據結構或更簡單的類型(如整數和浮點數)則沒有這個保護。要保證同時安全地訪問一個對象,可以使用一個Lock對象。
import loggingimport randomimport threadingimport timeclass Counter: def __init__(self, start=0): self.lock = threading.Lock() self.value = start def increment(self): logging.debug(’Waiting for lock’) self.lock.acquire() try: logging.debug(’Acquired lock’) self.value = self.value + 1 finally: self.lock.release()def worker(c): for i in range(2): pause = random.random() logging.debug(’Sleeping %0.02f’, pause) time.sleep(pause) c.increment() logging.debug(’Done’)logging.basicConfig( level=logging.DEBUG, format=’(%(threadName)-10s) %(message)s’,)counter = Counter()for i in range(2): t = threading.Thread(target=worker, args=(counter,)) t.start()logging.debug(’Waiting for worker threads’)main_thread = threading.main_thread()for t in threading.enumerate(): if t is not main_thread: t.join()logging.debug(’Counter: %d’, counter.value)
在這個例子中,worker()函數使一個Counter實例遞增,這個實例管理著一個Lock,以避免兩個線程同時改變其內部狀態。如果沒有使用Lock,就有可能丟失一次對value屬性的修改。
要確定是否有另一個線程請求這個鎖而不影響當前線程,可以向acquire()的blocking參數傳入False。在下一個例子中,worker()想要分別得到3次鎖,并統計為得到鎖而嘗試的次數。與此同時,lock_holder()在占有和釋放鎖之間循環,每個狀態會短暫暫停,以模擬負載情況。
import loggingimport threadingimport timedef lock_holder(lock): logging.debug(’Starting’) while True: lock.acquire() try: logging.debug(’Holding’) time.sleep(0.5) finally: logging.debug(’Not holding’) lock.release() time.sleep(0.5)def worker(lock): logging.debug(’Starting’) num_tries = 0 num_acquires = 0 while num_acquires < 3: time.sleep(0.5) logging.debug(’Trying to acquire’) have_it = lock.acquire(0) try: num_tries += 1 if have_it: logging.debug(’Iteration %d: Acquired’, num_tries) num_acquires += 1 else: logging.debug(’Iteration %d: Not acquired’, num_tries) finally: if have_it: lock.release() logging.debug(’Done after %d iterations’, num_tries)logging.basicConfig( level=logging.DEBUG, format=’(%(threadName)-10s) %(message)s’,)lock = threading.Lock()holder = threading.Thread( target=lock_holder, args=(lock,), name=’LockHolder’, daemon=True,)holder.start()worker = threading.Thread( target=worker, args=(lock,), name=’Worker’,)worker.start()
worker()需要超過3次迭代才能得到3次鎖。
正常的Lock對象不能請求多次,即使是由同一個線程請求也不例外。如果同一個調用鏈中的多個函數訪問一個鎖,則可能會產生我們不希望的副作用。
import threadinglock = threading.Lock()print(’First try :’, lock.acquire())print(’Second try:’, lock.acquire(0))
在這里,對第二個acquire()調用給定超時值為0,以避免阻塞,因為鎖已經被第一個調用獲得。
如果同一個線程的不同代碼需要'重新獲得'鎖,那么在這種情況下要使用RLock。
import threadinglock = threading.RLock()print(’First try :’, lock.acquire())print(’Second try:’, lock.acquire(0))
與前面的例子相比,對代碼唯一的修改就是用RLock替換Lock。
鎖實現了上下文管理器API,并與with語句兼容。使用with則不再需要顯式地獲得和釋放鎖。
import threadingimport loggingdef worker_with(lock): with lock: logging.debug(’Lock acquired via with’)def worker_no_with(lock): lock.acquire() try: logging.debug(’Lock acquired directly’) finally: lock.release()logging.basicConfig( level=logging.DEBUG, format=’(%(threadName)-10s) %(message)s’,)lock = threading.Lock()w = threading.Thread(target=worker_with, args=(lock,))nw = threading.Thread(target=worker_no_with, args=(lock,))w.start()nw.start()
函數worker_with()和worker_no_with()用等價的方式管理鎖。
除了使用Event,還可以通過使用一個Condition對象來同步線程。由于Condition使用了一個Lock,所以它可以綁定到一個共享資源,允許多個線程等待資源更新。在下一個例子中,consumer()線程要等待設置了Condition才能繼續。producer()線程負責設置條件,以及通知其他線程繼續。
import loggingimport threadingimport timedef consumer(cond): '''wait for the condition and use the resource''' logging.debug(’Starting consumer thread’) with cond: cond.wait() logging.debug(’Resource is available to consumer’)def producer(cond): '''set up the resource to be used by the consumer''' logging.debug(’Starting producer thread’) with cond: logging.debug(’Making resource available’) cond.notifyAll()logging.basicConfig( level=logging.DEBUG, format=’%(asctime)s (%(threadName)-2s) %(message)s’,)condition = threading.Condition()c1 = threading.Thread(name=’c1’, target=consumer, args=(condition,))c2 = threading.Thread(name=’c2’, target=consumer, args=(condition,))p = threading.Thread(name=’p’, target=producer, args=(condition,))c1.start()time.sleep(0.2)c2.start()time.sleep(0.2)p.start()
這些線程使用with來獲得與Condition關聯的鎖。也可以顯式地使用acquire()和release()方法。
屏障(barrier)是另一種線程同步機制。Barrier會建立一個控制點,所有參與線程會在這里阻塞,直到所有這些參與“方”都到達這一點。采用這種方法,線程可以單獨啟動然后暫停,直到所有線程都準備好才可以繼續。
import threadingimport timedef worker(barrier): print(threading.current_thread().name, ’waiting for barrier with {} others’.format( barrier.n_waiting)) worker_id = barrier.wait() print(threading.current_thread().name, ’after barrier’, worker_id)NUM_THREADS = 3barrier = threading.Barrier(NUM_THREADS)threads = [ threading.Thread( name=’worker-%s’ % i, target=worker, args=(barrier,), ) for i in range(NUM_THREADS)]for t in threads: print(t.name, ’starting’) t.start() time.sleep(0.1)for t in threads: t.join()
在這個例子中,Barrier被配置為會阻塞線程,直到3個線程都在等待。滿足這個條件時,所有線程被同時釋放從而越過這個控制點。wait()的返回值指示了釋放的參與線程數,可以用來限制一些線程做清理資源等動作。
Barrier的abort()方法會使所有等待線程接收一個BrokenBarrierError。如果線程在wait()上被阻塞而停止處理,這就允許線程完成清理工作。
import threadingimport timedef worker(barrier): print(threading.current_thread().name, ’waiting for barrier with {} others’.format( barrier.n_waiting)) try: worker_id = barrier.wait() except threading.BrokenBarrierError: print(threading.current_thread().name, ’aborting’) else: print(threading.current_thread().name, ’after barrier’, worker_id)NUM_THREADS = 3barrier = threading.Barrier(NUM_THREADS + 1)threads = [ threading.Thread( name=’worker-%s’ % i, target=worker, args=(barrier,), ) for i in range(NUM_THREADS)]for t in threads: print(t.name, ’starting’) t.start() time.sleep(0.1)barrier.abort()for t in threads: t.join()
這個例子將Barrier配置為多加一個線程,即需要比實際啟動的線程再多一個參與線程,所以所有線程中的處理都會阻塞。在被阻塞的各個線程中,abort()調用會產生一個異常。
有時可能需要允許多個工作線程同時訪問一個資源,但要限制總數。例如,連接池支持同時連接,但數目可能是固定的,或者一個網絡應用可能支持固定數目的并發下載。這些連接就可以使用Semaphore來管理。
import loggingimport threadingimport timeclass ActivePool: def __init__(self): super(ActivePool, self).__init__() self.active = [] self.lock = threading.Lock() def makeActive(self, name): with self.lock: self.active.append(name) logging.debug(’Running: %s’, self.active) def makeInactive(self, name): with self.lock: self.active.remove(name) logging.debug(’Running: %s’, self.active)def worker(s, pool): logging.debug(’Waiting to join the pool’) with s: name = threading.current_thread().getName() pool.makeActive(name) time.sleep(0.1) pool.makeInactive(name)logging.basicConfig( level=logging.DEBUG, format=’%(asctime)s (%(threadName)-2s) %(message)s’,)pool = ActivePool()s = threading.Semaphore(2)for i in range(4): t = threading.Thread( target=worker, name=str(i), args=(s, pool), ) t.start()
在這個例子中,ActivePool類只作為一種便利方法,用來跟蹤某個給定時刻哪些線程能夠運行。真正的資源池會為新的活動線程分配一個連接或另外某個值,并且當這個線程工作完成時再回收這個值。在這里,資源池只是用來保存活動線程的名,以顯示至少有兩個線程在并發運行。
有些資源需要鎖定以便多個線程使用,另外一些資源則需要保護,以使它們對并非是這些資源的“所有者”的線程隱藏。local()函數會創建一個對象,它能夠隱藏值,使其在不同線程中無法被看到。
import randomimport threadingimport loggingdef show_value(data): try: val = data.value except AttributeError: logging.debug(’No value yet’) else: logging.debug(’value=%s’, val)def worker(data): show_value(data) data.value = random.randint(1, 100) show_value(data)logging.basicConfig( level=logging.DEBUG, format=’(%(threadName)-10s) %(message)s’,)local_data = threading.local()show_value(local_data)local_data.value = 1000show_value(local_data)for i in range(2): t = threading.Thread(target=worker, args=(local_data,)) t.start()
屬性local_data.value對所有線程都不可見,除非在某個線程中設置了這個屬性,這個線程才能看到它。
要初始化設置以使所有線程在開始時都有相同的值,可以使用一個子類,并在_init_()中設置這些屬性。
import randomimport threadingimport loggingdef show_value(data): try: val = data.value except AttributeError: logging.debug(’No value yet’) else: logging.debug(’value=%s’, val)def worker(data): show_value(data) data.value = random.randint(1, 100) show_value(data)class MyLocal(threading.local): def __init__(self, value): super().__init__() logging.debug(’Initializing %r’, self) self.value = valuelogging.basicConfig( level=logging.DEBUG, format=’(%(threadName)-10s) %(message)s’,)local_data = MyLocal(1000)show_value(local_data)for i in range(2): t = threading.Thread(target=worker, args=(local_data,)) t.start()
這會在相同的對象上調用_init_()(注意id()值),每個線程中調用一次以設置默認值。
總結
到此這篇關于Python3標準庫:threading進程中管理并發操作的文章就介紹到這了,更多相關Python3標準庫:threading進程中管理并發操作內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!
相關文章:
