Java Callable 與 Future
Callable 是可以回傳結果和拋出例外的任務介面,Future 用於取得非同步執行的結果。
Callable vs Runnable
// Runnable:無回傳值,不能拋出受檢例外
Runnable runnable = () -> {
System.out.println("執行中");
};
// Callable:有回傳值,可以拋出例外
Callable<Integer> callable = () -> {
Thread.sleep(1000);
return 42;
};
使用 Callable
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<String> task = () -> {
Thread.sleep(2000);
return "任務完成";
};
Future<String> future = executor.submit(task);
// 做其他事情...
// 取得結果(會阻塞)
String result = future.get();
System.out.println(result); // 任務完成
executor.shutdown();
Future 介面
Future<Integer> future = executor.submit(() -> {
Thread.sleep(3000);
return 100;
});
// 檢查是否完成
boolean done = future.isDone();
// 檢查是否被取消
boolean cancelled = future.isCancelled();
// 取得結果(阻塞等待)
Integer result = future.get();
// 取得結果(帶超時)
try {
Integer result2 = future.get(2, TimeUnit.SECONDS);
} catch (TimeoutException e) {
System.out.println("超時!");
}
// 取消任務
boolean cancelSuccess = future.cancel(true); // true = 可中斷執行中的任務
FutureTask
FutureTask 同時實作了 Runnable 和 Future:
// 包裝 Callable
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
Thread.sleep(1000);
return 42;
});
// 可以直接用 Thread 執行
new Thread(futureTask).start();
// 也可以用 ExecutorService
// executor.submit(futureTask);
// 取得結果
Integer result = futureTask.get();
System.out.println(result); // 42
多個 Future
ExecutorService executor = Executors.newFixedThreadPool(3);
List<Future<Integer>> futures = new ArrayList<>();
// 提交多個任務
for (int i = 0; i < 5; i++) {
final int num = i;
futures.add(executor.submit(() -> {
Thread.sleep(1000);
return num * 10;
}));
}
// 收集結果
List<Integer> results = new ArrayList<>();
for (Future<Integer> f : futures) {
results.add(f.get()); // 依序等待
}
System.out.println(results); // [0, 10, 20, 30, 40]
executor.shutdown();
例外處理
Future<Integer> future = executor.submit(() -> {
throw new RuntimeException("出錯了!");
});
try {
Integer result = future.get();
} catch (ExecutionException e) {
// 取得原始例外
Throwable cause = e.getCause();
System.out.println("錯誤: " + cause.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
CompletableFuture 對比
CompletableFuture 提供更強大的功能:
// Future:需要阻塞等待
Future<String> future = executor.submit(() -> "結果");
String result = future.get(); // 阻塞
// CompletableFuture:支援回調
CompletableFuture.supplyAsync(() -> "結果")
.thenAccept(r -> System.out.println(r)) // 非阻塞
.join();
實用範例
並行計算
public static int parallelSum(List<Integer> numbers) throws Exception {
int numThreads = 4;
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
int chunkSize = numbers.size() / numThreads;
List<Callable<Integer>> tasks = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
int start = i * chunkSize;
int end = (i == numThreads - 1) ? numbers.size() : (i + 1) * chunkSize;
List<Integer> chunk = numbers.subList(start, end);
tasks.add(() -> chunk.stream().mapToInt(Integer::intValue).sum());
}
int total = 0;
for (Future<Integer> f : executor.invokeAll(tasks)) {
total += f.get();
}
executor.shutdown();
return total;
}
帶超時的 API 呼叫
public static <T> T callWithTimeout(Callable<T> callable, int timeoutSeconds) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<T> future = executor.submit(callable);
try {
return future.get(timeoutSeconds, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true);
throw new RuntimeException("操作超時");
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
executor.shutdownNow();
}
}
// 使用
String result = callWithTimeout(() -> {
// 模擬 API 呼叫
Thread.sleep(500);
return "API 回應";
}, 2);
重點整理
Callable<V>可以回傳結果 V,可以拋出例外Future<V>代表非同步計算的結果get()會阻塞直到結果可用get(timeout, unit)設定等待超時cancel(true)可以取消並中斷執行中的任務FutureTask同時實作Runnable和Future- 需要更強大功能時使用
CompletableFuture