Java 平行 Stream
平行 Stream 利用多核心處理器並行處理資料,可以提升大量資料的處理效能。
建立平行 Stream
// 從集合建立
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Stream<Integer> parallelStream = numbers.parallelStream();
// 從一般 Stream 轉換
Stream<Integer> parallelStream = numbers.stream().parallel();
// 檢查是否為平行
boolean isParallel = stream.isParallel();
// 轉回順序 Stream
Stream<Integer> sequential = parallelStream.sequential();
基本用法
// 平行處理
List<String> result = names.parallelStream()
.filter(name -> name.length() > 3)
.map(String::toUpperCase)
.collect(Collectors.toList());
// 平行求和
int sum = numbers.parallelStream()
.mapToInt(Integer::intValue)
.sum();
效能比較
public static void main(String[] args) {
List<Integer> numbers = IntStream.rangeClosed(1, 10_000_000)
.boxed()
.collect(Collectors.toList());
// 順序處理
long start = System.currentTimeMillis();
long sum1 = numbers.stream()
.mapToLong(Integer::longValue)
.sum();
System.out.println("順序: " + (System.currentTimeMillis() - start) + "ms");
// 平行處理
start = System.currentTimeMillis();
long sum2 = numbers.parallelStream()
.mapToLong(Integer::longValue)
.sum();
System.out.println("平行: " + (System.currentTimeMillis() - start) + "ms");
}
什麼時候使用平行 Stream
適合使用
- 資料量大(通常 > 10000 個元素)
- 運算密集(每個元素處理時間長)
- 資料來源支援有效分割(ArrayList、陣列)
- 操作是獨立的,沒有共享狀態
// ✓ 大量資料,獨立運算
long count = largeList.parallelStream()
.filter(this::complexFilter)
.count();
// ✓ CPU 密集型運算
List<Result> results = data.parallelStream()
.map(this::heavyComputation)
.collect(Collectors.toList());
不適合使用
- 資料量小
- I/O 密集型操作
- 操作有順序依賴
- 資料來源不支援有效分割(LinkedList)
- 有共享可變狀態
// ✗ 資料量小
List<Integer> small = Arrays.asList(1, 2, 3);
small.parallelStream()... // 開銷大於收益
// ✗ 順序敏感操作
list.parallelStream()
.forEachOrdered(System.out::println); // 失去平行優勢
// ✗ 共享可變狀態
List<Integer> result = new ArrayList<>(); // 不安全!
numbers.parallelStream()
.forEach(result::add); // 會有並發問題
順序保證
forEachOrdered
// forEach 不保證順序
numbers.parallelStream()
.forEach(System.out::print); // 順序不定
// forEachOrdered 保證順序
numbers.parallelStream()
.forEachOrdered(System.out::print); // 保持原始順序
收集保持順序
// toList() 保持順序
List<Integer> result = numbers.parallelStream()
.filter(n -> n > 0)
.collect(Collectors.toList()); // 順序正確
避免的陷阱
共享可變狀態
// ✗ 錯誤:共享可變狀態
List<Integer> result = new ArrayList<>();
numbers.parallelStream()
.forEach(result::add); // ConcurrentModificationException 或資料遺失
// ✓ 正確:使用 collect
List<Integer> result = numbers.parallelStream()
.collect(Collectors.toList());
// ✓ 或使用執行緒安全集合
List<Integer> result = Collections.synchronizedList(new ArrayList<>());
numbers.parallelStream()
.forEach(result::add); // 但效能較差
副作用
// ✗ 避免副作用
AtomicInteger count = new AtomicInteger(0);
numbers.parallelStream()
.peek(n -> count.incrementAndGet()) // 不好
.forEach(n -> {});
// ✓ 使用適當的操作
long count = numbers.parallelStream()
.filter(n -> n > 0)
.count();
阻塞操作
// ✗ 避免阻塞操作
urls.parallelStream()
.map(url -> {
Thread.sleep(1000); // 會阻塞 ForkJoinPool
return fetchUrl(url);
});
// ✓ 使用 CompletableFuture
List<CompletableFuture<String>> futures = urls.stream()
.map(url -> CompletableFuture.supplyAsync(() -> fetchUrl(url)))
.collect(Collectors.toList());
List<String> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
ForkJoinPool
平行 Stream 預設使用共用的 ForkJoinPool:
// 取得平行度
int parallelism = ForkJoinPool.commonPool().getParallelism();
// 預設為 CPU 核心數 - 1
// 設定平行度(JVM 參數)
// -Djava.util.concurrent.ForkJoinPool.common.parallelism=4
使用自訂 ForkJoinPool
// 建立自訂 ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(4);
try {
List<Integer> result = customPool.submit(() ->
numbers.parallelStream()
.filter(n -> n > 0)
.collect(Collectors.toList())
).get();
} finally {
customPool.shutdown();
}
效能調校
選擇合適的資料結構
| 資料來源 | 分割效率 | 建議 |
|---|---|---|
| ArrayList | 優秀 | 推薦 |
| 陣列 | 優秀 | 推薦 |
| IntStream.range | 優秀 | 推薦 |
| HashSet | 良好 | 可用 |
| TreeSet | 良好 | 可用 |
| LinkedList | 差 | 避免 |
| Stream.iterate | 差 | 避免 |
減少裝箱
// ✗ 裝箱開銷
long sum = numbers.parallelStream()
.mapToLong(Integer::longValue)
.sum();
// ✓ 使用基本型別 Stream
long sum = IntStream.rangeClosed(1, 1000000)
.parallel()
.sum();
操作特性
// 有狀態操作(如 sorted、distinct)可能降低效能
list.parallelStream()
.sorted() // 需要收集所有元素
.forEach(...);
// 無狀態操作效能較好
list.parallelStream()
.filter(...)
.map(...)
.forEach(...);
實際範例
大量資料處理
public List<Result> processBatch(List<Data> data) {
return data.parallelStream()
.filter(this::isValid)
.map(this::transform)
.collect(Collectors.toList());
}
圖片處理
public void processImages(List<Path> imagePaths) {
imagePaths.parallelStream()
.forEach(path -> {
BufferedImage image = ImageIO.read(path.toFile());
BufferedImage processed = applyFilter(image);
ImageIO.write(processed, "png", new File("output/" + path.getFileName()));
});
}
統計計算
public DoubleSummaryStatistics calculateStats(List<Double> values) {
return values.parallelStream()
.mapToDouble(Double::doubleValue)
.summaryStatistics();
}
重點整理
| 情況 | 建議 |
|---|---|
| 資料量小 | 使用順序 Stream |
| 資料量大 + CPU 密集 | 使用平行 Stream |
| I/O 密集 | 使用 CompletableFuture |
| 需要順序 | 使用 forEachOrdered |
| 有共享狀態 | 使用順序 Stream 或 collect |
- 平行不一定更快,需要測試
- 避免共享可變狀態
- 優先使用 ArrayList 或陣列
- 使用基本型別 Stream 避免裝箱
- 注意操作的順序性和狀態性
- 不要在共用 ForkJoinPool 中阻塞