Java Executor 執行緒池
Executor 框架提供執行緒池管理,避免頻繁建立和銷毀執行緒的開銷。
引入套件
import java.util.concurrent.*;
建立執行緒池
Executors 工廠方法
// 固定大小執行緒池
ExecutorService fixed = Executors.newFixedThreadPool(4);
// 快取執行緒池(動態調整)
ExecutorService cached = Executors.newCachedThreadPool();
// 單執行緒
ExecutorService single = Executors.newSingleThreadExecutor();
// 排程執行緒池
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
// 工作竊取執行緒池(Java 8+)
ExecutorService workStealing = Executors.newWorkStealingPool();
ThreadPoolExecutor 自訂
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize:核心執行緒數
4, // maximumPoolSize:最大執行緒數
60, TimeUnit.SECONDS, // keepAliveTime:閒置執行緒存活時間
new LinkedBlockingQueue<>(100), // 工作佇列
Executors.defaultThreadFactory(), // 執行緒工廠
new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
);
提交任務
execute (Runnable)
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(() -> {
System.out.println("任務執行中: " + Thread.currentThread().getName());
});
executor.shutdown();
submit (Callable)
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Integer> future = executor.submit(() -> {
Thread.sleep(1000);
return 42;
});
try {
Integer result = future.get(); // 等待結果
System.out.println("結果: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
Future
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
Thread.sleep(2000);
return "完成";
});
// 檢查狀態
boolean done = future.isDone();
boolean cancelled = future.isCancelled();
// 取得結果(會阻塞)
String result = future.get();
// 設定超時
String result2 = future.get(3, TimeUnit.SECONDS);
// 取消任務
future.cancel(true); // true 表示可中斷
executor.shutdown();
invokeAll 和 invokeAny
ExecutorService executor = Executors.newFixedThreadPool(3);
List<Callable<Integer>> tasks = Arrays.asList(
() -> { Thread.sleep(1000); return 1; },
() -> { Thread.sleep(2000); return 2; },
() -> { Thread.sleep(500); return 3; }
);
// invokeAll:等待所有任務完成
List<Future<Integer>> futures = executor.invokeAll(tasks);
for (Future<Integer> f : futures) {
System.out.println(f.get());
}
// invokeAny:回傳第一個完成的結果
Integer first = executor.invokeAny(tasks);
System.out.println("最快完成: " + first);
executor.shutdown();
關閉執行緒池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交任務...
// 優雅關閉(不再接受新任務,等待執行中的任務完成)
executor.shutdown();
// 等待終止
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// 超時,強制關閉
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
// 檢查是否已終止
boolean terminated = executor.isTerminated();
ScheduledExecutorService
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// 延遲執行
scheduler.schedule(() -> {
System.out.println("延遲 2 秒執行");
}, 2, TimeUnit.SECONDS);
// 固定頻率(每次執行開始間隔固定)
scheduler.scheduleAtFixedRate(() -> {
System.out.println("每 3 秒執行: " + LocalTime.now());
}, 0, 3, TimeUnit.SECONDS);
// 固定延遲(每次執行結束後延遲固定時間)
scheduler.scheduleWithFixedDelay(() -> {
System.out.println("間隔 2 秒: " + LocalTime.now());
}, 0, 2, TimeUnit.SECONDS);
// 關閉
// scheduler.shutdown();
實用範例
批次處理
public static <T> List<T> processInParallel(List<Callable<T>> tasks) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
try {
List<Future<T>> futures = executor.invokeAll(tasks);
List<T> results = new ArrayList<>();
for (Future<T> f : futures) {
try {
results.add(f.get());
} catch (ExecutionException e) {
results.add(null);
}
}
return results;
} finally {
executor.shutdown();
}
}
帶超時的任務執行
public static <T> T executeWithTimeout(Callable<T> task, long timeout, TimeUnit unit)
throws TimeoutException, ExecutionException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<T> future = executor.submit(task);
try {
return future.get(timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
future.cancel(true);
executor.shutdownNow();
}
}
拒絕策略
當執行緒池和佇列都滿時:
| 策略 | 說明 |
|---|---|
| AbortPolicy | 拋出異常(預設) |
| CallerRunsPolicy | 呼叫者執行緒執行 |
| DiscardPolicy | 靜默丟棄 |
| DiscardOldestPolicy | 丟棄最舊的任務 |
重點整理
Executors提供常用執行緒池的工廠方法execute()執行 Runnable,submit()執行 CallableFuture用於取得非同步結果- 使用
shutdown()優雅關閉執行緒池 ScheduledExecutorService用於排程任務- 生產環境建議使用
ThreadPoolExecutor自訂參數