Java CompletableFuture
CompletableFuture 是 Java 8 引入的強大非同步程式設計工具,支援鏈式操作、組合多個非同步任務、錯誤處理等功能。
引入套件
import java.util.concurrent.CompletableFuture;
建立 CompletableFuture
// 使用 Supplier(有回傳值)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
// 使用 Runnable(無回傳值)
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("執行中");
});
// 使用指定的執行緒池
ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "結果", executor);
// 直接建立已完成的 Future
CompletableFuture<String> completed = CompletableFuture.completedFuture("完成");
取得結果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
// get():阻塞等待,會拋出受檢例外
String result1 = future.get();
// get() 帶超時
String result2 = future.get(1, TimeUnit.SECONDS);
// join():阻塞等待,拋出非受檢例外
String result3 = future.join();
// getNow():立即回傳,未完成則回傳預設值
String result4 = future.getNow("預設值");
轉換結果(thenApply)
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s.length())
.thenApply(len -> len * 2);
System.out.println(future.join()); // 10
消費結果(thenAccept)
CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(s -> System.out.println("結果: " + s));
執行後續任務(thenRun)
CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("任務完成"));
組合兩個 Future
thenCompose(扁平化)
// 第二個任務依賴第一個的結果
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
System.out.println(future.join()); // Hello World
thenCombine(合併兩個獨立任務)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
System.out.println(combined.join()); // Hello World
多個 Future 操作
allOf(等待全部完成)
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "C");
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
// 等待全部完成後收集結果
all.thenRun(() -> {
System.out.println(f1.join() + f2.join() + f3.join()); // ABC
}).join();
anyOf(任一完成即回傳)
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
System.out.println(any.join()); // 最先完成的結果
錯誤處理
exceptionally
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("錯誤!");
return "結果";
})
.exceptionally(ex -> "預設值: " + ex.getMessage());
System.out.println(future.join()); // 預設值: 錯誤!
handle(處理結果或例外)
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) throw new RuntimeException("失敗");
return "成功";
})
.handle((result, ex) -> {
if (ex != null) return "處理錯誤: " + ex.getMessage();
return "處理結果: " + result;
});
whenComplete(不改變結果)
CompletableFuture.supplyAsync(() -> "Hello")
.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("發生錯誤");
} else {
System.out.println("結果: " + result);
}
});
Async 版本方法
預設在同一執行緒執行回調,加上 Async 後綴則使用另一執行緒:
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s.toUpperCase()) // 可能在同一執行緒
.thenApplyAsync(s -> s + "!") // 在另一執行緒
.thenApplyAsync(s -> s + "?", executor) // 在指定執行緒池
.join();
實用範例
並行呼叫多個 API
public CompletableFuture<UserData> getUserData(String userId) {
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> getUser(userId));
CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync(() -> getOrders(userId));
CompletableFuture<Settings> settingsFuture = CompletableFuture.supplyAsync(() -> getSettings(userId));
return CompletableFuture.allOf(userFuture, ordersFuture, settingsFuture)
.thenApply(v -> new UserData(
userFuture.join(),
ordersFuture.join(),
settingsFuture.join()
));
}
帶超時的非同步操作
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
try { Thread.sleep(5000); } catch (Exception e) {}
return "結果";
})
.orTimeout(2, TimeUnit.SECONDS) // Java 9+
.exceptionally(ex -> "超時預設值");
帶重試的非同步操作
public static <T> CompletableFuture<T> retry(Supplier<T> task, int maxRetries) {
CompletableFuture<T> future = CompletableFuture.supplyAsync(task);
for (int i = 0; i < maxRetries; i++) {
future = future.exceptionally(ex -> task.get());
}
return future;
}
重點整理
supplyAsync執行有回傳值的任務thenApply轉換結果,thenAccept消費結果thenCompose扁平化串接,thenCombine合併兩個任務allOf等待全部,anyOf等待任一exceptionally和handle處理錯誤Async後綴版本在另一執行緒執行回調- 比
Future更強大,支援鏈式操作和組合