Executor フレームワーク
ExecutorService
ExecutorService は Java スレッドプールの核心インターフェースで、非同期タスクの実行を管理・制御するメソッドを提供します。
execute() vs submit()
| メソッド | 戻り値 | 例外処理 |
|---|---|---|
execute(Runnable) | void | 例外を取得できない |
submit(Callable) | Future | Future 経由で例外を取得可能 |
スレッドプールの正しいシャットダウン
スレッドプールを正しく閉じるための重要なポイント:
shutdown+awaitTerminationshutdownNow+awaitTermination
| メソッド | 動作 |
|---|---|
shutdown() | 新しいタスクの送信を拒否;待機中のタスクはキャンセルされない;実行中のタスクは完了まで継続 |
shutdownNow() | 新しいタスクを拒否;待機中のタスクをキャンセル;実行中のタスクの中断を試行 |
awaitTermination() | スレッドプールの終了を待機(TERMINATED 状態まで)、タイムアウト時間パラメータあり |
ベストプラクティス
ShutdownExample.java
executorService.shutdown();
try {
if (!executorService.awaitTermination(3000, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}フローの説明:
shutdown()を呼び出し、新しいタスクの送信をブロックし、待機キュー内のタスクを完了させるawaitTermination()を呼び出し、待機キュー内のタスクが指定時間内に完了することを確保awaitTermination()の待機後もタスクが残っている場合は、shutdownNow()で強制終了awaitTermination()実行中のスレッドも中断される可能性があるため、InterruptedException を catch
ThreadPoolExecutor
Alibaba 開発規範
一般的に
Executors を使用せず、ThreadPoolExecutor でカスタム作成することで、スレッドプールの動作ルールを明確にし、リソース枯渇リスク(OOM)を回避できます。Executors が返すスレッドプールオブジェクトの欠点:
| 種類 | 問題 |
|---|---|
| FixedThreadPool / SingleThreadPool | リクエストキューの長さが Integer.MAX_VALUE まで許可され、大量のリクエストが蓄積して OOM の原因になる可能性 |
| CachedThreadPool / ScheduledThreadPool | スレッド作成数が Integer.MAX_VALUE まで許可され、大量のスレッドが作成されて OOM の原因になる可能性 |
7つのパラメータ
ThreadPoolExecutorParams.java
new ThreadPoolExecutor(
int corePoolSize, // 常駐(コア)スレッド数
int maximumPoolSize, // 最大スレッド数
long keepAliveTime, // スレッド生存時間
TimeUnit unit, // 時間単位
BlockingQueue<Runnable> workQueue, // ブロッキングキュー
ThreadFactory threadFactory, // スレッドファクトリ
RejectedExecutionHandler handler // 拒否ポリシー
);| パラメータ | 説明 |
|---|---|
corePoolSize | 常駐(コア)スレッド数 |
maximumPoolSize | 最大スレッド数 |
keepAliveTime + unit | スレッド生存時間;コアスレッド数を超えた分は、一定時間使用されないと回収 |
workQueue | ブロッキングキュー |
threadFactory | スレッドファクトリ |
handler | 拒否ポリシー |
内部動作原理
拒否ポリシー (Reject Strategy)
| ポリシー | 説明 |
|---|---|
AbortPolicy (デフォルト) | RejectedExecutionException をスローしてシステムの正常動作を阻止 |
CallerRunsPolicy | タスクを呼び出し元に戻して実行し、新しいタスクの流量を低減 |
DiscardOldestPolicy | キュー内で最も長く待機しているタスクを破棄し、現在のタスクの再送信を試行 |
DiscardPolicy | 処理できないタスクを静かに破棄、何も処理せず例外もスローしない |
一般的なスレッドプール
ExecutorTypes.java
// 固定数スレッドプール
ExecutorService fixed = Executors.newFixedThreadPool(int nThreads);
// 単一スレッドプール
ExecutorService single = Executors.newSingleThreadExecutor();
// 拡張可能スレッドプール
ExecutorService cached = Executors.newCachedThreadPool();| 種類 | 説明 |
|---|---|
newFixedThreadPool(n) | 1つのプール / N個のスレッド |
newSingleThreadExecutor() | 1つのプール / 1個のスレッド、タスクは順番に実行 |
newCachedThreadPool() | 需要に応じてスレッドを作成、拡張可能 |
スレッドプールサイズの設定
JCIP 公式
Little’s Law
| 記号 | 説明 |
|---|---|
| L | 同時に処理されるリクエスト数 |
| λ | 長期平均到着率 (RPS) |
| W | リクエスト処理の平均時間(レイテンシ) |
ForkJoinPool
Fork/Join フレームワーク:
- Fork:複雑なタスクを分割、大きな仕事を小さく
- Join:分割したタスクの結果を統合