詳解Java并發(fā)包中線程池ThreadPoolExecutor
線程池的使用主要是解決兩個(gè)問(wèn)題:①當(dāng)執(zhí)行大量異步任務(wù)的時(shí)候線程池能夠提供更好的性能,在不使用線程池時(shí)候,每當(dāng)需要執(zhí)行異步任務(wù)的時(shí)候直接new一個(gè)線程來(lái)運(yùn)行的話,線程的創(chuàng)建和銷毀都是需要開(kāi)銷的。而線程池中的線程是可復(fù)用的,不需要每次執(zhí)行異步任務(wù)的時(shí)候重新創(chuàng)建和銷毀線程;②線程池提供一種資源限制和管理的手段,比如可以限制線程的個(gè)數(shù),動(dòng)態(tài)的新增線程等等。
在下面的分析中,我們可以看到,線程池使用一個(gè)Integer的原子類型變量來(lái)記錄線程池狀態(tài)和線程池中的線程數(shù)量,通過(guò)線程池狀態(tài)來(lái)控制任務(wù)的執(zhí)行,每個(gè)工作線程Worker線程可以處理多個(gè)任務(wù)。
二、ThreadPoolExecutor類2.1、ThreadPoolExecutor成員變量以含義ThreadPoolExecutor繼承了AbstractExecutorService,其中的成員變量ctl是一個(gè)Integer類型的原子變量,用來(lái)記錄線程池的狀態(tài)和線程池中的線程的個(gè)數(shù)。這里(Integer看做32位)ctl高三位表示線程池的狀態(tài),后面的29位表示線程池中的線程個(gè)數(shù)。如下所示是ThreadPoolExecutor源碼中的成員變量
//(高3位)表示線程池狀態(tài),(低29位)表示線程池中線程的個(gè)數(shù);// 默認(rèn)狀態(tài)是RUNNING,線程池中線程個(gè)數(shù)為0private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//表示具體平臺(tái)下Integer的二進(jìn)制位數(shù)-3后的剩余位數(shù)表示的數(shù)才是線程的個(gè)數(shù);//其中Integer.SIZE=32,-3之后的低29位表示的就是線程的個(gè)數(shù)了private static final int COUNT_BITS = Integer.SIZE - 3;//線程最大個(gè)數(shù)(低29位)00011111111111111111111111111111(1<<29-1)private static final int CAPACITY = (1 << COUNT_BITS) - 1;//線程池狀態(tài)(高3位表示線程池狀態(tài))//111 00000000000000000000000000000private static final int RUNNING = -1 << COUNT_BITS;//000 00000000000000000000000000000private static final int SHUTDOWN = 0 << COUNT_BITS;//001 00000000000000000000000000000private static final int STOP = 1 << COUNT_BITS;//010 00000000000000000000000000000private static final int TIDYING = 2 << COUNT_BITS;//011 00000000000000000000000000000private static final int TERMINATED = 3 << COUNT_BITS;//獲取高3位(運(yùn)行狀態(tài))==> c & 11100000000000000000000000000000private static int runStateOf(int c) { return c & ~CAPACITY; }//獲取低29位(線程個(gè)數(shù))==> c & 00011111111111111111111111111111private static int workerCountOf(int c) { return c & CAPACITY; }//計(jì)算原子變量ctl新值(運(yùn)行狀態(tài)和線程個(gè)數(shù))private static int ctlOf(int rs, int wc) { return rs | wc; }
下面我們簡(jiǎn)單解釋一下上面的線程狀態(tài)的含義:
①RUNNING:接受新任務(wù)并處理阻塞隊(duì)列中的任務(wù)
②SHUTDOWN:拒絕新任務(wù)但是處理阻塞隊(duì)列中的任務(wù)
③STOP:拒絕新任務(wù)并拋棄阻塞隊(duì)列中的任務(wù),同時(shí)會(huì)中斷當(dāng)前正在執(zhí)行的任務(wù)
④TIDYING:所有任務(wù)執(zhí)行完之后(包含阻塞隊(duì)列中的任務(wù))當(dāng)前線程池中活躍的線程數(shù)量為0,將要調(diào)用terminated方法
⑥TERMINATED:終止?fàn)顟B(tài)。terminated方法調(diào)用之后的狀態(tài)
2.2、ThreadPoolExecutor的參數(shù)以及實(shí)現(xiàn)原理①corePoolSize:線程池核心現(xiàn)車個(gè)數(shù)
②workQueue:用于保存等待任務(wù)執(zhí)行的任務(wù)的阻塞隊(duì)列(比如基于數(shù)組的有界阻塞隊(duì)列ArrayBlockingQueue、基于鏈表的無(wú)界阻塞隊(duì)列LinkedBlockingQueue等等)
③maximumPoolSize:線程池最大線程數(shù)量
④ThreadFactory:創(chuàng)建線程的工廠
⑤RejectedExecutionHandler:拒絕策略,表示當(dāng)隊(duì)列已滿并且線程數(shù)量達(dá)到線程池最大線程數(shù)量的時(shí)候?qū)π绿峤坏娜蝿?wù)所采取的策略,主要有四種策略:AbortPolicy(拋出異常)、CallerRunsPolicy(只用調(diào)用者所在線程來(lái)運(yùn)行該任務(wù))、DiscardOldestPolicy(丟掉阻塞隊(duì)列中最近的一個(gè)任務(wù)來(lái)處理當(dāng)前提交的任務(wù))、DiscardPolicy(不做處理,直接丟棄掉)
⑥keepAliveTime:存活時(shí)間,如果當(dāng)前線程池中的數(shù)量比核心線程數(shù)量多,并且當(dāng)前線程是閑置狀態(tài),該變量就是這些線程的最大生存時(shí)間
⑦TimeUnit:存活時(shí)間的時(shí)間單位。
根據(jù)上面的參數(shù)介紹,簡(jiǎn)單了解一下線程池的實(shí)現(xiàn)原理,以提交一個(gè)新任務(wù)為開(kāi)始點(diǎn),分析線程池的主要處理流程
①newFixedThreadPool:創(chuàng)建一個(gè)核心線程個(gè)數(shù)和最大線程個(gè)數(shù)均為nThreads的線程池,并且阻塞隊(duì)列長(zhǎng)度為Integer.MAX_VALUE,keepAliveTime=0說(shuō)明只要線程個(gè)數(shù)比核心線程個(gè)數(shù)多并且當(dāng)前空閑即回收。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}
②newSingleThreadExecutor:創(chuàng)建一個(gè)核心線程個(gè)數(shù)和最大線程個(gè)數(shù)都為1 的線程池,并且阻塞隊(duì)列長(zhǎng)度為Integer.MAX_VALUE,keepAliveTime=0說(shuō)明只要線程個(gè)數(shù)比核心線程個(gè)數(shù)多并且當(dāng)前線程空閑即回收該線程。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
③newCachedThreadPoolExecutor:創(chuàng)建一個(gè)按需創(chuàng)建線程的線程池,初始線程個(gè)數(shù)為0,最多線程個(gè)數(shù)為Integer.MAX_VALUE,并且阻塞隊(duì)列為同步隊(duì)列(最多只有一個(gè)元素),keepAliveTime=60說(shuō)明只要當(dāng)前線程在60s內(nèi)空閑則回收。這個(gè)類型的線程池的特點(diǎn)就是:加入同步隊(duì)列的任務(wù)會(huì)被馬上執(zhí)行,同步隊(duì)列中最多只有一個(gè)任務(wù)
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}2.4、ThreadPoolExecutor中的其他成員
//獨(dú)占鎖,用來(lái)控制新增工作線程Worker操作的原子性private final ReentrantLock mainLock = new ReentrantLock();//工作線程集合,Worker繼承了AQS接口和Runnable接口,是具體處理任務(wù)的線程對(duì)象//Worker實(shí)現(xiàn)AQS,并自己實(shí)現(xiàn)了簡(jiǎn)單不可重入獨(dú)占鎖,其中state=0表示當(dāng)前鎖未被獲取狀態(tài),state=1表示鎖被獲取,//state=-1表示W(wǎng)ork創(chuàng)建時(shí)候的默認(rèn)狀態(tài),創(chuàng)建時(shí)候設(shè)置state=-1是為了防止runWorker方法運(yùn)行前被中斷private final HashSet<Worker> workers = new HashSet<Worker>();//termination是該鎖對(duì)應(yīng)的條件隊(duì)列,在線程調(diào)用awaitTermination時(shí)候用來(lái)存放阻塞的線程private final Condition termination = mainLock.newCondition();三、execute(Runnable command)方法實(shí)現(xiàn)
executor方法的作用是提交任務(wù)command到線程池執(zhí)行,可以簡(jiǎn)單的按照下面的圖進(jìn)行理解,ThreadPoolExecutor的實(shí)現(xiàn)類似于一個(gè)生產(chǎn)者消費(fèi)者模型,當(dāng)用戶添加任務(wù)到線程池中相當(dāng)于生產(chǎn)者生產(chǎn)元素,workers工作線程則直接執(zhí)行任務(wù)或者從任務(wù)隊(duì)列中獲取任務(wù),相當(dāng)于消費(fèi)之消費(fèi)元素。
public void execute(Runnable command) { //(1)首先檢查任務(wù)是否為null,為null拋出異常,否則進(jìn)行下面的步驟 if (command == null)throw new NullPointerException(); //(2)ctl值中包含了當(dāng)前線程池的狀態(tài)和線程池中的線程數(shù)量 int c = ctl.get(); //(3)workerCountOf方法是獲取低29位,即獲取當(dāng)前線程池中的線程個(gè)數(shù),如果小于corePoolSize,就開(kāi)啟新的線程運(yùn)行 if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) return;c = ctl.get(); } //(4)如果線程池處理RUNNING狀態(tài),就添加任務(wù)到阻塞隊(duì)列中 if (isRunning(c) && workQueue.offer(command)) {//(4-1)二次檢查,獲取ctl值int recheck = ctl.get();//(4-2)如果當(dāng)前線程池不是出于RUNNING狀態(tài),就從隊(duì)列中刪除任務(wù),并執(zhí)行拒絕策略if (! isRunning(recheck) && remove(command)) reject(command);//(4-3)否則,如果線程池為空,就添加一個(gè)線程else if (workerCountOf(recheck) == 0) addWorker(null, false); } //(5)如果隊(duì)列滿,則新增線程,如果新增線程失敗,就執(zhí)行拒絕策略 else if (!addWorker(command, false))reject(command);}
我們?cè)诳匆幌律厦娲a的執(zhí)行流程,按照標(biāo)記的數(shù)字進(jìn)行分析:
①步驟(3)判斷當(dāng)前線程池中的線程個(gè)數(shù)是否小于corePoolSize,如果小于核心線程數(shù),會(huì)向workers里面新增一個(gè)核心線程執(zhí)行任務(wù)。
②如果當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù),就執(zhí)行(4)。(4)首先判斷當(dāng)前線程池是否處于RUNNING狀態(tài),如果處于該狀態(tài),就添加任務(wù)到任務(wù)隊(duì)列中,這里需要判斷線程池的狀態(tài)是因?yàn)榫€程池可能已經(jīng)處于非RUNNING狀態(tài),而在非RUNNING狀態(tài)下是需要拋棄新任務(wù)的。
③如果想任務(wù)隊(duì)列中添加任務(wù)成功,需要進(jìn)行二次校驗(yàn),因?yàn)樵谔砑尤蝿?wù)到任務(wù)隊(duì)列后,可能線程池的狀態(tài)發(fā)生了變化,所以這里需要進(jìn)行二次校驗(yàn),如果當(dāng)前線程池已經(jīng)不是RUNNING狀態(tài)了,需要將任務(wù)從任務(wù)隊(duì)列中移除,然后執(zhí)行拒絕策略;如果二次校驗(yàn)通過(guò),則執(zhí)行4-3代碼重新判斷當(dāng)前線程池是否為空,如果線程池為空沒(méi)有線程,那么就需要新創(chuàng)建一個(gè)線程。
④如果上面的步驟(4)創(chuàng)建添加任務(wù)失敗,說(shuō)明隊(duì)列已滿,那么(5)會(huì)嘗試再開(kāi)啟新的線程執(zhí)行任務(wù)(類比上圖中的thread3和thread4,即不是核心線程的那些線程),如果當(dāng)前線程池中的線程個(gè)數(shù)已經(jīng)大于最大線程數(shù)maximumPoolSize,表示不能開(kāi)啟新的線程。這就屬于線程池滿并且任務(wù)隊(duì)列滿,就需要執(zhí)行拒絕策略了。
下面我們?cè)诳纯碼ddWorker方法的實(shí)現(xiàn)
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) {int c = ctl.get();int rs = runStateOf(c);//(6)檢查隊(duì)列是否只在必要時(shí)候?yàn)榭読f (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;//(7)使用CAS增加線程個(gè)數(shù)for (;;) { //根據(jù)ctl值獲得當(dāng)前線程池中的線程數(shù)量 int wc = workerCountOf(c); //(7-1)如果線程數(shù)量超出限制,返回false if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false; //(7-2)CAS增加線程數(shù)量,同時(shí)只有一個(gè)線程可以成功 if (compareAndIncrementWorkerCount(c))break retry; c = ctl.get(); // 重新讀取ctl值 //(7-3)CAS失敗了,需要查看當(dāng)前線程池狀態(tài)是否發(fā)生變化,如果發(fā)生變化需要跳轉(zhuǎn)到外層循環(huán)嘗試重新獲取線程池狀態(tài),否則內(nèi)層循環(huán)重新進(jìn)行CAS增加線程數(shù)量 if (runStateOf(c) != rs)continue retry;} } //(8)執(zhí)行到這里說(shuō)明CAS增加新線程個(gè)數(shù)成功了,我們需要開(kāi)始創(chuàng)建新的工作線程Worker boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try {//(8-1)創(chuàng)建新的workerw = new Worker(firstTask);final Thread t = w.thread;if (t != null) { final ReentrantLock mainLock = this.mainLock; //(8-2)加獨(dú)占鎖,保證workers的同步,可能線程池中的多個(gè)線程調(diào)用了線程池的execute方法 mainLock.lock(); try {// (8-3)重新檢查線程池狀態(tài),以免在獲取鎖之前調(diào)用shutdown方法改變線程池狀態(tài)int rs = runStateOf(ctl.get());if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException(); //(8-4)添加新任務(wù) workers.add(w); int s = workers.size(); if (s > largestPoolSize)largestPoolSize = s; workerAdded = true;} } finally {mainLock.unlock(); } //(8-6)添加新任務(wù)成功之后,啟動(dòng)任務(wù) if (workerAdded) {t.start();workerStarted = true; }} } finally {if (! workerStarted) addWorkerFailed(w); } return workerStarted;}
簡(jiǎn)單再分析說(shuō)明一下上面的代碼,addWorker方法主要分為兩部分,第一部分是使用CAS線程安全的添加線程數(shù)量,第二部分則是創(chuàng)建新的線程并且將任務(wù)并發(fā)安全的添加到新的workers之中,然后啟動(dòng)線程執(zhí)行。
①代碼(6)中檢查隊(duì)列是否只在必要時(shí)候?yàn)榭眨挥芯€程池狀態(tài)符合條件才能夠進(jìn)行下面的步驟,從(6)中的判斷條件來(lái)看,下面的集中情況addWorker會(huì)直接返回false
( I )當(dāng)前線程池狀態(tài)為STOP,TIDYING或者TERMINATED ; (I I)當(dāng)前線程池狀態(tài)為SHUTDOWN并且已經(jīng)有了第一個(gè)任務(wù); (I I I)當(dāng)前線程池狀態(tài)為SHUTDOWN并且任務(wù)隊(duì)列為空
②外層循環(huán)中判斷條件通過(guò)之后,在內(nèi)層循環(huán)中使用CAS增加線程數(shù),當(dāng)CAS成功就退出雙重循環(huán)進(jìn)行(8)步驟代碼的執(zhí)行,如果失敗需要查看當(dāng)前線程池的狀態(tài)是否發(fā)生變化,如果發(fā)生變化需要進(jìn)行外層循環(huán)重新判斷線程池狀態(tài)然后在進(jìn)入內(nèi)層循環(huán)重新進(jìn)行CAS增加線程數(shù),如果線程池狀態(tài)沒(méi)有發(fā)生變化但是上一次CAS失敗就繼續(xù)進(jìn)行CAS嘗試。
③執(zhí)行到(8)代碼處,表明當(dāng)前已經(jīng)成功增加 了線程數(shù),但是還沒(méi)有線程執(zhí)行任務(wù)。ThreadPoolExecutor中使用全局獨(dú)占鎖mainLock來(lái)控制將新增的工作線程Worker線程安全的添加到工作者線程集合workers中。
④(8-2)獲取了獨(dú)占鎖,但是在獲取到鎖之后,還需要進(jìn)行重新檢查線程池的狀態(tài),這是為了避免在獲取全局獨(dú)占鎖之前其他線程調(diào)用了shutDown方法關(guān)閉了線程池。如果線程池已經(jīng)關(guān)閉需要釋放鎖。否則將新增的線程添加到工作集合中,釋放鎖啟動(dòng)線程執(zhí)行任務(wù)。
上面的addWorker方法最后幾行中,會(huì)判斷添加工作線程是否成功,如果失敗,會(huì)執(zhí)行addWorkerFailed方法,將任務(wù)從workers中移除,并且workerCount做-1操作。
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; //獲取鎖 mainLock.lock(); try { //如果worker不為null if (w != null) //workers移除worker workers.remove(w); //通過(guò)CAS操作,workerCount-1 decrementWorkerCount(); tryTerminate(); } finally { //釋放鎖 mainLock.unlock(); }}四、工作線程Worker的執(zhí)行4.1、工作線程Worker類源碼分析
上面查看addWorker方法在CAS更新線程數(shù)成功之后,下面就是創(chuàng)建新的Worker線程執(zhí)行任務(wù),所以我們這里先查看Worker類,下面是Worker類的源碼,我們可以看出,Worker類繼承了AQS并實(shí)現(xiàn)了Runnable接口,所以他既是一個(gè)自定義的同步組件,也是一個(gè)執(zhí)行任務(wù)的線程類。下面我們分析Worker類的執(zhí)行
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ /** 使用線程工廠創(chuàng)建的線程,執(zhí)行任務(wù) */ final Thread thread; /** 初始化執(zhí)行任務(wù) */ Runnable firstTask; /** 計(jì)數(shù) */ volatile long completedTasks; /** * 給出初始firstTask,線程創(chuàng)建工廠創(chuàng)建新的線程 */ Worker(Runnable firstTask) {setState(-1); // 防止在調(diào)用runWorker之前被中斷this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); //使用threadFactory創(chuàng)建線程 } /** run方法實(shí)際上執(zhí)行的是runWorker方法 */ public void run() {runWorker(this); } // 關(guān)于同步狀態(tài)(鎖) // // 同步狀態(tài)state=0表示鎖未被獲取 // 同步狀態(tài)state=1表示鎖被獲取 protected boolean isHeldExclusively() {return getState() != 0; } //下面都是重寫(xiě)AQS的方法,Worker為自定義的同步組件 protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true;}return false; } protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true; } public void lock(){ acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try {t.interrupt(); } catch (SecurityException ignore) { }} }}
在構(gòu)造函數(shù)中我們可以看出,首先將同步狀態(tài)state置為-1,而Worker這個(gè)同步組件的state有三個(gè)值,其中state=-1表示W(wǎng)ork創(chuàng)建時(shí)候的默認(rèn)狀態(tài),創(chuàng)建時(shí)候設(shè)置state=-1是為了防止runWorker方法運(yùn)行前被中斷前面說(shuō)到過(guò)這個(gè)結(jié)論,這里置為-1是為了避免當(dāng)前Worker在調(diào)用runWorker方法之前被中斷(當(dāng)其他線程調(diào)用線程池的shutDownNow時(shí)候,如果Worker的state>=0則會(huì)中斷線程),設(shè)置為-1就不會(huì)被中斷了。而Worker實(shí)現(xiàn)Runnable接口,那么需要重寫(xiě)run方法,在run方法中,我們可以看到,實(shí)際上執(zhí)行的是runWorker方法,在runWorker方法中,會(huì)首先調(diào)用unlock方法,該方法會(huì)將state置為0,所以這個(gè)時(shí)候調(diào)用shutDownNow方法就會(huì)中斷當(dāng)前線程,而這個(gè)時(shí)候已經(jīng)進(jìn)入了runWork方法了,就不會(huì)在還沒(méi)有執(zhí)行runWorker方法的時(shí)候就中斷線程。
4.2、runWorker方法的源碼分析final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 這個(gè)時(shí)候調(diào)用unlock方法,將state置為0,就可以被中斷了 boolean completedAbruptly = true; try {//(10)如果當(dāng)前任務(wù)為null,或者從任務(wù)隊(duì)列中獲取到的任務(wù)為null,就跳轉(zhuǎn)到(11)處執(zhí)行清理工作while (task != null || (task = getTask()) != null) { //task不為null,就需要線程執(zhí)行任務(wù),這個(gè)時(shí)候,需要獲取工作線程內(nèi)部持有的獨(dú)占鎖 w.lock(); /**如果線程池已被停止(STOP)(至少大于STOP狀態(tài)),要確保線程都被中斷 * 如果狀態(tài)不對(duì),檢查當(dāng)前線程是否中斷并清除中斷狀態(tài),并且再次檢查線程池狀態(tài)是否大于STOP * 如果上述滿足,檢查該對(duì)象是否處于中斷狀態(tài),不清除中斷標(biāo)記 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())//中斷該對(duì)象wt.interrupt(); try {//執(zhí)行任務(wù)之前要做的事情beforeExecute(wt, task);Throwable thrown = null;try { task.run(); //執(zhí)行任務(wù)} catch (RuntimeException x) { thrown = x; throw x;} catch (Error x) { thrown = x; throw x;} catch (Throwable x) { thrown = x; throw new Error(x);} finally { //執(zhí)行任務(wù)之后的方法 afterExecute(task, thrown);} } finally {task = null;//更新當(dāng)前已完成任務(wù)數(shù)量w.completedTasks++;//釋放鎖w.unlock(); }}completedAbruptly = false; } finally {//執(zhí)行清理工作:處理并退出當(dāng)前workerprocessWorkerExit(w, completedAbruptly); }}
我們梳理一下runWorker方法的執(zhí)行流程
①首先先執(zhí)行unlock方法,將Worker的state置為0,這樣工作線程就可以被中斷了(后續(xù)的操作如果線程池關(guān)閉就需要線程被中斷)
②首先判斷判斷當(dāng)前的任務(wù)(當(dāng)前工作線程中的task,或者從任務(wù)隊(duì)列中取出的task)是否為null,如果不為null就往下執(zhí)行,為null就執(zhí)行processWorkerExit方法。
③獲取工作線程內(nèi)部持有的獨(dú)占鎖(避免在執(zhí)行任務(wù)期間,其他線程調(diào)用shutdown后正在執(zhí)行的任務(wù)被中斷,shutdown只會(huì)中斷當(dāng)前被阻塞掛起的沒(méi)有執(zhí)行任務(wù)的線程)
④然后執(zhí)行beforeExecute()方法,該方法為擴(kuò)展接口代碼,表示在具體執(zhí)行任務(wù)之前所做的一些事情,然后執(zhí)行task.run()方法執(zhí)行具體任務(wù),執(zhí)行完之后會(huì)調(diào)用afterExecute()方法,用以處理任務(wù)執(zhí)行完畢之后的工作,也是一個(gè)擴(kuò)展接口代碼。
⑤更新當(dāng)前線程池完成的任務(wù)數(shù),并釋放鎖
4.3、執(zhí)行清理工作的方法processWorkerExit下面是方法processWorkerExit的源碼,在下面的代碼中
①首先(1-1)處統(tǒng)計(jì)線程池完成的任務(wù)個(gè)數(shù),并且在此之前獲取全局鎖,然后更新當(dāng)前的全局計(jì)數(shù)器,然后從工作線程集合中移除當(dāng)前工作線程,完成清理工作。
②代碼(1-2)調(diào)用了tryTerminate方法,在該方法中,判斷了當(dāng)前線程池狀態(tài)是SHUTDOWN并且隊(duì)列不為空或者當(dāng)前線程池狀態(tài)為STOP并且當(dāng)前線程池中沒(méi)有活動(dòng)線程,則置線程池狀態(tài)為TERMINATED。如果設(shè)置稱為了TERMINATED狀態(tài),還需要調(diào)用全局條件變量termination的signalAll方法喚醒所有因?yàn)檎{(diào)用線程池的awaitTermination方法而被阻塞住的線程,使得線程池中的所有線程都停止,從而使得線程池為TERMINATED狀態(tài)。
③代碼(1-3)處判斷當(dāng)前線程池中的線程個(gè)數(shù)是否小于核心線程數(shù),如果是,需要新增一個(gè)線程保證有足夠的線程可以執(zhí)行任務(wù)隊(duì)列中的任務(wù)或者提交的任務(wù)。
private void processWorkerExit(Worker w, boolean completedAbruptly) { /* *completedAbruptly:是由runWorker傳過(guò)來(lái)的參數(shù),表示是否突然完成的意思 *當(dāng)在就是在執(zhí)行任務(wù)過(guò)程當(dāng)中出現(xiàn)異常,就會(huì)突然完成,傳true * *如果是突然完成,需要通過(guò)CAS操作,更新workerCount(-1操作) *不是突然完成,則不需要-1,因?yàn)間etTask方法當(dāng)中已經(jīng)-1(getTask方法中執(zhí)行了decrementWorkerCount()方法) */ if (completedAbruptly)decrementWorkerCount(); //(1-1)在統(tǒng)計(jì)完成任務(wù)個(gè)數(shù)之前加上全局鎖,然后統(tǒng)計(jì)線程池中完成的任務(wù)個(gè)數(shù)并更新全局計(jì)數(shù)器,并從工作集中刪除當(dāng)前worker final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //獲得全局鎖 try {completedTaskCount += w.completedTasks; //更新已完成的任務(wù)數(shù)量workers.remove(w); //將完成該任務(wù)的線程worker從工作線程集合中移除 } finally {mainLock.unlock(); //釋放鎖 } /**(1-2) * 這一個(gè)方法調(diào)用完成了下面的事情: * 判斷如果當(dāng)前線程池狀態(tài)是SHUTDOWN并且工作隊(duì)列為空, * 或者當(dāng)前線程池狀態(tài)是STOP并且當(dāng)前線程池里面沒(méi)有活動(dòng)線程, * 則設(shè)置當(dāng)前線程池狀態(tài)為TERMINATED,如果設(shè)置成了TERMINATED狀態(tài), * 還需要調(diào)用條件變量termination的signAll方法激活所有因?yàn)檎{(diào)用線程池的awaitTermination方法而被阻塞的線程 */ tryTerminate(); //(1-3)如果當(dāng)前線程池中線程數(shù)小于核心線程,則增加核心線程數(shù) int c = ctl.get(); //判斷當(dāng)前線程池的狀態(tài)是否小于STOP(RUNNING或者SHUTDOWN) if (runStateLessThan(c, STOP)) {//如果任務(wù)忽然完成,執(zhí)行后續(xù)的代碼if (!completedAbruptly) { //allowCoreThreadTimeOut表示是否允許核心線程超時(shí),默認(rèn)為false //min這里當(dāng)默認(rèn)為allowCoreThreadTimeOut默認(rèn)為false的時(shí)候,min置為coorPoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //這里說(shuō)明:如果允許核心線程超時(shí),那么allowCoreThreadTimeOut可為true,那么min值為0,不需要維護(hù)核心線程了 //如果min為0并且任務(wù)隊(duì)列不為空 if (min == 0 && ! workQueue.isEmpty())min = 1; //這里表示如果min為0,且隊(duì)列不為空,那么至少需要一個(gè)核心線程存活來(lái)保證任務(wù)的執(zhí)行 //如果工作線程數(shù)大于min,表示當(dāng)前線程數(shù)滿足,直接返回 if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false); }}
在tryTerminate方法中,我們簡(jiǎn)單說(shuō)明了該方法的作用,下面是該方法的源碼,可以看出源碼實(shí)現(xiàn)上和上面所總結(jié)的功能是差不多的
final void tryTerminate() { for (;;) {//獲取線程池狀態(tài)int c = ctl.get();//如果線程池狀態(tài)為RUNNING//或者狀態(tài)大于TIDYING//或者狀態(tài)==SHUTDOWN并未任務(wù)隊(duì)列不為空//直接返回,不能調(diào)用terminated方法if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return;//如果線程池中工作線程數(shù)不為0,需要中斷線程if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return;}//獲得線程池的全局鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try { //通過(guò)CAS操作,將線程池狀態(tài)設(shè)置為TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //private static int ctlOf(int rs, int wc) { return rs | wc; }try { //調(diào)用terminated方法 terminated();} finally { //最終將線程狀態(tài)設(shè)置為TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //調(diào)用條件變量termination的signaAll方法喚醒所有因?yàn)? //調(diào)用線程池的awaitTermination方法而被阻塞的線程 //private final Condition termination = mainLock.newCondition(); termination.signalAll();}return; }} finally { mainLock.unlock();}// else retry on failed CAS }}五、補(bǔ)充(shutdown、shutdownNow、awaitTermination方法)5.1、shutdown操作
我們?cè)谑褂镁€程池的時(shí)候知道,調(diào)用shutdown方法之后線程池就不會(huì)再接受新的任務(wù)了,但是任務(wù)隊(duì)列中的任務(wù)還是需要執(zhí)行完的。調(diào)用該方法會(huì)立刻返回,并不是等到線程池的任務(wù)隊(duì)列中的所有任務(wù)執(zhí)行完畢在返回的。
public void shutdown() { //獲得線程池的全局鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {//進(jìn)行權(quán)限檢查checkShutdownAccess();//設(shè)置當(dāng)前線程池的狀態(tài)的SHUTDOWN,如果線程池狀態(tài)已經(jīng)是該狀態(tài)就會(huì)直接返回,下面我們會(huì)分析這個(gè)方法的源碼advanceRunState(SHUTDOWN);//設(shè)置中斷 標(biāo)志interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor } finally {mainLock.unlock(); } //嘗試將狀態(tài)變?yōu)門ERMINATED,上面已經(jīng)分析過(guò)該方法的源碼 tryTerminate();}
該方法的源碼比較簡(jiǎn)短,首先檢查了安全管理器,是查看當(dāng)前調(diào)用shutdown命令的線程是否有關(guān)閉線程的權(quán)限,如果有權(quán)限還需要看調(diào)用線程是否有中斷工作線程的權(quán)限,如果沒(méi)有權(quán)限將會(huì)拋出SecurityException異常或者空指針異常。下面我們查看一下advanceRunState 方法的源碼。
private void advanceRunState(int targetState) { for (;;) {//下面的方法執(zhí)行的就是://首先獲取線程的ctl值,然后判斷當(dāng)前線程池的狀態(tài)如果已經(jīng)是SHUTDOWN,那么if條件第一個(gè)為真就直接返回//如果不是SHUTDOWN狀態(tài),就需要CAS的設(shè)置當(dāng)前狀態(tài)為SHUTDOWNint c = ctl.get();if (runStateAtLeast(c, targetState) || //private static int ctlOf(int rs, int wc) { return rs | wc; } ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; }}
我們可以看出advanceRunState 方法實(shí)際上就是判斷當(dāng)前線程池的狀態(tài)是否為SHUTDWON,如果是那么就返回,否則就需要設(shè)置當(dāng)前狀態(tài)為SHUTDOWN。
我們?cè)賮?lái)看看shutdown方法中調(diào)用線程中斷的方法interruptIdleWorkers源碼
private void interruptIdleWorkers() { interruptIdleWorkers(false);}private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {for (Worker w : workers) { Thread t = w.thread; //如果工作線程沒(méi)有被中斷,并且沒(méi)有正在運(yùn)行設(shè)置中斷標(biāo)志 if (!t.isInterrupted() && w.tryLock()) {try { //需要中斷當(dāng)前線程 t.interrupt();} catch (SecurityException ignore) {} finally { w.unlock();} } if (onlyOne)break;} } finally {mainLock.unlock(); }}
上面的代碼中,需要設(shè)置所有空閑線程的中斷標(biāo)志。首先獲取線程池的全局鎖,同時(shí)只有一個(gè)線程可以調(diào)用shutdown方法設(shè)置中斷標(biāo)志。然后嘗試獲取工作線程Worker自己的鎖,獲取成功則可以設(shè)置中斷標(biāo)志(這是由于正在執(zhí)行任務(wù)的線程需要獲取自己的鎖,并且不可重入,所以正在執(zhí)行的任務(wù)沒(méi)有被中斷),這里要中斷的那些線程是阻塞到getTask()方法并嘗試從任務(wù)隊(duì)列中獲取任務(wù)的線程即空閑線程。
5.2、shutdownNow操作在使用線程池的時(shí)候,如果我們調(diào)用了shutdownNow方法,線程池不僅不會(huì)再接受新的任務(wù),還會(huì)將任務(wù)隊(duì)列中的任務(wù)丟棄,正在執(zhí)行的任務(wù)也會(huì)被中斷,然后立刻返回該方法,不會(huì)等待激活的任務(wù)完成,返回值為當(dāng)前任務(wù)隊(duì)列中被丟棄的任務(wù)列表
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {checkShutdownAccess(); //還是進(jìn)行權(quán)限檢查advanceRunState(STOP); //設(shè)置線程池狀態(tài)臺(tái)STOPinterruptWorkers(); //中斷所有線程tasks = drainQueue(); //將任務(wù)隊(duì)列中的任務(wù)移動(dòng)到task中 } finally {mainLock.unlock(); } tryTerminate(); return tasks; //返回tasks}
從上面的代碼中,我們可以可以發(fā)現(xiàn),shutdownNow方法也是首先需要檢查調(diào)用該方法的線程的權(quán)限,之后不同于shutdown方法之處在于需要即刻設(shè)置當(dāng)前線程池狀態(tài)為STOP,然后中斷所有線程(空閑線程+正在執(zhí)行任務(wù)的線程),移除任務(wù)隊(duì)列中的任務(wù)
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {for (Worker w : workers) //不需要判斷當(dāng)前線程是否在執(zhí)行任務(wù)(即不需要調(diào)用w.tryLock方法),中斷所有線程 w.interruptIfStarted(); } finally {mainLock.unlock(); }}5.3、awaitTermination操作
當(dāng)線程調(diào)用該方法之后,會(huì)阻塞調(diào)用者線程,直到線程池狀態(tài)為TERMINATED狀態(tài)才會(huì)返回,或者等到超時(shí)時(shí)間到之后會(huì)返回,下面是該方法的源碼。
//調(diào)用該方法之后,會(huì)阻塞調(diào)用者線程,直到線程池狀態(tài)為TERMINATED狀態(tài)才會(huì)返回,或者等到超時(shí)時(shí)間到之后會(huì)返回public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {//阻塞當(dāng)前線程,(獲取了Worker自己的鎖),那么當(dāng)前線程就不會(huì)再執(zhí)行任務(wù)(因?yàn)楂@取不到鎖)for (;;) { //當(dāng)前線程池狀態(tài)為TERMINATED狀態(tài),會(huì)返回true if (runStateAtLeast(ctl.get(), TERMINATED))return true; //超時(shí)時(shí)間到返回false if (nanos <= 0)return false; nanos = termination.awaitNanos(nanos);} } finally {mainLock.unlock(); }}
在上面的代碼中,調(diào)用者線程需要首先獲取線程Worker自己的獨(dú)占鎖,然后在循環(huán)判斷當(dāng)前線程池是否已經(jīng)是TERMINATED狀態(tài),如果是則直接返回,否則說(shuō)明當(dāng)前線程池中還有線程正在執(zhí)行任務(wù),這時(shí)候需要查看當(dāng)前設(shè)置的超時(shí)時(shí)間是否小于0,小于0說(shuō)明不需要等待就直接返回,如果大于0就需要調(diào)用條件變量termination的awaitNanos方法等待設(shè)置的時(shí)間,并在這段時(shí)間之內(nèi)等待線程池的狀態(tài)變?yōu)門ERMINATED。
我們?cè)谇懊嬲f(shuō)到清理線程池的方法processWorkerExit的時(shí)候,需要調(diào)用tryTerminated方法,在該方法中會(huì)查看當(dāng)前線程池狀態(tài)是否為TERMINATED,如果是該狀態(tài)也會(huì)調(diào)用termination.signalAll()方法喚醒所有線程池中因調(diào)用awaitTermination而被阻塞住的線程。
如果是設(shè)置了超時(shí)時(shí)間,那么termination的awaitNanos方法也會(huì)返回,這時(shí)候需要重新檢查線程池狀態(tài)是否為TERMINATED,如果是則返回,不是就繼續(xù)阻塞自己。
以上就是Java并發(fā)包中線程池ThreadPoolExecutor原理探究的詳細(xì)內(nèi)容,更多關(guān)于Java 并發(fā)包 線程池 ThreadPoolExecutor的資料請(qǐng)關(guān)注好吧啦網(wǎng)其它相關(guān)文章!
相關(guān)文章:
1. 使用css實(shí)現(xiàn)全兼容tooltip提示框2. 前端html+css實(shí)現(xiàn)動(dòng)態(tài)生日快樂(lè)代碼3. CSS3實(shí)例分享之多重背景的實(shí)現(xiàn)(Multiple backgrounds)4. Vue3使用JSX的方法實(shí)例(筆記自用)5. JavaScript數(shù)據(jù)類型對(duì)函數(shù)式編程的影響示例解析6. 詳解CSS偽元素的妙用單標(biāo)簽之美7. Vue3獲取DOM節(jié)點(diǎn)的3種方式實(shí)例8. 利用CSS3新特性創(chuàng)建透明邊框三角9. vue實(shí)現(xiàn)將自己網(wǎng)站(h5鏈接)分享到微信中形成小卡片的超詳細(xì)教程10. 不要在HTML中濫用div
