Java Concurrent

Posted by Adam on August 24, 2022
### CompletableFuture `CompletableFuture` 是 Java 8 引入的核心類別,它是對傳統 `Future` 的重大革命。 如果把傳統的 `Future` 比喻成「一張必須你主動去櫃檯確認好了沒的取餐收據」**(你必須呼叫 `future.get()`,如果餐點沒好,你就會被**阻斷(Blocking)**在那裡原地發呆);那麼 `CompletableFuture` 就是**「一支具備自動通知與下一步呼叫功能的美食取餐呼叫器」。 它實現了 **非阻斷(Non-blocking)** 的非同步編程,允許你以宣告式(Declarative)的語法,像寫流水線一樣串聯多個非同步任務:「當 A 任務做完後,自動把結果交給 B 任務,如果中途出錯,則跳到 C 處理。」 --- ### 🌲 `CompletableFuture` 的三大核心架構樹 要徹底理解 `CompletableFuture`,必須拆解它的三大核心能力: ```text CompletableFuture 核心架構 │ ├─► 1. 任務啟動(如何把任務丟進後台?) │ ├─► runAsync() ➔ 異步執行,無回傳值 (Runnable) │ └─► supplyAsync() ➔ 異步執行,有回傳值 (Supplier) │ ├─► 2. 鏈式流水線(任務好了之後,下一步要做什麼?) │ ├─► thenApply() ➔ 轉換結果:拿到上一步的結果,加工後回傳新結果 (Function) │ ├─► thenAccept() ➔ 消費結果:拿到上一步的結果,消耗掉,不回傳 (Consumer) │ └─► thenRun() ➔ 純粹續接:不關心上一步結果,上一步做完我就接著做 (Runnable) │ └─► 3. 多工聚合(如何控制多道電文?) ├─► allOf() ➔ AND 關係:等待「所有」任務都完成,才執行下一步 └─► anyOf() ➔ OR 關係:只要「任一」任務最快完成,就直接執行下一步 ``` --- ### 🔍 深度導正:最容易踩入的底層陷阱 1. **預設執行緒池的滅頂之災(ForkJoinPool.commonPool())** * **盲點**:如果你呼叫 `supplyAsync(() -> {...})` 卻沒有指定第二個參數(`Executor`),Java 預設會使用全域共享的 `ForkJoinPool`。 * **後果**:這個預設池的執行緒數量等於你 CPU 的核心數。如果你的 4、5 道電文遭遇網路延遲(卡住 5 秒),這幾個稀少的執行緒會瞬間被佔滿,導致你整個系統內其他所有使用預設池的非同步功能(包括 Spring 的 `@Async`)集體癱瘓。 * **導正**:**商用環境下,絕對必須傳入自訂的 `ThreadPoolExecutor**`。 2. **`join()` 與 `get()` 的抉擇** * `get()` 會拋出受檢異常(Checked Exception),你必須寫痛苦的 `try-catch`。 * `join()` 只會拋出執行時期異常(RuntimeException),讓程式碼更 Clean。在 Stream 或 Lambda 運算式中,**強烈優先使用 `join()**`。 3. **例外處理必須前置(Exception Handling)** * 如果流水線中的某個任務噴了 `NullPointerException`,後續的 `thenApply` 通通不會執行,而是會一路把錯誤傳遞到最後。你必須使用 `.exceptionally()` 來為流水線裝上「安全保險絲」。 --- ### 💻 實作程式範例 ```java package com.example.demo.service; import java.util.Arrays; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; import java.util.List; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class AsyncOrderFacade { @Autowired private Executor asyncExecutor; private String fetchUserRatingFromRemote(String userId) { return "VIP"; } private Double calculateDiscountByRating(String rating) { if ("VIP".equals(rating)) return 0.8; return 1.0; } private String fetchOrderDetailFromRemote(String orderId) { return "{\"id\":\"" + orderId + "\", \"total\": 1000}"; } public String example4allOf(String userId, String orderId) { // 1. 啟動異步任務 A:查詢使用者評等 (有回傳值) CompletableFuture<Double> safeDiscountFuture = CompletableFuture.supplyAsync(() -> { // 模擬網路查詢耗時 return fetchUserRatingFromRemote(userId); }, asyncExecutor) // 2. 當任務 A 成功拿到評等後,自動觸發「計價轉換」 // 遵循單一職責,thenApply 專職做資料轉譯 (將 String 評等轉為 Double 折扣) .thenApply(rating -> { log.info("收到用戶評等,自動觸發下一步:計算折扣邏輯"); return calculateDiscountByRating(rating); }) // 3. 幫這條流水線掛上安全防禦(保險絲模式) // 如果上述任何一步(查用戶或計價)噴錯,自動回退到基本折扣 1.0,確保系統不崩潰 .exceptionally(ex -> { log.error("非同步鏈路發生慘劇: {},自動啟動備援策略降級", ex.getMessage(), ex); return 1.0; }); // 4. 同步啟動另一個不相干的任務 C:查詢訂單明細 CompletableFuture<String> orderFuture = CompletableFuture.supplyAsync(() -> { return fetchOrderDetailFromRemote(orderId); }, asyncExecutor); // 5. 大整合 (ALL-OF):將「安全的折扣流水線」與「訂單明細流水線」併發轟炸 // 這裡是非阻斷的,兩條執行緒在後台同時奔跑 CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(safeDiscountFuture, orderFuture); // 6. 最終站:主執行緒在此處阻斷,等待兩條後台線程都收工回來 combinedFuture.join(); try { // 7. 輕巧地從呼叫器(Future)中直接取出已經就緒的熟食(資料) Double finalDiscount = safeDiscountFuture.get(); String orderDetail = orderFuture.get(); // 回傳聚合後的標準筆記格式 return String.format("{\"order\": %s, \"appliedDiscount\": %.2f}", orderDetail, finalDiscount); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException("最終資料聚合失敗", e); } } @Data @AllArgsConstructor class FlightPrice { private String source; private int price; } private FlightPrice fetchFromVendorApi(String flightNo) { try { Thread.sleep(800); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return new FlightPrice("VENDOR_API", 5200); } private FlightPrice fetchFromDatabase(String flightNo) { try { Thread.sleep(300); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return new FlightPrice("LOCAL_DB", 5500); } private FlightPrice fetchFromRedisCache(String flightNo) { try { Thread.sleep(40); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return new FlightPrice("REDIS_CACHE", 5000); } public FlightPrice example4anyOf(String flightNo) { // 1. 啟動非同步任務 A:向官網 API 查詢價格 (耗時可能較長) CompletableFuture<FlightPrice> apiFuture = CompletableFuture.supplyAsync(() -> { return fetchFromVendorApi(flightNo); }, asyncExecutor); // 2. 啟動非同步任務 B:向內部歷史資料庫查詢 (耗時中等) CompletableFuture<FlightPrice> dbFuture = CompletableFuture.supplyAsync(() -> { return fetchFromDatabase(flightNo); }, asyncExecutor); // 3. 啟動非同步任務 C:向 Redis 分散式快取查詢 (理論上最快) CompletableFuture<FlightPrice> cacheFuture = CompletableFuture.supplyAsync(() -> { return fetchFromRedisCache(flightNo); }, asyncExecutor); // 4. 使用 anyOf 讓 3 個管道共同進行速度賽跑 // 傳入陣列,注意:此處回傳的泛型退化成了 Object CompletableFuture<FlightPrice> safeFuture = CompletableFuture.anyOf(apiFuture, dbFuture, cacheFuture) // 5. 鏈式流水線:處理最快回傳的結果 (thenApply) .thenApply(rawResult -> { // 遵循 Clean Code,進行安全的型態檢查與強制轉換 (Cast) if (rawResult instanceof FlightPrice) { FlightPrice fastestPrice = (FlightPrice) rawResult; System.out.println("🏁 anyOf 賽跑結束!最快渠道來源為: " + fastestPrice.getSource()); return fastestPrice; } throw new IllegalStateException("未知的回傳資料型態"); }) // 6. 掛上安全保險絲,防禦最快那個渠道噴錯 (例如 Redis 突然掛掉) .exceptionally(ex -> { System.err.println("最快渠道發生異常,自動降級啟用全域保底價"); return new FlightPrice("FALLBACK", 9999); }); // 7. 主執行緒在此處阻斷,等待最快的第一名收工 return safeFuture.join(); } } ``` ```java package com.example.demo.config; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration public class AsyncThreadPoolConfig { @Bean(name = "asyncExecutor") public Executor asyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 1. 核心執行緒數:配合 4 核或 8 核伺服器,I/O 密集型給予適度放大 int cpuCores = Runtime.getRuntime().availableProcessors(); executor.setCorePoolSize(cpuCores * 2); // 2. 緩衝佇列:極其重要!必須寫死上限,絕對防禦 OOM 記憶體洩漏 executor.setQueueCapacity(500); // 3. 最大執行緒數:當佇列爆滿時,允許臨時擴充到最大戰力 executor.setMaxPoolSize(cpuCores * 4); // 4. 執行緒空閒存活時間:超過核心數的臨時執行緒,發呆超過 60 秒就銷毀 executor.setKeepAliveSeconds(60); // 5. 執行緒名稱前綴:日誌排查與 Thread Dump 時的救命稻草 executor.setThreadNamePrefix("AsyncKit-Task-"); // 6. 核心精髓:飽和拒絕策略 (RejectedExecutionHandler) // 使用 CallerRunsPolicy:當池子與佇列通通爆滿時,讓「主執行緒」自己去執行這個任務。 // 好處:1. 絕對不丟失任何一筆任務 2. 讓主執行緒慢下來,對前端產生天然的「限流/背壓 (Backpressure)」效果。 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 7. 優雅關閉設定:當 Spring Boot 關閉時,等待未完成的任務跑完再熄燈 executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(30); // 最多等 30 秒,超過就強行關閉 executor.initialize(); return executor; } } ```