Java CompletableFuture

CompletableFuture 是 Java 8 引入的強大非同步程式設計工具,支援鏈式操作、組合多個非同步任務、錯誤處理等功能。

引入套件

import java.util.concurrent.CompletableFuture;

建立 CompletableFuture

// 使用 Supplier(有回傳值)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    return "Hello";
});

// 使用 Runnable(無回傳值)
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
    System.out.println("執行中");
});

// 使用指定的執行緒池
ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "結果", executor);

// 直接建立已完成的 Future
CompletableFuture<String> completed = CompletableFuture.completedFuture("完成");

取得結果

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");

// get():阻塞等待,會拋出受檢例外
String result1 = future.get();

// get() 帶超時
String result2 = future.get(1, TimeUnit.SECONDS);

// join():阻塞等待,拋出非受檢例外
String result3 = future.join();

// getNow():立即回傳,未完成則回傳預設值
String result4 = future.getNow("預設值");

轉換結果(thenApply)

CompletableFuture<Integer> future = CompletableFuture
    .supplyAsync(() -> "Hello")
    .thenApply(s -> s.length())
    .thenApply(len -> len * 2);

System.out.println(future.join());  // 10

消費結果(thenAccept)

CompletableFuture.supplyAsync(() -> "Hello")
    .thenAccept(s -> System.out.println("結果: " + s));

執行後續任務(thenRun)

CompletableFuture.supplyAsync(() -> "Hello")
    .thenRun(() -> System.out.println("任務完成"));

組合兩個 Future

thenCompose(扁平化)

// 第二個任務依賴第一個的結果
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

System.out.println(future.join());  // Hello World

thenCombine(合併兩個獨立任務)

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
System.out.println(combined.join());  // Hello World

多個 Future 操作

allOf(等待全部完成)

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "C");

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);

// 等待全部完成後收集結果
all.thenRun(() -> {
    System.out.println(f1.join() + f2.join() + f3.join());  // ABC
}).join();

anyOf(任一完成即回傳)

CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
System.out.println(any.join());  // 最先完成的結果

錯誤處理

exceptionally

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        if (true) throw new RuntimeException("錯誤!");
        return "結果";
    })
    .exceptionally(ex -> "預設值: " + ex.getMessage());

System.out.println(future.join());  // 預設值: 錯誤!

handle(處理結果或例外)

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        if (Math.random() > 0.5) throw new RuntimeException("失敗");
        return "成功";
    })
    .handle((result, ex) -> {
        if (ex != null) return "處理錯誤: " + ex.getMessage();
        return "處理結果: " + result;
    });

whenComplete(不改變結果)

CompletableFuture.supplyAsync(() -> "Hello")
    .whenComplete((result, ex) -> {
        if (ex != null) {
            System.out.println("發生錯誤");
        } else {
            System.out.println("結果: " + result);
        }
    });

Async 版本方法

預設在同一執行緒執行回調,加上 Async 後綴則使用另一執行緒:

CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(s -> s.toUpperCase())        // 可能在同一執行緒
    .thenApplyAsync(s -> s + "!")           // 在另一執行緒
    .thenApplyAsync(s -> s + "?", executor) // 在指定執行緒池
    .join();

實用範例

並行呼叫多個 API

public CompletableFuture<UserData> getUserData(String userId) {
    CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> getUser(userId));
    CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync(() -> getOrders(userId));
    CompletableFuture<Settings> settingsFuture = CompletableFuture.supplyAsync(() -> getSettings(userId));

    return CompletableFuture.allOf(userFuture, ordersFuture, settingsFuture)
        .thenApply(v -> new UserData(
            userFuture.join(),
            ordersFuture.join(),
            settingsFuture.join()
        ));
}

帶超時的非同步操作

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        try { Thread.sleep(5000); } catch (Exception e) {}
        return "結果";
    })
    .orTimeout(2, TimeUnit.SECONDS)  // Java 9+
    .exceptionally(ex -> "超時預設值");

帶重試的非同步操作

public static <T> CompletableFuture<T> retry(Supplier<T> task, int maxRetries) {
    CompletableFuture<T> future = CompletableFuture.supplyAsync(task);

    for (int i = 0; i < maxRetries; i++) {
        future = future.exceptionally(ex -> task.get());
    }
    return future;
}

重點整理

  • supplyAsync 執行有回傳值的任務
  • thenApply 轉換結果,thenAccept 消費結果
  • thenCompose 扁平化串接,thenCombine 合併兩個任務
  • allOf 等待全部,anyOf 等待任一
  • exceptionallyhandle 處理錯誤
  • Async 後綴版本在另一執行緒執行回調
  • Future 更強大,支援鏈式操作和組合