### CompletableFuture
`CompletableFuture` 是 Java 8 引入的核心類別,它是對傳統 `Future` 的重大革命。
如果把傳統的 `Future` 比喻成「一張必須你主動去櫃檯確認好了沒的取餐收據」**(你必須呼叫 `future.get()`,如果餐點沒好,你就會被**阻斷(Blocking)**在那裡原地發呆);那麼 `CompletableFuture` 就是**「一支具備自動通知與下一步呼叫功能的美食取餐呼叫器」。
它實現了 **非阻斷(Non-blocking)** 的非同步編程,允許你以宣告式(Declarative)的語法,像寫流水線一樣串聯多個非同步任務:「當 A 任務做完後,自動把結果交給 B 任務,如果中途出錯,則跳到 C 處理。」
---
### 🌲 `CompletableFuture` 的三大核心架構樹
要徹底理解 `CompletableFuture`,必須拆解它的三大核心能力:
```text
CompletableFuture 核心架構
│
├─► 1. 任務啟動(如何把任務丟進後台?)
│ ├─► runAsync() ➔ 異步執行,無回傳值 (Runnable)
│ └─► supplyAsync() ➔ 異步執行,有回傳值 (Supplier)
│
├─► 2. 鏈式流水線(任務好了之後,下一步要做什麼?)
│ ├─► thenApply() ➔ 轉換結果:拿到上一步的結果,加工後回傳新結果 (Function)
│ ├─► thenAccept() ➔ 消費結果:拿到上一步的結果,消耗掉,不回傳 (Consumer)
│ └─► thenRun() ➔ 純粹續接:不關心上一步結果,上一步做完我就接著做 (Runnable)
│
└─► 3. 多工聚合(如何控制多道電文?)
├─► allOf() ➔ AND 關係:等待「所有」任務都完成,才執行下一步
└─► anyOf() ➔ OR 關係:只要「任一」任務最快完成,就直接執行下一步
```
---
### 🔍 深度導正:最容易踩入的底層陷阱
1. **預設執行緒池的滅頂之災(ForkJoinPool.commonPool())**
* **盲點**:如果你呼叫 `supplyAsync(() -> {...})` 卻沒有指定第二個參數(`Executor`),Java 預設會使用全域共享的 `ForkJoinPool`。
* **後果**:這個預設池的執行緒數量等於你 CPU 的核心數。如果你的 4、5 道電文遭遇網路延遲(卡住 5 秒),這幾個稀少的執行緒會瞬間被佔滿,導致你整個系統內其他所有使用預設池的非同步功能(包括 Spring 的 `@Async`)集體癱瘓。
* **導正**:**商用環境下,絕對必須傳入自訂的 `ThreadPoolExecutor**`。
2. **`join()` 與 `get()` 的抉擇**
* `get()` 會拋出受檢異常(Checked Exception),你必須寫痛苦的 `try-catch`。
* `join()` 只會拋出執行時期異常(RuntimeException),讓程式碼更 Clean。在 Stream 或 Lambda 運算式中,**強烈優先使用 `join()**`。
3. **例外處理必須前置(Exception Handling)**
* 如果流水線中的某個任務噴了 `NullPointerException`,後續的 `thenApply` 通通不會執行,而是會一路把錯誤傳遞到最後。你必須使用 `.exceptionally()` 來為流水線裝上「安全保險絲」。
---
### 💻 實作程式範例
```java
package com.example.demo.service;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class AsyncOrderFacade {
@Autowired
private Executor asyncExecutor;
private String fetchUserRatingFromRemote(String userId) {
return "VIP";
}
private Double calculateDiscountByRating(String rating) {
if ("VIP".equals(rating)) return 0.8;
return 1.0;
}
private String fetchOrderDetailFromRemote(String orderId) {
return "{\"id\":\"" + orderId + "\", \"total\": 1000}";
}
public String example4allOf(String userId, String orderId) {
// 1. 啟動異步任務 A:查詢使用者評等 (有回傳值)
CompletableFuture<Double> safeDiscountFuture = CompletableFuture.supplyAsync(() -> {
// 模擬網路查詢耗時
return fetchUserRatingFromRemote(userId);
}, asyncExecutor)
// 2. 當任務 A 成功拿到評等後,自動觸發「計價轉換」
// 遵循單一職責,thenApply 專職做資料轉譯 (將 String 評等轉為 Double 折扣)
.thenApply(rating -> {
log.info("收到用戶評等,自動觸發下一步:計算折扣邏輯");
return calculateDiscountByRating(rating);
})
// 3. 幫這條流水線掛上安全防禦(保險絲模式)
// 如果上述任何一步(查用戶或計價)噴錯,自動回退到基本折扣 1.0,確保系統不崩潰
.exceptionally(ex -> {
log.error("非同步鏈路發生慘劇: {},自動啟動備援策略降級", ex.getMessage(), ex);
return 1.0;
});
// 4. 同步啟動另一個不相干的任務 C:查詢訂單明細
CompletableFuture<String> orderFuture = CompletableFuture.supplyAsync(() -> {
return fetchOrderDetailFromRemote(orderId);
}, asyncExecutor);
// 5. 大整合 (ALL-OF):將「安全的折扣流水線」與「訂單明細流水線」併發轟炸
// 這裡是非阻斷的,兩條執行緒在後台同時奔跑
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(safeDiscountFuture, orderFuture);
// 6. 最終站:主執行緒在此處阻斷,等待兩條後台線程都收工回來
combinedFuture.join();
try {
// 7. 輕巧地從呼叫器(Future)中直接取出已經就緒的熟食(資料)
Double finalDiscount = safeDiscountFuture.get();
String orderDetail = orderFuture.get();
// 回傳聚合後的標準筆記格式
return String.format("{\"order\": %s, \"appliedDiscount\": %.2f}", orderDetail, finalDiscount);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("最終資料聚合失敗", e);
}
}
@Data
@AllArgsConstructor
class FlightPrice {
private String source;
private int price;
}
private FlightPrice fetchFromVendorApi(String flightNo) {
try { Thread.sleep(800); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return new FlightPrice("VENDOR_API", 5200);
}
private FlightPrice fetchFromDatabase(String flightNo) {
try { Thread.sleep(300); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return new FlightPrice("LOCAL_DB", 5500);
}
private FlightPrice fetchFromRedisCache(String flightNo) {
try { Thread.sleep(40); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return new FlightPrice("REDIS_CACHE", 5000);
}
public FlightPrice example4anyOf(String flightNo) {
// 1. 啟動非同步任務 A:向官網 API 查詢價格 (耗時可能較長)
CompletableFuture<FlightPrice> apiFuture = CompletableFuture.supplyAsync(() -> {
return fetchFromVendorApi(flightNo);
}, asyncExecutor);
// 2. 啟動非同步任務 B:向內部歷史資料庫查詢 (耗時中等)
CompletableFuture<FlightPrice> dbFuture = CompletableFuture.supplyAsync(() -> {
return fetchFromDatabase(flightNo);
}, asyncExecutor);
// 3. 啟動非同步任務 C:向 Redis 分散式快取查詢 (理論上最快)
CompletableFuture<FlightPrice> cacheFuture = CompletableFuture.supplyAsync(() -> {
return fetchFromRedisCache(flightNo);
}, asyncExecutor);
// 4. 使用 anyOf 讓 3 個管道共同進行速度賽跑
// 傳入陣列,注意:此處回傳的泛型退化成了 Object
CompletableFuture<FlightPrice> safeFuture = CompletableFuture.anyOf(apiFuture, dbFuture, cacheFuture)
// 5. 鏈式流水線:處理最快回傳的結果 (thenApply)
.thenApply(rawResult -> {
// 遵循 Clean Code,進行安全的型態檢查與強制轉換 (Cast)
if (rawResult instanceof FlightPrice) {
FlightPrice fastestPrice = (FlightPrice) rawResult;
System.out.println("🏁 anyOf 賽跑結束!最快渠道來源為: " + fastestPrice.getSource());
return fastestPrice;
}
throw new IllegalStateException("未知的回傳資料型態");
})
// 6. 掛上安全保險絲,防禦最快那個渠道噴錯 (例如 Redis 突然掛掉)
.exceptionally(ex -> {
System.err.println("最快渠道發生異常,自動降級啟用全域保底價");
return new FlightPrice("FALLBACK", 9999);
});
// 7. 主執行緒在此處阻斷,等待最快的第一名收工
return safeFuture.join();
}
}
```
```java
package com.example.demo.config;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class AsyncThreadPoolConfig {
@Bean(name = "asyncExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 1. 核心執行緒數:配合 4 核或 8 核伺服器,I/O 密集型給予適度放大
int cpuCores = Runtime.getRuntime().availableProcessors();
executor.setCorePoolSize(cpuCores * 2);
// 2. 緩衝佇列:極其重要!必須寫死上限,絕對防禦 OOM 記憶體洩漏
executor.setQueueCapacity(500);
// 3. 最大執行緒數:當佇列爆滿時,允許臨時擴充到最大戰力
executor.setMaxPoolSize(cpuCores * 4);
// 4. 執行緒空閒存活時間:超過核心數的臨時執行緒,發呆超過 60 秒就銷毀
executor.setKeepAliveSeconds(60);
// 5. 執行緒名稱前綴:日誌排查與 Thread Dump 時的救命稻草
executor.setThreadNamePrefix("AsyncKit-Task-");
// 6. 核心精髓:飽和拒絕策略 (RejectedExecutionHandler)
// 使用 CallerRunsPolicy:當池子與佇列通通爆滿時,讓「主執行緒」自己去執行這個任務。
// 好處:1. 絕對不丟失任何一筆任務 2. 讓主執行緒慢下來,對前端產生天然的「限流/背壓 (Backpressure)」效果。
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 7. 優雅關閉設定:當 Spring Boot 關閉時,等待未完成的任務跑完再熄燈
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30); // 最多等 30 秒,超過就強行關閉
executor.initialize();
return executor;
}
}
```