執行器框架
ExecutorService
ExecutorService 是 Java 執行緒池的核心介面,提供管理和控制非同步任務執行的方法。
execute() vs submit()
| 方法 | 返回值 | 異常處理 |
|---|---|---|
execute(Runnable) | void | 無法獲取異常 |
submit(Callable) | Future | 可通過 Future 獲取異常 |
正確關閉執行緒池
正確關閉執行緒池的關鍵:
shutdown+awaitTerminationshutdownNow+awaitTermination
| 方法 | 行為 |
|---|---|
shutdown() | 拒絕新任務提交;待執行的任務不會取消;正在執行的任務繼續執行直到結束 |
shutdownNow() | 拒絕新任務;取消待執行的任務;嘗試中斷執行中的任務 |
awaitTermination() | 等待執行緒池關閉(直到 TERMINATED 狀態),有逾時時間參數 |
最佳實踐
ShutdownExample.java
executorService.shutdown();
try {
if (!executorService.awaitTermination(3000, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}流程說明:
- 呼叫
shutdown(),阻止新提交任務,並讓等待佇列中的任務執行完成 - 呼叫
awaitTermination(),確保等待佇列中的任務最多執行指定時間,以防止執行任務時間太長或被阻塞 - 在
awaitTermination()等待完畢後,若 ExecutorService 中還有任務沒執行完,則呼叫shutdownNow()強行終止 - 執行
awaitTermination()時所在的執行緒也有可能被中斷,因此需要 catch InterruptedException
ThreadPoolExecutor
Alibaba 開發規範
一般而言,不會使用
Executors 去創建,而是通過 ThreadPoolExecutor 自定義創建的方式,這樣處理可以更加明確 Thread Pool 運行規則,避免資源耗盡風險 (OOM)。Executors 返回的執行緒池物件的缺點:
| 類型 | 問題 |
|---|---|
| FixedThreadPool / SingleThreadPool | 允許 Request Queue 長度為 Integer.MAX_VALUE,可能堆積大量請求導致 OOM |
| CachedThreadPool / ScheduledThreadPool | 允許創建 Thread 數量為 Integer.MAX_VALUE,可能大量創建 Threads 導致 OOM |
7 大參數
ThreadPoolExecutorParams.java
new ThreadPoolExecutor(
int corePoolSize, // 常駐(核心) Thread 數量
int maximumPoolSize, // 最大 Thread 數量
long keepAliveTime, // Thread 存活時間
TimeUnit unit, // 時間單位
BlockingQueue<Runnable> workQueue, // 阻塞隊列
ThreadFactory threadFactory, // Thread 工廠
RejectedExecutionHandler handler // 拒絕策略
);| 參數 | 說明 |
|---|---|
corePoolSize | 常駐(核心)Thread 數量 |
maximumPoolSize | 最大 Thread 數量 |
keepAliveTime + unit | Thread 存活時間;超過常駐 Thread 數量,一段時間不使用,則將超過數量回收 |
workQueue | 阻塞隊列 |
threadFactory | Thread 工廠 |
handler | 拒絕策略 |
內部工作原理
拒絕策略 (Reject Strategy)
| 策略 | 說明 |
|---|---|
AbortPolicy (預設) | 拋出 RejectedExecutionException 阻止系統正常運行 |
CallerRunsPolicy | 將任務退回呼叫者執行,降低新任務的流量 |
DiscardOldestPolicy | 拋棄隊列中等待最久的任務,然後嘗試再次提交當前任務 |
DiscardPolicy | 默默地丟棄無法處理的任務,不作任何處理與拋出異常 |
常見執行緒池
ExecutorTypes.java
// 固定數量執行緒池
ExecutorService fixed = Executors.newFixedThreadPool(int nThreads);
// 單一執行緒池
ExecutorService single = Executors.newSingleThreadExecutor();
// 可擴充執行緒池
ExecutorService cached = Executors.newCachedThreadPool();| 類型 | 說明 |
|---|---|
newFixedThreadPool(n) | 1 個 Pool / N 個 Threads |
newSingleThreadExecutor() | 1 個 Pool / 1 個 Thread,任務一個接一個執行 |
newCachedThreadPool() | Thread Pool 根據需求創建 Thread,可擴充 |
執行緒池大小設定
JCIP 公式
Little’s Law
| 符號 | 說明 |
|---|---|
| L | 同時處理的請求數量 |
| λ | 長期平均到達率 (RPS) |
| W | 處理請求的平均時間(延遲) |
ForkJoinPool
Fork/Join Framework:
- Fork:把一個複雜任務進行拆分,大事化小
- Join:把分拆的任務結果進行合併