Java Executor 執行緒池

Executor 框架提供執行緒池管理,避免頻繁建立和銷毀執行緒的開銷。

引入套件

import java.util.concurrent.*;

建立執行緒池

Executors 工廠方法

// 固定大小執行緒池
ExecutorService fixed = Executors.newFixedThreadPool(4);

// 快取執行緒池(動態調整)
ExecutorService cached = Executors.newCachedThreadPool();

// 單執行緒
ExecutorService single = Executors.newSingleThreadExecutor();

// 排程執行緒池
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);

// 工作竊取執行緒池(Java 8+)
ExecutorService workStealing = Executors.newWorkStealingPool();

ThreadPoolExecutor 自訂

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2,                      // corePoolSize:核心執行緒數
    4,                      // maximumPoolSize:最大執行緒數
    60, TimeUnit.SECONDS,   // keepAliveTime:閒置執行緒存活時間
    new LinkedBlockingQueue<>(100),  // 工作佇列
    Executors.defaultThreadFactory(), // 執行緒工廠
    new ThreadPoolExecutor.CallerRunsPolicy()  // 拒絕策略
);

提交任務

execute (Runnable)

ExecutorService executor = Executors.newFixedThreadPool(2);

executor.execute(() -> {
    System.out.println("任務執行中: " + Thread.currentThread().getName());
});

executor.shutdown();

submit (Callable)

ExecutorService executor = Executors.newFixedThreadPool(2);

Future<Integer> future = executor.submit(() -> {
    Thread.sleep(1000);
    return 42;
});

try {
    Integer result = future.get();  // 等待結果
    System.out.println("結果: " + result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

executor.shutdown();

Future

ExecutorService executor = Executors.newSingleThreadExecutor();

Future<String> future = executor.submit(() -> {
    Thread.sleep(2000);
    return "完成";
});

// 檢查狀態
boolean done = future.isDone();
boolean cancelled = future.isCancelled();

// 取得結果(會阻塞)
String result = future.get();

// 設定超時
String result2 = future.get(3, TimeUnit.SECONDS);

// 取消任務
future.cancel(true);  // true 表示可中斷

executor.shutdown();

invokeAll 和 invokeAny

ExecutorService executor = Executors.newFixedThreadPool(3);

List<Callable<Integer>> tasks = Arrays.asList(
    () -> { Thread.sleep(1000); return 1; },
    () -> { Thread.sleep(2000); return 2; },
    () -> { Thread.sleep(500); return 3; }
);

// invokeAll:等待所有任務完成
List<Future<Integer>> futures = executor.invokeAll(tasks);
for (Future<Integer> f : futures) {
    System.out.println(f.get());
}

// invokeAny:回傳第一個完成的結果
Integer first = executor.invokeAny(tasks);
System.out.println("最快完成: " + first);

executor.shutdown();

關閉執行緒池

ExecutorService executor = Executors.newFixedThreadPool(2);

// 提交任務...

// 優雅關閉(不再接受新任務,等待執行中的任務完成)
executor.shutdown();

// 等待終止
try {
    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
        // 超時,強制關閉
        executor.shutdownNow();
    }
} catch (InterruptedException e) {
    executor.shutdownNow();
}

// 檢查是否已終止
boolean terminated = executor.isTerminated();

ScheduledExecutorService

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

// 延遲執行
scheduler.schedule(() -> {
    System.out.println("延遲 2 秒執行");
}, 2, TimeUnit.SECONDS);

// 固定頻率(每次執行開始間隔固定)
scheduler.scheduleAtFixedRate(() -> {
    System.out.println("每 3 秒執行: " + LocalTime.now());
}, 0, 3, TimeUnit.SECONDS);

// 固定延遲(每次執行結束後延遲固定時間)
scheduler.scheduleWithFixedDelay(() -> {
    System.out.println("間隔 2 秒: " + LocalTime.now());
}, 0, 2, TimeUnit.SECONDS);

// 關閉
// scheduler.shutdown();

實用範例

批次處理

public static <T> List<T> processInParallel(List<Callable<T>> tasks) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors()
    );

    try {
        List<Future<T>> futures = executor.invokeAll(tasks);
        List<T> results = new ArrayList<>();
        for (Future<T> f : futures) {
            try {
                results.add(f.get());
            } catch (ExecutionException e) {
                results.add(null);
            }
        }
        return results;
    } finally {
        executor.shutdown();
    }
}

帶超時的任務執行

public static <T> T executeWithTimeout(Callable<T> task, long timeout, TimeUnit unit)
        throws TimeoutException, ExecutionException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<T> future = executor.submit(task);

    try {
        return future.get(timeout, unit);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } finally {
        future.cancel(true);
        executor.shutdownNow();
    }
}

拒絕策略

當執行緒池和佇列都滿時:

策略說明
AbortPolicy拋出異常(預設)
CallerRunsPolicy呼叫者執行緒執行
DiscardPolicy靜默丟棄
DiscardOldestPolicy丟棄最舊的任務

重點整理

  • Executors 提供常用執行緒池的工廠方法
  • execute() 執行 Runnable,submit() 執行 Callable
  • Future 用於取得非同步結果
  • 使用 shutdown() 優雅關閉執行緒池
  • ScheduledExecutorService 用於排程任務
  • 生產環境建議使用 ThreadPoolExecutor 自訂參數