詳細分析JAVA 線程池
系統(tǒng)啟動一個新線程的成本是比較高的,因為它涉及與操作系統(tǒng)交互。在這種情形下,使用線程池可以很好地提高性能,尤其是當程序中需要創(chuàng)建大量生存期很短暫的線程時,更應(yīng)該考慮使用線程池。
與數(shù)據(jù)庫連接池類似的是,線程池在系統(tǒng)啟動時即創(chuàng)建大量空閑的線程,程序?qū)⒁粋€ Runnable 對象或 Callable 對象傳給線程池,線程池就會啟動一個線程來執(zhí)行它們的 run() 或 call() 方法,當 run() 或 call() 方法執(zhí)行結(jié)束后,該線程并不會死亡,而是再次返回線程池成為空閑狀態(tài),等待執(zhí)行下一個 Runnable 對象的 run() 或 call() 方法。
除此之外,使用線程池可以有效地控制系統(tǒng)中并發(fā)線程的數(shù)量,當系統(tǒng)中包含大量并發(fā)線程時,會導(dǎo)致系統(tǒng)性能劇烈下降,甚至導(dǎo)致 JVM 崩潰,而線程池的最大線程數(shù)參數(shù)可以控制系統(tǒng)中并發(fā)線程數(shù)不超過此數(shù)。
Java 8 改進的線程池
在 Java 5 以前,開發(fā)者必須手動實現(xiàn)自己的線程池;從 Java 5 開始, Java 內(nèi)建支持線程池。 Java 5 新增了一個 Executors 工廠類來產(chǎn)生線程池,該工廠類包含如下幾個靜態(tài)工廠方法來創(chuàng)建線程池。
newCachedThreadPool():創(chuàng)建一個具有緩存功能的線程池,系統(tǒng)根據(jù)需要創(chuàng)建線程,這些線程將會被緩存在線程池中。 newFixedThreadPool(int nThreads):創(chuàng)建一個可重用的、具有固定線程數(shù)的線程池。 newSingleThreadExecutor():創(chuàng)建一個只有單線程的線程池,它相當于調(diào)用 newFixedThreadPool() 方法時傳入?yún)?shù)為1。 newScheduledThreadPool(int corePoolSize):創(chuàng)建具有指定線程數(shù)的線程池,它可以在指定延遲后執(zhí)行線程任務(wù)。 corePoolSize 指池中所保存的線程數(shù),即使線程是空閑的也被保存在線程池內(nèi)。 newSingleThreadScheduledExecutor():創(chuàng)建只有一個線程的線程池,它可以在指定延遲后執(zhí)行線程任務(wù)。 ExecutorService newWorkStealingPool(int parallelism):創(chuàng)建持有足夠的線程的線程池來支持給定的并行級別,該方法還會使用多個隊列來減少競爭。 ExecutorService newWorkStealingPool():該方法是前一個方法的簡化版本。如果當前機器有 4 個CPU, 則目標并行級別被設(shè)置為 4,也就是相當于為前一個方法傳入 4 作為參數(shù)。上面7個方法中的前三個方法返回一個 ExecutorService 對象,該對象代表一個線程池,它可以執(zhí)行 Runnable 對象或 Callable 對象所代表的線程;而中間兩個方法返回一個 ScheduledExecutorService 線程池,它是 ExecutorService 的子類,它可以在指定延遲后執(zhí)行線程任務(wù);最后兩個方法則是 Java 8 新增的,這兩個方法可充分利用多 CPU 并行的能力。這兩個方法生成的 work stealing 池,都相當于后臺線程池,如果所有的前臺線程都死亡了,work stealing 池中的線程會自動死亡。
由于目前計算機硬件的發(fā)展日新月異,即使普通用戶使用的電腦通常也都是多核 CPU,因此 Java 8 在線程支持上也增加了利用多 CPU 并行的能力,這樣可以更好地發(fā)揮底層硬件的性能。
ExecutorService 代表盡快執(zhí)行線程的線程池(只要線程池中有空閑線程,就立即執(zhí)行線程任務(wù)),程序只要將一個 Runnable 對象或 Callable 對象(代表線程任務(wù))提交給該線程池,該線程池就會盡快執(zhí)行該任務(wù)。 ExecutorService 里提供了如下三個方法。
Future <?> submit(Runnable task):將一個 Runnable 對象提交給指定的線程池,線程池將在有空閑線程時執(zhí)行 Runnable 對象代表的任務(wù)。其中 Future 對象代表 Runnable 任務(wù)的返回值,但 run() 方法沒有返回值,所以 Future 對象將在 run() 方法執(zhí)行結(jié)束后返回 null 。但可以調(diào)用 Future 的 isDone()、 isCancelled() 方法來獲得 Runnable 對象的執(zhí)行狀態(tài)。 <T> Future <T> submit(Runnable task, T result):將一個 Runnable 對象提交給指定的線程池,線程池將在有空閑線程時執(zhí)行 Runnable 對象代表的任務(wù)。其中 result 顯式指定線程執(zhí)行結(jié)束后的返回值,所以 Future 對象將在 run() 方法執(zhí)行結(jié)束后返回 result 。 <T> Future <T> submit(Callable <T> task):將一個 Callable 對象提交給指定的線程池,線程池將在有空閑線程時執(zhí)行 Callable 對象代表的任務(wù)。其中 Future 代表 Callable 對象里 call() 方法的返回值。ScheduledExecutorService 代表可在指定延遲后或周期性地執(zhí)行線程任務(wù)的線程池,它提供了如下4個方法。
ScheduledFuture<V> schedule (Callable<V> callable, long delay, TimeUnit unit):指定 callable 任務(wù)將在 delay 延遲后執(zhí)行。 ScheduledFuture<?> schedule(Runnable command , long delay , TimeUnit unit):指定 command 任務(wù)將在 delay 延遲后執(zhí)行。 ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period , TimeUnit unit) : 指定 command 任務(wù)將在 delay 延遲后執(zhí)行,而且以設(shè)定頻率重復(fù)執(zhí)行。也就是說,在 initialDelay 后開始執(zhí)行,依次在 initialDelay + period 、 initialDelay +2* period …處重復(fù)執(zhí)行,依此類推。 ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):創(chuàng)建并執(zhí)行一個在給定初始延遲后首次啟用的定期操作,隨后在每一次執(zhí)行終止和下一次執(zhí)行開始之間都存在給定的延遲。如果任務(wù)在任一次執(zhí)行時遇到異常,就會取消后續(xù)執(zhí)行;否則,只能通過程序來顯式取消或終止該任務(wù)。用完一個線程池后,應(yīng)該調(diào)用該線程池的 shutdown() 方法,該方法將啟動線程池的關(guān)閉序列,調(diào)用 shutdown() 方法后的線程池不再接收新任務(wù),但會將以前所有已提交任務(wù)執(zhí)行完成。當線程池中的所有任務(wù)都執(zhí)行完成后,池中的所有線程都會死亡;另外也可以調(diào)用線程池的 shutdownNow() 方法來關(guān)閉線程池,該方法試圖停止所有正在執(zhí)行的活動任務(wù),暫停處理正在等待的任務(wù),并返回等待執(zhí)行的任務(wù)列表。
使用線程池來執(zhí)行線程任務(wù)的步驟如下。
調(diào)用 Executors 類的靜態(tài)工廠方法創(chuàng)建一個 ExecutorService 對象,該對象代表一個線程池。 創(chuàng)建 Runnable 實現(xiàn)類或 Callable 實現(xiàn)類的實例,作為線程執(zhí)行任務(wù)。 調(diào)用 ExecutorService 對象的 submit() 方法來提交 Runnable 實例或 Callable 實例。 當不想提交任何任務(wù)時,調(diào)用 ExecutorService 對象的 shutdown() 方法來關(guān)閉線程池。下面程序使用線程池來執(zhí)行指定 Runnable 對象所代表的任務(wù)。
//實現(xiàn)Runnable接口來定義一個簡單的class TestThread implements Runnable{ public void run(){ for (int i = 0; i < 100 ; i++ ){ System.out.println(Thread.currentThread().getName() + '的i值為:' + i); } }}public class ThreadPoolTest{ public static void main(String[] args) { //創(chuàng)建一個具有固定線程數(shù)(6)的線程池 ExecutorService pool = Executors.newFixedThreadPool(6); //向線程池中提交2個線程 pool.submit(new TestThread()); pool.submit(new TestThread()); //關(guān)閉線程池 pool.shutdown(); }}
上面程序中創(chuàng)建 Runnable 實現(xiàn)類與最開始創(chuàng)建線程池并沒有太大差別,創(chuàng)建了 Runnable 實現(xiàn)類之后程序沒有直接創(chuàng)建線程、啟動線程來執(zhí)行該 Runnable 任務(wù),而是通過線程池來執(zhí)行該任務(wù),使用線程池來執(zhí)行 Runnable 任務(wù)的代碼如程序中粗體字代碼所示。運行上面程序,將看到兩個線程交替執(zhí)行的效果,如下圖所示。
Java 8 增強的 ForkJoinPool
現(xiàn)在計算機大多已向多 CPU 方向發(fā)展,即使普通 PC ,甚至小型智能設(shè)備(如手機)、多核處理器也已被廣泛應(yīng)用。在未來的日子里,處理器的核心數(shù)將會發(fā)展到更多。
雖然硬件上的多核 CPU 已經(jīng)十分成熟,但很多應(yīng)用程序并未為這種多核 CPU 做好準備,因此并不能很好地利用多核 CPU 的性能優(yōu)勢。
為了充分利用多 CPU 、多核 CPU 的性能優(yōu)勢,計算機軟件系統(tǒng)應(yīng)該可以充分“挖掘”每個 CPU 的計算能力,絕不能讓某個 CPU 處于“空閑”狀態(tài)。為了充分利用多 CPU 、多核 CPU 的優(yōu)勢,可以考慮把一個任務(wù)拆分成多個“小任務(wù)”,把多個“小任務(wù)”放到多個處理器核心上并行執(zhí)行;當多個“小任務(wù)”執(zhí)行完成之后,再將這些執(zhí)行結(jié)果合并起來即可。
Java 7 提供了 ForkJoinPool 來支持將一個任務(wù)拆分成多個“小任務(wù)”并行計算,再把多個“小任務(wù)”的結(jié)果合并成總的計算結(jié)果。 ForkJoinPool 是 ExecutorService 的實現(xiàn)類,因此是一種特殊的線程池。ForkJoinPool 提供了如下兩個常用的構(gòu)造器。
ForkJoinPool(int parallelism):創(chuàng)建一個包含 parallelism 個并行線程的 ForkJoinPool 。 ForkJoinPool():以 Runtime.availableProcessors() 方法的返回值作為 parallelism 參數(shù)來創(chuàng)建 ForkJoinPoolJava 8 進一步擴展了 ForkJoinPool 的功能 ,Java 8 為 ForkJoinPool 增加了通用池功能。 ForkJoinPool 類通過如下兩個靜態(tài)方法提供通用池功能。
ForkJoinPool commonPool():該方法返回一個通用池,通用池的運行狀態(tài)不會受 shutdown() 或 shutdownNow() 方法的影響。當然,如果程序直接執(zhí)行 System.exit(0); 來終止虛擬機,通用池以及通用池中正在執(zhí)行的任務(wù)都會被自動終止。 int getCommonPoolParallelism():該方法返回通用池的并行級別。創(chuàng)建了 ForkJoinPool 實例之后,就可調(diào)用 ForkJoinPool 的 submit(ForkJoinTask task) 或 invoke(ForkJoinTask task) 方法來執(zhí)行指定任務(wù)了。其中 ForkJoinTask 代表一個可以并行、合并的任務(wù)。ForkJoinTask 是一個抽象類,它還有兩個抽象子類 : RecursiveAction 和 RecursiveTask 。其中 RecursiveTask 代表有返回值的任務(wù),而 RecursiveAction 代表沒有返回值的任務(wù)。
下面以執(zhí)行沒有返回值的“大任務(wù)”(簡單地打印0〜300的數(shù)值)為例,程序?qū)⒁粋€“大任務(wù)”拆分成多個“小任務(wù)”,并將任務(wù)交給 ForkJoinPool 來執(zhí)行。
import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveAction;import java.util.concurrent.TimeUnit;class PrintTask extends RecursiveAction{ // 每個“小任務(wù)”最多只打印50個數(shù) private static final int THRESHOLD = 50; private int start; private int end; // 打印從 start 到 end 的任務(wù) public PrintTask(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { // 當 end 與 start 之間的差小于 THRESHOLD 時,開始打印 if(end-start<THRESHOLD) { for(int i=start;i<end;i++) { System.out.println(Thread.currentThread().getName()+'的 i 值:'+i); } }else { // 當 end 與 start 之間的差大于 THRESHOLD 時,即要打印的數(shù)超過50個時 // 將大任務(wù)分解成兩個“小任務(wù)” int middle = (start+end)/2; PrintTask left = new PrintTask(start, middle); PrintTask right = new PrintTask(middle, end); // 并行執(zhí)行兩個“小任務(wù)” left.fork(); right.fork(); } }}public class ForkJoinPoolTest { public static void main(String[] args) throws Exception { ForkJoinPool pool = new ForkJoinPool(); // 提交可分解的 PrintTask 任務(wù) pool.submit(new PrintTask(0, 300)); pool.awaitTermination(2, TimeUnit.SECONDS); // 關(guān)閉線程池 pool.shutdown(); }}
上面程序中的粗體字代碼實現(xiàn)了對指定打印任務(wù)的分解,分解后的任務(wù)分別調(diào)用 fork() 方法開始并行執(zhí)行。運行上面程序,可以看到如下圖所示的結(jié)果。
從如上圖所示的執(zhí)行結(jié)果來看, ForkJoinPool 啟動了 4個線程來執(zhí)行這個打印任務(wù)——這是因為測試計算機的 CPU 是4核的。不僅如此,讀者可以看到程序雖然打印了 0〜299這300個數(shù)字,但并不是連續(xù)打印的,這是因為程序?qū)⑦@個打印任務(wù)進行了分解,分解后的任務(wù)會并行執(zhí)行,所以不會按順序從0打印到299。
上面定義的任務(wù)是一個沒有返回值的打印任務(wù),如果大任務(wù)是有返回值的任務(wù),則可以讓任務(wù)繼承 RecursiveTask<T>,其中泛型參數(shù) T 就代表了該任務(wù)的返回值類型。下面程序示范了使用 RecursiveTask 對一個長度為100的數(shù)組的元素值進行累加。
package com.jwen.demo4;import java.util.Random;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.Future;import java.util.concurrent.RecursiveTask;import java.util.function.Function;class CalTask extends RecursiveTask<Integer>{ // 每個“小任務(wù)”最多只累加20個數(shù) private static final int THRESHOLD = 20; private int arr[]; private int start; private int end; // 累加從 start 到 end 的數(shù)組元素 public CalTask(int[] arr, int start, int end) { this.arr = arr; this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; // 當 end 與 start 之間的差小于 THRESHOLD 時,開始進行實際累加 if(end-start<THRESHOLD) { for(int i=start;i<end;i++) { sum+=arr[i]; } return sum; }else { // 當 end 與 start 之間的差大于 THRESHOLD 時,即要累加的數(shù)超過20個時 // 將大任務(wù)分解成兩個“小任務(wù)” int middle = (start+end)/2; CalTask left = new CalTask(arr, start, middle); CalTask right = new CalTask(arr, middle, end); // 并行執(zhí)行兩個“小任務(wù)” left.fork(); right.fork(); // 把兩個“小任務(wù)”累加的結(jié)果合并起來 return left.join()+right.join(); // ① } }}public class Sum { public static void main(String[] args) throws Exception{ int[] arr = new int[100]; Random rand = new Random(); int total = 0; // 初始化100個數(shù)字元素 for(int i=0,len = arr.length;i<len;i++) { int tmp = rand.nextInt(20); // 對數(shù)組元素賦值,并將數(shù)組元素的值添加到 sum 總和中 total +=(arr[i]=tmp); } System.out.println(total); // 創(chuàng)建一個通用池 ForkJoinPool pool = ForkJoinPool.commonPool(); // 提交可分解的 CalTask 任務(wù) Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length)); System.out.println(future.get()); // 關(guān)閉線程池 pool.shutdown(); }}
上面程序與前一個程序基本相似,同樣是將任務(wù)進行了分解,并調(diào)用分解后的任務(wù)的 fork() 方法使它們并行執(zhí)行。與前一個程序不同的是,現(xiàn)在任務(wù)是帶返回值的,因此程序還在①號代碼處將兩個分解后的“小任務(wù)”的返回值進行了合并。
運行上面程序,將可以看到程序通過 CalTask 計算出來的總和,與初始化數(shù)組元素時統(tǒng)計出來的總和總是相等,這表明程序一切正常。
Java 的確是一門非常優(yōu)秀的編程語言,在多 CPU、多核 CPU 時代來到時,Java 語言的多線程已經(jīng)為多核 CPU 做好了準備。
以上就是詳細分析JAVA 線程池的詳細內(nèi)容,更多關(guān)于JAVA 線程池的資料請關(guān)注好吧啦網(wǎng)其它相關(guān)文章!
相關(guān)文章:
1. Ajax實現(xiàn)表格中信息不刷新頁面進行更新數(shù)據(jù)2. 詳解CSS偽元素的妙用單標簽之美3. python 實用工具狀態(tài)機transitions4. PHP 面向?qū)ο蟪绦蛟O(shè)計之類屬性與類常量實現(xiàn)方法分析5. UDDI FAQs6. msxml3.dll 錯誤 800c0019 系統(tǒng)錯誤:-2146697191解決方法7. HTML <!DOCTYPE> 標簽8. Java Spring WEB應(yīng)用實例化如何實現(xiàn)9. CSS自定義滾動條樣式案例詳解10. 將properties文件的配置設(shè)置為整個Web應(yīng)用的全局變量實現(xiàn)方法
