Java多線程并發(fā)執(zhí)行demo代碼實(shí)例
主類:MultiThread,執(zhí)行并發(fā)類
package java8test;import java.util.ArrayList;import java.util.List;import java.util.concurrent.BlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.LinkedBlockingQueue;/** * @param <H> 為被處理的數(shù)據(jù)類型 * @param <T>返回?cái)?shù)據(jù)類型 * 知識(shí)點(diǎn)1:X,T為泛型,為什么要用泛型,泛型和Object的區(qū)別請(qǐng)看:https://www.cnblogs.com/xiaoxiong2015/p/12705815.html */public abstract class MultiThread<X, T> { public static int i = 0; // 知識(shí)點(diǎn)2:線程池:https://www.cnblogs.com/xiaoxiong2015/p/12706153.html private final ExecutorService exec; // 線程池 // 知識(shí)點(diǎn)3:@author Doung Lea 隊(duì)列:https://www.cnblogs.com/xiaoxiong2015/p/12825636.html private final BlockingQueue<Future<T>> queue = new LinkedBlockingQueue<>(); // 知識(shí)點(diǎn)4:計(jì)數(shù)器,還是并發(fā)包大神 @author Doug Lea 編寫。是一個(gè)原子安全的計(jì)數(shù)器,可以利用它實(shí)現(xiàn)發(fā)令槍 private final CountDownLatch startLock = new CountDownLatch(1); // 啟動(dòng)門,當(dāng)所有線程就緒時(shí)調(diào)用countDown private final CountDownLatch endLock; // 結(jié)束門 private final List<X> listData;// 被處理的數(shù)據(jù) /** * @param list list.size()為多少個(gè)線程處理,list里面的H為被處理的數(shù)據(jù) */ public MultiThread(List<X> list) { if (list != null && list.size() > 0) { this.listData = list; exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // 創(chuàng)建線程池,線程池共有nThread個(gè)線程 endLock = new CountDownLatch(list.size()); // 設(shè)置結(jié)束門計(jì)數(shù)器,當(dāng)一個(gè)線程結(jié)束時(shí)調(diào)用countDown } else { listData = null; exec = null; endLock = null; } } /** * * @return 獲取每個(gè)線程處理結(jié)速的數(shù)組 * @throws InterruptedException * @throws ExecutionException */ public List<T> getResult() throws InterruptedException, ExecutionException { List<T> resultList = new ArrayList<>(); if (listData != null && listData.size() > 0) { int nThread = listData.size(); // 線程數(shù)量 for (int i = 0; i < nThread; i++) {X data = listData.get(i);Future<T> future = exec.submit(new Task(i, data) { @Override public T execute(int currentThread, X data) { return outExecute(currentThread, data); }}); // 將任務(wù)提交到線程池queue.add(future); // 將Future實(shí)例添加至隊(duì)列 } startLock.countDown(); // 所有任務(wù)添加完畢,啟動(dòng)門計(jì)數(shù)器減1,這時(shí)計(jì)數(shù)器為0,所有添加的任務(wù)開始執(zhí)行 endLock.await(); // 主線程阻塞,直到所有線程執(zhí)行完成 for (Future<T> future : queue) {resultList.add(future.get()); } exec.shutdown(); // 關(guān)閉線程池 } return resultList; } /** * 每一個(gè)線程執(zhí)行的功能,需要調(diào)用者來(lái)實(shí)現(xiàn) * @param currentThread 線程號(hào) * @param data 每個(gè)線程被處理的數(shù)據(jù) * @return T返回對(duì)象 */ public abstract T outExecute(int currentThread, X data); /** * 線程類 */ private abstract class Task implements Callable<T> { private int currentThread;// 當(dāng)前線程號(hào) private X data; public Task(int currentThread, X data) { this.currentThread = currentThread; this.data = data; } @Override public T call() throws Exception { // startLock.await(); // 線程啟動(dòng)后調(diào)用await,當(dāng)前線程阻塞,只有啟動(dòng)門計(jì)數(shù)器為0時(shí)當(dāng)前線程才會(huì)往下執(zhí)行 T t = null; try {t = execute(currentThread, data); } finally {endLock.countDown(); // 線程執(zhí)行完畢,結(jié)束門計(jì)數(shù)器減1 } return t; } /** * 每一個(gè)線程執(zhí)行的功能 * @param currentThread 線程號(hào) * @param data 每個(gè)線程被處理的數(shù)據(jù) * @return T返回對(duì)象 */ public abstract T execute(int currentThread, X data); }}
結(jié)果類:ResultVO,保存返回結(jié)果,根據(jù)實(shí)際情況替換成自己的
package java8test;public class ResultVo { int i; public ResultVo(int i) { this.i = i; } public ResultVo() { // TODO Auto-generated constructor stub }}
參數(shù)類:ParamVO,傳入?yún)?shù)類,根據(jù)實(shí)際情況替換成自己的
package java8test;public class ParamVo { private int i; ParamVo(int i) { this.i = i; } public int getI() { return i; } @Override public String toString() { return String.valueOf(i) + ' ' + hashCode(); }}
測(cè)試類:new兩個(gè)MultiThread,可以看到MultiThread這個(gè)類不存在線程安全問題。
package java8test;import java.util.ArrayList;import java.util.List;public class Test { public static void main(String[] args) { try { List<ParamVo> splitList = new ArrayList<ParamVo>(); for (int i = 0; i < 100; i++) {splitList.add(new ParamVo(i)); } List<ParamVo> splitList1 = new ArrayList<ParamVo>(); for (int i = 200; i < 300; i++) {splitList1.add(new ParamVo(i)); } MultiThread<ParamVo, ResultVo> multiThread = new MultiThread<ParamVo, ResultVo>(splitList) {@Overridepublic ResultVo outExecute(int currentThread, ParamVo data) { System.out.println('當(dāng)前線程名稱:' + Thread.currentThread().getName() + '當(dāng)前線程號(hào)=' + currentThread + ' data=' + data); i--; return new ResultVo(data.getI());} }; MultiThread<ParamVo, ResultVo> multiThread1 = new MultiThread<ParamVo, ResultVo>(splitList1) {@Overridepublic ResultVo outExecute(int currentThread, ParamVo data) { System.out.println('當(dāng)前線程名稱:' + Thread.currentThread().getName() + '當(dāng)前線程號(hào)=' + currentThread + ' data=' + data); i--; return new ResultVo(data.getI());} }; List<ResultVo> list = multiThread.getResult(); List<ResultVo> list1 = multiThread1.getResult(); // 獲取每一批次處理結(jié)果 System.out.println('獲取處理結(jié)果........................'); for (ResultVo vo : list) {System.out.println(vo.i); } System.out.println('獲取1處理結(jié)果........................'); for (ResultVo vo : list1) {System.out.println(vo.i); } } catch (Exception e) { e.printStackTrace(); } }}
這個(gè)類也用在了生產(chǎn)當(dāng)中,用來(lái)并發(fā)插入數(shù)據(jù)。但是事務(wù)不能被管控,需要自己保證最終事務(wù)一致。需要注意。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持好吧啦網(wǎng)。
相關(guān)文章:
1. python selenium 獲取接口數(shù)據(jù)的實(shí)現(xiàn)2. java 優(yōu)雅關(guān)閉線程池的方案3. 詳解idea中web.xml默認(rèn)版本問題解決4. asp知識(shí)整理筆記4(問答模式)5. jsp實(shí)現(xiàn)textarea中的文字保存換行空格存到數(shù)據(jù)庫(kù)的方法6. ASP實(shí)現(xiàn)加法驗(yàn)證碼7. jsp EL表達(dá)式詳解8. Python matplotlib 繪制雙Y軸曲線圖的示例代碼9. 利用ajax+php實(shí)現(xiàn)商品價(jià)格計(jì)算10. JSP頁(yè)面實(shí)現(xiàn)驗(yàn)證碼校驗(yàn)功能
