亚洲精品久久久中文字幕-亚洲精品久久片久久-亚洲精品久久青草-亚洲精品久久婷婷爱久久婷婷-亚洲精品久久午夜香蕉

您的位置:首頁技術文章
文章詳情頁

解決python ThreadPoolExecutor 線程池中的異常捕獲問題

瀏覽:2日期:2022-07-30 17:00:24

問題

最近寫了涉及線程池及線程的 python 腳本,運行過程中發現一個有趣的現象,線程池中的工作線程出現問題,引發了異常,但是主線程沒有捕獲異常,還在發現 BUG 之前一度以為線程池代碼正常返回。

先說重點

這里主要想介紹 python concurrent.futuresthread.ThreadPoolExecutor 線程池中的 worker 引發異常的時候,并不會直接向上拋起異常,而是需要主線程通過調用concurrent.futures.Future.exception(timeout=None) 方法主動獲取 worker 的異常。

問題重現及解決

引子

問題主要由這樣一段代碼引起的:

def thread_executor(): logger.info('I am slave. I am working. I am going to sleep 3s') sleep(3) logger.info('Exit thread executor')def main(): thread_obj = threading.Thread(target=thread_executor) while True: logger.info('Master starts thread worker') try: # 工作線程由于某種異常而結束并退出了,想重啟工作線程的工作,但又不想重復創建線程 thread_obj.start() # 這一行會報錯,同一線程不能重復啟動 except Exception as e: logger.error('Master start thread error', exc_info=True) raise e logger.info('Master is going to sleep 5s') sleep(5)

上面這段代碼的功能如注釋中解釋的,主要要實現類似生產者消費者的功能,工作線程一直去生產資源,主線程去消費工作線程生產的資源。但是工作線程由于異常推出了,想重新啟動生產工作。顯然,這個代碼會報錯。

運行結果:

thread: MainThread [INFO] Master starts thread workerthread: Thread-1 [INFO] I am slave. I am working. I am going to sleep 3sthread: MainThread [INFO] Master is going to sleep 5sthread: Thread-1 [INFO] Exit thread executor because of some exceptionthread: MainThread [INFO] Master starts thread workerthread: MainThread [ERROR] Master start thread errorTraceback (most recent call last):File 'xxx.py', line 47, in main thread_obj.start()File 'E:anacondalibthreading.py', line 843, in start raise RuntimeError('threads can only be started once')RuntimeError: threads can only be started onceTraceback (most recent call last):File 'xxx.py', line 56, in <module> main()File 'xxx.py', line 50, in main raise eFile 'xxx.py', line 47, in main thread_obj.start()File 'E:anacondalibthreading.py', line 843, in start raise RuntimeError('threads can only be started once')RuntimeError: threads can only be started once

切入正題

然而腳本還有其他業務代碼要運行,所以需要把上面的資源生產和消費的代碼放到一個線程里完成,所以引入線程池來執行這段代碼:

def thread_executor(): while True: logger.info('I am slave. I am working. I am going to sleep 3s') sleep(3) logger.info('Exit thread executor because of some exception') breakdef main(): thread_obj = threading.Thread(target=thread_executor) while True: logger.info('Master starts thread worker') # 工作線程由于某種異常而結束并退出了,想重啟工作線程的工作,但又不想重復創建線程 # 沒有想到這里會有異常 thread_obj.start() # 這一行會報錯,同一線程不能重復啟動 logger.info('Master is going to sleep 5s') sleep(5)def thread_pool_main(): thread_obj = ThreadPoolExecutor(max_workers=1, thread_name_prefix='WorkExecutor') logger.info('Master ThreadPool Executor starts thread worker') thread_obj.submit(main) while True: logger.info('Master ThreadPool Executor is going to sleep 5s') sleep(5)if __name__ == ’__main__’: thread_pool_main()

代碼運行結果如下:

INFO [thread: MainThread] Master ThreadPool Executor starts thread workerINFO [thread: WorkExecutor_0] Master starts thread workerINFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5sINFO [thread: Thread-1] I am slave. I am working. I am going to sleep 3sINFO [thread: WorkExecutor_0] Master is going to sleep 5sINFO [thread: Thread-1] Exit thread executor because of some exceptionINFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5sINFO [thread: WorkExecutor_0] Master starts thread workerINFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5sINFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5sINFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5sINFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5sINFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5sINFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5sINFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s... ...

顯然,由上面的結果,在線程池 worker 執行到 INFO [thread: WorkExecutor_0] Master starts thread worker 的時候,是會有異常產生的,但是整個代碼并沒有拋棄任何異常。

解決方法

發現上面的 bug 后,想在線程池 worker 出錯的時候,把異常記錄到日志。查閱資料,要獲取線程池的異常信息,需要調用 concurrent.futures.Future.exception(timeout=None) 方法,為了記錄日志,這里加了線程池執行結束的回調函數。同時,日志中記錄異常信息,用了 logging.exception() 方法。

def thread_executor(): while True: logger.info('I am slave. I am working. I am going to sleep 3s') sleep(3) logger.info('Exit thread executor because of some exception') breakdef main(): thread_obj = threading.Thread(target=thread_executor) while True: logger.info('Master starts thread worker') # 工作線程由于某種異常而結束并退出了,想重啟工作線程的工作,但又不想重復創建線程 # 沒有想到這里會有異常 thread_obj.start() # 這一行會報錯,同一線程不能重復啟動 logger.info('Master is going to sleep 5s') sleep(5)def thread_pool_callback(worker): logger.info('called thread pool executor callback function') worker_exception = worker.exception() if worker_exception: logger.exception('Worker return exception: {}'.format(worker_exception))def thread_pool_main(): thread_obj = ThreadPoolExecutor(max_workers=1, thread_name_prefix='WorkExecutor') logger.info('Master ThreadPool Executor starts thread worker') thread_pool_exc = thread_obj.submit(main) thread_pool_exc.add_done_callback(thread_pool_callback) # logger.info('thread pool exception: {}'.format(thread_pool_exc.exception())) while True: logger.info('Master ThreadPool Executor is going to sleep 5s') sleep(5)if __name__ == ’__main__’: thread_pool_main()

代碼運行結果:

INFO [thread: MainThread] Master ThreadPool Executor starts thread workerINFO [thread: WorkExecutor_0] Master starts thread workerINFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5sINFO [thread: Thread-1] I am slave. I am working. I am going to sleep 3sINFO [thread: WorkExecutor_0] Master is going to sleep 5sINFO [thread: Thread-1] Exit thread executor because of some exceptionINFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5sINFO [thread: WorkExecutor_0] Master starts thread workerINFO [thread: WorkExecutor_0] called thread pool executor callback functionERROR [thread: WorkExecutor_0] Worker return exception: threads can only be started onceTraceback (most recent call last):File 'E:anacondalibconcurrentfuturesthread.py', line 57, in run result = self.fn(*self.args, **self.kwargs)File 'xxxx.py', line 46, in main thread_obj.start() # 這一行會報錯,同一線程不能重復啟動File 'E:anacondalibthreading.py', line 843, in start raise RuntimeError('threads can only be started once')RuntimeError: threads can only be started onceINFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5sINFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5sINFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s... ...

最終的寫法

其實,上面寫法中,想重復利用一個線程去實現生產者線程的實現方法是有問題的,在此處,一般情況下,線程執行結束后,線程資源會被會被操作系統,所以線程不能被重復調用 start() 。

解決python ThreadPoolExecutor 線程池中的異常捕獲問題

解決python ThreadPoolExecutor 線程池中的異常捕獲問題

一種可行的實現方式就是,用線程池替代。當然,這樣做得注意上面提到的線程池執行體的異常捕獲問題。

def thread_executor(): while True: logger.info('I am slave. I am working. I am going to sleep 3s') sleep(3) logger.info('Exit thread executor because of some exception') breakdef executor_callback(worker): logger.info('called worker callback function') worker_exception = worker.exception() if worker_exception: logger.exception('Worker return exception: {}'.format(worker_exception)) # raise worker_exceptiondef main(): slave_thread_pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix='SlaveExecutor') restart_flag = False while True: logger.info('Master starts thread worker') if not restart_flag: restart_flag = not restart_flag logger.info('Restart Slave work') slave_thread_pool.submit(thread_executor).add_done_callback(executor_callback) logger.info('Master is going to sleep 5s') sleep(5)

總結

這個問題主要還是因為對 Python 的 concurrent.futuresthread.ThreadPoolExecutor 不夠了解導致的,接觸這個包是在書本上,但是書本沒完全介紹包的全部 API 及用法,所以代碼產生異常情況后,DEBUG 了許久在真正找到問題所在。查閱 python docs 后才對其完整用法有所認識,所以,以后學習新的 python 包的時候還是可以查一查官方文檔的。

參考資料

英文版: docs of python concurrent.futures

中文版: python docs concurrent.futures — 啟動并行任務

exception(timeout=None)

返回由調用引發的異常。如果調用還沒完成那么這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成,concurrent.futures.TimeoutError 將會被觸發。timeout 可以是整數或浮點數。如果 timeout 沒有指定或為 None,那么等待時間就沒有限制。

如果 futrue 在完成前被取消則 CancelledError 將被觸發。

如果調用正常完成那么返回 None。

add_done_callback(fn)

附加可調用 fn 到期程。當期程被取消或完成運行時,將會調用 fn,而這個期程將作為它唯一的參數。

加入的可調用對象總被屬于添加它們的進程中的線程按加入的順序調用。如果可調用對象引發一個 Exception 子類,它會被記錄下來并被忽略掉。如果可調用對象引發一個 BaseException 子類,這個行為沒有定義。

如果期程已經完成或已取消,fn 會被立即調用。

以上這篇解決python ThreadPoolExecutor 線程池中的異常捕獲問題就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持好吧啦網。

標簽: Python 編程
相關文章:
主站蜘蛛池模板: 日韩在线一区视频 | 色男人的天堂久久综合 | 国产一级簧片 | 男女交性高清全过程无遮挡 | 在线日本妇人成熟免费观看 | 日韩在线视频播放 | 一级视频免费观看 | 性刺激欧美三级在线观看 | 国产牛仔裤系列在线观看 | 国产亚洲欧美在线人成aaaa | 黄色小视频免费观看 | 一区二区三区精品国产 | 中文字幕卡二和卡三的视频 | 欧美精品一区二区三区观 | 亚洲精品国产一区二区三区在 | 青草久操 | 国产精品超清大白屁股 | bt7086 福利二区 最新合集 | a级网站在线观看 | 国产精品日韩欧美久久综合 | 亚洲天天综合网 | 国产又黄又爽又色视频观看免费 | 久久久www成人免费精品 | 亚洲精品主播一区二区三区 | 91高清在线成人免费观看 | 精品理论片一区二区三区 | 欧美限制级在线观看 | 精品久久免费视频 | 一级毛片在播放免费 | 亚洲色图套图超市 | 国产成人精品一区二区不卡 | 成人一级片在线观看 | 一级美女黄色片 | 日本二级黄色 | 婷婷播放器 | 最新99国产成人精品视频免费 | 欧洲男女下面进出的视频 | 国产成人永久免费视 | 精品国产中文一级毛片在线看 | 香蕉视频在线网址 | 免费鲁丝片一级观看 |