本文目錄:
我們這里的隊列都指線程池使用的阻塞隊列 BlockingQueue 的實現(xiàn)。
什么是有界隊列?就是有固定大小的隊列。比如設(shè)定了固定大小的 LinkedBlockingQueue,又或者大小為 0,只是在生產(chǎn)者和消費者中做中轉(zhuǎn)用的 SynchronousQueue。
什么是無界隊列?指的是沒有設(shè)置固定大小的隊列。這些隊列的特點是可以直接入列,直到溢出。當然現(xiàn)實幾乎不會有到這么大的容量(超過 Integer.MAX_VALUE),所以從使用者的體驗上,就相當于 “無界”。比如沒有設(shè)定固定大小的 LinkedBlockingQueue。
所以無界隊列的特點就是可以一直入列,不存在隊列滿負荷的現(xiàn)象。
這個特性,在我們自定義線程池的使用中非常容易出錯。而出錯的根本原因是對線程池內(nèi)部原理的不了解。
比如有這么一個案例,我們使用了無界隊列創(chuàng)建了這樣一個線程池:
ExecutorService executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
配置的參數(shù)如下:
然后對這個線程池我們提出一個問題:使用過程中,是否會達到最大線程數(shù) 4?
我們寫了個 Demo 驗證一下,設(shè)定有 10 個任務(wù),每個任務(wù)執(zhí)行 10s。
任務(wù)的執(zhí)行代碼如下,用 Thread.sleep 操作模擬執(zhí)行任務(wù)的阻塞耗時。
/** * @author lidiqing * @since 17/9/17. */public class BlockRunnable implements Runnable { private final String mName; public BlockRunnable(String name) { mName = name; } public void run() { System.out.println(String.format("[%s] %s 執(zhí)行", Thread.currentThread().getName(), mName)); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }}
然后在 main 方法中把這 10 個任務(wù)扔進剛剛設(shè)計好的線程池中:
public static void main(String[] args) { ExecutorService executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); for (int i = 0; i < 10; i++) { executor.execute(new BlockRunnable(String.valueOf(i))); } }
結(jié)果輸出如下:
[pool-1-thread-2] 1 執(zhí)行[pool-1-thread-1] 0 執(zhí)行[pool-1-thread-2] 2 執(zhí)行[pool-1-thread-1] 3 執(zhí)行[pool-1-thread-1] 5 執(zhí)行[pool-1-thread-2] 4 執(zhí)行[pool-1-thread-2] 7 執(zhí)行[pool-1-thread-1] 6 執(zhí)行[pool-1-thread-1] 8 執(zhí)行[pool-1-thread-2] 9 執(zhí)行
發(fā)現(xiàn)了什么問題?這里最多出現(xiàn)兩個線程。當放開到更多的任務(wù)時,也依然是這樣。
我們回到線程池 ThreadPoolExecutor 的 execute 方法來找原因。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
上面代碼的核心就是任務(wù)進入等待隊列 workQueue 的時機。答案就是,執(zhí)行 execute 方法時,如果發(fā)現(xiàn)核心線程數(shù)已滿,是會先執(zhí)行 workQueue.offer(command)
來入列。
也就是 當核心線程數(shù)滿了后,任務(wù)優(yōu)先進入等待隊列。如果等待隊列也滿了后,才會去創(chuàng)建新的非核心線程 。
所以我們上面設(shè)計的線程池,使用了無界隊列,會直接導(dǎo)致最大線程數(shù)的配置失效。
可以用一張圖來展示整個 execute 階段的過程:
所以上面的線程池,實際使用的線程數(shù)的最大值始終是 corePoolSize
,即便設(shè)置了 maximumPoolSize
也沒有生效。 要用上 maximumPoolSize
,允許在核心線程滿負荷下,繼續(xù)創(chuàng)建新線程來工作 ,就需要選用有界任務(wù)隊列。可以給 LinkedBlockingQueue 設(shè)置容量,比如 new LinkedBlockingQueue(128)
,也可以換成 SynchronousQueue。
舉個例子,用來做異步任務(wù)的 AsyncTask 的內(nèi)置并發(fā)執(zhí)行器的線程池設(shè)計如下:
public abstract class AsyncTask<Params, Progress, Result> { private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); // We want at least 2 threads and at most 4 threads in the core pool, // preferring to have 1 less than the CPU count to avoid saturating // the CPU with background work private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4)); private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; private static final int KEEP_ALIVE_SECONDS = 30; private static final ThreadFactory sThreadFactory = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); public Thread newThread(Runnable r) { return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); } }; private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(128); /** * An {@link Executor} that can be used to execute tasks in parallel. */ public static final Executor THREAD_POOL_EXECUTOR; static { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory); threadPoolExecutor.allowCoreThreadTimeOut(true); THREAD_POOL_EXECUTOR = threadPoolExecutor; } ...}
我們可以看到,AsyncTask 的這個線程池設(shè)計,是希望在達到核心線程數(shù)之后,能夠繼續(xù)增加工作線程,最大達到 CPU_COUNT * 2 + 1
個線程,所以使用了有界隊列,限制了任務(wù)隊列最大數(shù)量為 128 個。
所以使用 AsyncTask 的并發(fā)線程池的時候要注意,不適宜短時間同時大量觸發(fā)大量任務(wù)的場景。
因為當核心線程、任務(wù)隊列、非核心線程全部滿負荷工作的情況下,下一個進來的任務(wù)會觸發(fā) ThreaPoolExecutor 的 reject
操作,默認會使用 AbortPolicy
策略,拋出 RejectedExecutionException
異常。