Java 平行 Stream

平行 Stream 利用多核心處理器並行處理資料,可以提升大量資料的處理效能。

建立平行 Stream

// 從集合建立
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Stream<Integer> parallelStream = numbers.parallelStream();

// 從一般 Stream 轉換
Stream<Integer> parallelStream = numbers.stream().parallel();

// 檢查是否為平行
boolean isParallel = stream.isParallel();

// 轉回順序 Stream
Stream<Integer> sequential = parallelStream.sequential();

基本用法

// 平行處理
List<String> result = names.parallelStream()
    .filter(name -> name.length() > 3)
    .map(String::toUpperCase)
    .collect(Collectors.toList());

// 平行求和
int sum = numbers.parallelStream()
    .mapToInt(Integer::intValue)
    .sum();

效能比較

public static void main(String[] args) {
    List<Integer> numbers = IntStream.rangeClosed(1, 10_000_000)
        .boxed()
        .collect(Collectors.toList());

    // 順序處理
    long start = System.currentTimeMillis();
    long sum1 = numbers.stream()
        .mapToLong(Integer::longValue)
        .sum();
    System.out.println("順序: " + (System.currentTimeMillis() - start) + "ms");

    // 平行處理
    start = System.currentTimeMillis();
    long sum2 = numbers.parallelStream()
        .mapToLong(Integer::longValue)
        .sum();
    System.out.println("平行: " + (System.currentTimeMillis() - start) + "ms");
}

什麼時候使用平行 Stream

適合使用

  • 資料量大(通常 > 10000 個元素)
  • 運算密集(每個元素處理時間長)
  • 資料來源支援有效分割(ArrayList、陣列)
  • 操作是獨立的,沒有共享狀態
// ✓ 大量資料,獨立運算
long count = largeList.parallelStream()
    .filter(this::complexFilter)
    .count();

// ✓ CPU 密集型運算
List<Result> results = data.parallelStream()
    .map(this::heavyComputation)
    .collect(Collectors.toList());

不適合使用

  • 資料量小
  • I/O 密集型操作
  • 操作有順序依賴
  • 資料來源不支援有效分割(LinkedList)
  • 有共享可變狀態
// ✗ 資料量小
List<Integer> small = Arrays.asList(1, 2, 3);
small.parallelStream()...  // 開銷大於收益

// ✗ 順序敏感操作
list.parallelStream()
    .forEachOrdered(System.out::println);  // 失去平行優勢

// ✗ 共享可變狀態
List<Integer> result = new ArrayList<>();  // 不安全!
numbers.parallelStream()
    .forEach(result::add);  // 會有並發問題

順序保證

forEachOrdered

// forEach 不保證順序
numbers.parallelStream()
    .forEach(System.out::print);  // 順序不定

// forEachOrdered 保證順序
numbers.parallelStream()
    .forEachOrdered(System.out::print);  // 保持原始順序

收集保持順序

// toList() 保持順序
List<Integer> result = numbers.parallelStream()
    .filter(n -> n > 0)
    .collect(Collectors.toList());  // 順序正確

避免的陷阱

共享可變狀態

// ✗ 錯誤:共享可變狀態
List<Integer> result = new ArrayList<>();
numbers.parallelStream()
    .forEach(result::add);  // ConcurrentModificationException 或資料遺失

// ✓ 正確:使用 collect
List<Integer> result = numbers.parallelStream()
    .collect(Collectors.toList());

// ✓ 或使用執行緒安全集合
List<Integer> result = Collections.synchronizedList(new ArrayList<>());
numbers.parallelStream()
    .forEach(result::add);  // 但效能較差

副作用

// ✗ 避免副作用
AtomicInteger count = new AtomicInteger(0);
numbers.parallelStream()
    .peek(n -> count.incrementAndGet())  // 不好
    .forEach(n -> {});

// ✓ 使用適當的操作
long count = numbers.parallelStream()
    .filter(n -> n > 0)
    .count();

阻塞操作

// ✗ 避免阻塞操作
urls.parallelStream()
    .map(url -> {
        Thread.sleep(1000);  // 會阻塞 ForkJoinPool
        return fetchUrl(url);
    });

// ✓ 使用 CompletableFuture
List<CompletableFuture<String>> futures = urls.stream()
    .map(url -> CompletableFuture.supplyAsync(() -> fetchUrl(url)))
    .collect(Collectors.toList());

List<String> results = futures.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toList());

ForkJoinPool

平行 Stream 預設使用共用的 ForkJoinPool:

// 取得平行度
int parallelism = ForkJoinPool.commonPool().getParallelism();
// 預設為 CPU 核心數 - 1

// 設定平行度(JVM 參數)
// -Djava.util.concurrent.ForkJoinPool.common.parallelism=4

使用自訂 ForkJoinPool

// 建立自訂 ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(4);

try {
    List<Integer> result = customPool.submit(() ->
        numbers.parallelStream()
            .filter(n -> n > 0)
            .collect(Collectors.toList())
    ).get();
} finally {
    customPool.shutdown();
}

效能調校

選擇合適的資料結構

資料來源分割效率建議
ArrayList優秀推薦
陣列優秀推薦
IntStream.range優秀推薦
HashSet良好可用
TreeSet良好可用
LinkedList避免
Stream.iterate避免

減少裝箱

// ✗ 裝箱開銷
long sum = numbers.parallelStream()
    .mapToLong(Integer::longValue)
    .sum();

// ✓ 使用基本型別 Stream
long sum = IntStream.rangeClosed(1, 1000000)
    .parallel()
    .sum();

操作特性

// 有狀態操作(如 sorted、distinct)可能降低效能
list.parallelStream()
    .sorted()  // 需要收集所有元素
    .forEach(...);

// 無狀態操作效能較好
list.parallelStream()
    .filter(...)
    .map(...)
    .forEach(...);

實際範例

大量資料處理

public List<Result> processBatch(List<Data> data) {
    return data.parallelStream()
        .filter(this::isValid)
        .map(this::transform)
        .collect(Collectors.toList());
}

圖片處理

public void processImages(List<Path> imagePaths) {
    imagePaths.parallelStream()
        .forEach(path -> {
            BufferedImage image = ImageIO.read(path.toFile());
            BufferedImage processed = applyFilter(image);
            ImageIO.write(processed, "png", new File("output/" + path.getFileName()));
        });
}

統計計算

public DoubleSummaryStatistics calculateStats(List<Double> values) {
    return values.parallelStream()
        .mapToDouble(Double::doubleValue)
        .summaryStatistics();
}

重點整理

情況建議
資料量小使用順序 Stream
資料量大 + CPU 密集使用平行 Stream
I/O 密集使用 CompletableFuture
需要順序使用 forEachOrdered
有共享狀態使用順序 Stream 或 collect
  • 平行不一定更快,需要測試
  • 避免共享可變狀態
  • 優先使用 ArrayList 或陣列
  • 使用基本型別 Stream 避免裝箱
  • 注意操作的順序性和狀態性
  • 不要在共用 ForkJoinPool 中阻塞