markdown-it
demo
Delete
Submit
clear
permalink
## 🧠 Reactor 非阻塞邏輯的簡單說法 想像你在餐廳點餐,有兩種運作方式: ### 🍴 傳統(阻塞)模式 1. 你點餐。 2. 你站在櫃檯前一直等,直到餐做好為止(不能做別的事)。 3. 拿到餐後才繼續處理下一位客人。 🟥 缺點:客人多時效率很差,等待時間很長。 --- ### ⚡ Reactor(非阻塞)模式 1. 你點餐。 2. 店員記下你的訂單 → 給你一張取餐號碼牌(相當於訂閱)。 3. 你去旁邊等,不會阻塞其他人點餐。 4. 餐點做好時,廣播叫號(事件完成 → 通知你)。 ✅ 優點:可以同時處理很多客人,不會因為等一個而卡住全部。 --- ## 🔁 對應到 Java Reactor | 餐廳情境 | Reactor 概念 | | ----- | --------------------------- | | 點餐 | 發出請求(創建一個 Publisher) | | 拿號碼牌等 | 建立訂閱(Subscriber) | | 廚房做餐 | 背後執行邏輯(非同步進行) | | 廣播叫號 | 發布資料(`onNext`/`onComplete`) | --- ## 📦 Code 範例(簡化版) ```java Mono<String> meal = Mono.fromSupplier(() -> { // 模擬餐點製作 return "🍔 餐點完成"; }); meal.subscribe(result -> { System.out.println("收到通知: " + result); }); ``` ### 🚫 沒有 `.block()`,所以: * 程式不會卡在等待結果 * 你可以同時訂多份餐(創建多個 Mono/Flux),系統會並行處理它們 --- ## ✅ 重點回顧 | 非阻塞是什麼? | 為什麼重要? | | --------------- | ---------------- | | 不會停下來等一個結果 | 可以同時處理很多事情 | | 用事件通知取代同步等待 | 更省資源、效能更高 | | Reactor 用流式資料模型 | 清楚表示「資料何時來、如何處理」 | --- Java Reactor 是 Java 中用來實作 **非同步、反應式(Reactive)程式設計**的一個強大函式庫。它基於 **Reactive Streams 規範**,由 Spring 團隊所開發,屬於 [Project Reactor](https://projectreactor.io/) 的一部分,是 **Spring WebFlux 的核心基礎建設**。 --- ## 🧭 為什麼需要 Java Reactor? 傳統 Java 程式是同步/阻塞的,會為每個請求開一條 Thread,如果請求數量一多就會造成資源耗盡。Reactor 採用「非阻塞 + 背壓」機制,可以讓你寫出: * 非同步 HTTP API(WebClient) * 非阻塞資料庫呼叫(MongoDB、R2DBC) * 高併發、事件驅動系統(如 Kafka 消費者) --- ## 🔍 Reactor 核心類別 | 類別 | 說明 | | --------- | ---------------- | | `Mono<T>` | 表示 0 或 1 筆資料(單一) | | `Flux<T>` | 表示 0 到 N 筆資料(串流) | --- ## ✅ 使用場景與用途 | 場景 | 為什麼使用 Reactor? | | ---------------------- | -------------------------- | | WebClient 呼叫 API | 非同步,不佔用 thread,處理大量 I/O 請求 | | 非同步 DB 操作(Mongo/R2DBC) | 避免阻塞,提升吞吐量 | | 串流處理(Kafka、MQ) | 消費者反應式處理,每秒萬筆訊息仍可穩定處理 | | 後台處理/批次處理 | 彈性組裝操作鏈,支援背壓,容易測試與除錯 | --- ## 🔧 範例程式 ### 1️⃣ 基本 `Mono` 與 `Flux` ```java Mono<String> mono = Mono.just("hello"); Flux<Integer> flux = Flux.just(1, 2, 3); mono.subscribe(System.out::println); // hello flux.subscribe(System.out::println); // 1 2 3 ``` --- ### 2️⃣ 操作符:`map()`、`filter()`、`flatMap()` ```java Flux.just("apple", "banana", "cherry") .filter(f -> f.startsWith("b")) .map(String::toUpperCase) .subscribe(System.out::println); // BANANA ``` --- ### 3️⃣ 非同步呼叫:`Mono.fromCallable()` ```java Mono<String> task = Mono.fromCallable(() -> { Thread.sleep(1000); return "Async result"; }); task.subscribe(System.out::println); // Async result (延遲 1 秒) ``` --- ### 4️⃣ 錯誤處理 ```java Mono.error(new RuntimeException("Boom!")) .onErrorResume(e -> Mono.just("Recovered")) .subscribe(System.out::println); // Recovered ``` --- ### 5️⃣ WebClient 搭配 Mono(非同步 API 呼叫) ```java WebClient client = WebClient.create(); client.get() .uri("https://httpbin.org/get") .retrieve() .bodyToMono(String.class) .subscribe(System.out::println); ``` --- ## 🧠 Reactor 與 Blocking 的比較 | 特性 | 傳統 Blocking | Reactor (非同步) | | ------- | ---------------- | -------------------- | | 執行方式 | 一個請求一個 Thread | 非同步事件驅動 | | 效能 | 多併發時效能差 | 更好擴展性、處理萬級請求量 | | 資源利用 | CPU 常閒置,Thread 多 | Thread 少、CPU 使用率高 | | 程式撰寫容易度 | 簡單 | 較難(需熟 map/flatMap 等) | --- ## 🎯 結論 **Reactor 適合你如果你:** * 正在開發高併發微服務(Spring WebFlux) * 需要非同步 HTTP 呼叫 * 想善用資源、提升應用程式吞吐量 * 用 Reactive MongoDB 或 R2DBC --- ## 🧭 簡單對比表 | 特性 | Java 8 Stream | Reactor (`Mono` / `Flux`) | | ------------------ | ------------------------------------- | ---------------------------------------- | | 資料流類型 | **同步**(blocking) | **非同步 + 非阻塞(reactive)** | | 來源 | **已存在的集合或資料**(在記憶體中) | **可能是延遲產生、非同步來源(如 API)** | | 資料流大小 | 有限(通常處理已知大小的 List) | 可為有限或無限(像 Kafka、WebSocket) | | 背壓(Backpressure)支援 | ❌ 不支援 | ✅ 支援,可控制消費速度 | | 結果取得方式 | 呼叫 `collect()` 等立即取得 | 透過 `subscribe()` 非同步反應 | | 常見用途 | 資料處理、過濾、轉換(集合操作) | API 呼叫、事件流、資料庫非同步存取等 | | 範例 | `list.stream().map(...).collect(...)` | `Flux.just(...).map(...).subscribe(...)` | --- ## 🧪 範例比較 ### ✅ Java Stream(同步) ```java List<String> list = Arrays.asList("a", "b", "c"); List<String> result = list.stream() .map(String::toUpperCase) .collect(Collectors.toList()); System.out.println(result); // [A, B, C] ``` > 資料一次性處理,整個 Stream 是在同一條 thread 上運作。 --- ### ✅ Reactor(非同步) ```java Flux.just("a", "b", "c") .map(String::toUpperCase) .subscribe(System.out::println); // 非同步輸出:A B C ``` > 你不會直接「拿到結果」,而是 `subscribe()` 處理資料「推」過來。 --- ## 📌 核心差異解釋 | 觀點 | Stream | Reactor | | ------ | ---------------------------- | -------------------------------- | | 資料推送模式 | **pull-based**:你拉資料(自己決定何時要) | **push-based**:資料自己來(如事件、API 回應) | | 使用場景 | 集合處理、計算 | 高併發、非同步服務、API 整合、Reactive 系統 | | 運行時行為 | 預設同一 thread 執行 | Reactor 可支援 event loop / 排程器 | --- ## 🎯 總結 * Java Stream:**資料處理利器**,用於**同步、有限、記憶體內集合的操作**。 * Reactor:**反應式程式設計利器**,用於**非同步、非阻塞、高併發、可擴展應用**。 --- ## ❓何時該選哪一個? | 情境 | 使用技術 | | ---------------------------- | --------- | | 處理 List / Map 等集合資料 | `Stream` | | 呼叫非同步 API、資料庫 | `Reactor` | | 處理大量 I/O、WebSocket、Kafka 資料流 | `Reactor` | | 簡單報表、內部邏輯處理 | `Stream` | --- # doOnNext `doOnNext` 是 Reactor (`Mono` / `Flux`) 中常用的副作用(side effect)操作符,用來**在資料流經過時執行某些動作**,例如記錄 log、調用監控、除錯… 但**不會改變資料內容或資料流結構**。 --- ## 🔍 定義 ```java Mono<T> or Flux<T> .doOnNext(Consumer<? super T> onNext) ``` 當元素被「**推送**」時(**emit**),執行 `onNext` 中的程式,但不會改變資料或流程。 --- ## ✅ 適用時機 | 使用情境 | 說明 | | --------------- | -------------------------- | | 🐞 除錯 / Log | 想要在某個中間步驟看看資料流中的值 | | 📈 監控 / Metrics | 紀錄事件、增加 counter | | 📂 寫入資料 / 寄送通知 | 接收到資料時執行一些非同步或副作用操作(但不改資料) | | 🔄 檢查但不影響邏輯 | 想觀察值,但不參與轉換或過濾邏輯 | --- ## 🔧 範例說明 ### 🔸 `Flux` 中使用 `doOnNext` ```java Flux.just("a", "b", "c") .doOnNext(s -> System.out.println("Received: " + s)) .map(String::toUpperCase) .subscribe(System.out::println); // 輸出: // Received: a // A // Received: b // B // Received: c // C ``` 👉 `doOnNext` 是在 `map()` 之前執行的。它只是打印資料,但不改變它。 --- ### 🔸 `Mono` 中使用 ```java Mono.just("hello") .doOnNext(s -> System.out.println("Debug: " + s)) .subscribe(System.out::println); // Debug: hello // hello ``` --- ## ⚠ 與 `map()` 和 `flatMap()` 的差異 | 操作符 | 是否改變資料 | 是否改變流程 | 用途 | | ---------- | ------ | ------ | --------------- | | `doOnNext` | ❌ 否 | ❌ 否 | 執行副作用,不改變值 | | `map` | ✅ 是 | ❌ 否 | 資料轉換 | | `flatMap` | ✅ 是 | ✅ 是 | 轉換為其他 Publisher | --- ## ❌ 不適用 `doOnNext` 的情況 * 如果你要**轉換資料內容**,請用 `map`。 * 如果你要**串接其他非同步流程**,請用 `flatMap`。 * 如果你要處理錯誤,請用 `onErrorX` 系列,如 `onErrorResume`。 --- ## ✅ 最佳實務建議 * 把 `doOnNext` 用在「**觀察 / 除錯 / 副作用處理**」的節點。 * 生產環境中使用時,建議搭配監控記錄(如 Micrometer、Prometheus 指標累計)。 * 不要在 `doOnNext` 中處理會影響業務邏輯的事情(如真正寫入資料庫)。 --- # [Mono](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html) 這裡整理了 Java Reactor `Mono` 最常用且實用的函式,方便你快速查閱與學習: --- ## Mono 常用函式總覽 | 函式名稱 | 功能說明 | 範例 | | ------------------------- | -------------------------------------- | ----------------------------------------------------------------- | | `just(T data)` | 建立一個包含單一元素的 Mono | `Mono.just("hello")` | | `empty()` | 建立一個不發出任何元素,只有完成信號的 Mono | `Mono.empty()` | | `fromCallable()` | 延遲執行 Callable,產生 Mono | `Mono.fromCallable(() -> expensiveOp())` | | `map(Function)` | 轉換 Mono 內的資料 | `Mono.just("a").map(String::toUpperCase)` | | `flatMap(Function)` | 非同步轉換,返回另一個 Mono,並展平 | `Mono.just(1).flatMap(i -> Mono.just(i * 2))` | | `filter(Predicate)` | 過濾資料,符合條件才繼續 | `Mono.just(10).filter(i -> i > 5)` | | `doOnNext(Consumer)` | 對資料進行副作用操作(例如 log) | `mono.doOnNext(System.out::println)` | | `doOnError(Consumer)` | 錯誤發生時執行副作用 | `mono.doOnError(e -> log.error(e.getMessage()))` | | `doOnSuccess(Consumer)` | Mono 成功時執行副作用 | `mono.doOnSuccess(s -> System.out.println("完成"))` | | `onErrorResume(Function)` | 發生錯誤時切換到另一個 Mono | `mono.onErrorResume(e -> Mono.just("default"))` | | `onErrorReturn(T)` | 發生錯誤時回傳預設值 | `mono.onErrorReturn("default")` | | `then()` | 忽略前面結果,執行下一個 Mono,回傳其結果 | `mono.then(Mono.just("next"))` | | `thenEmpty(Publisher)` | 忽略前面結果,執行一個空 Publisher(通常是 Mono<Void>) | `mono.thenEmpty(Mono.empty())` | | `block()` | 阻塞取得 Mono 中的資料(慎用,破壞非同步流程) | `String s = mono.block();` | | `subscribe()` | 開始執行 Mono,並可傳入處理方法 | `mono.subscribe(System.out::println, Throwable::printStackTrace)` | | `timeout(Duration)` | 超時控制,如果超過時間就拋錯 | `mono.timeout(Duration.ofSeconds(1))` | --- ## 小提醒 * `map` 用於同步轉換,`flatMap` 用於異步轉換(回傳 Mono) * `doOnNext` / `doOnError` 是副作用,不改變資料流 * `then()` / `thenEmpty()` 常用於串接不需資料回傳的流程 --- # [Flux](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html) 這是 Java Reactor `Flux` 的常用函式清單,跟 `Mono` 類似,但多了支援多筆資料流的特性: --- ## Flux 常用函式總覽 | 函式名稱 | 功能說明 | 範例 | | ----------------------------- | ------------------------------- | ------------------------------------------------ | | `just(T...)` | 建立一個包含多個元素的 Flux | `Flux.just("a", "b", "c")` | | `fromIterable(Iterable)` | 從 Iterable 產生 Flux | `Flux.fromIterable(list)` | | `range(int start, int count)` | 產生一段整數序列 | `Flux.range(1, 5)` 產生 1,2,3,4,5 | | `map(Function)` | 對每個元素進行同步轉換 | `flux.map(String::toUpperCase)` | | `flatMap(Function)` | 非同步轉換,每個元素映射到另一個 Publisher,結果展平 | `flux.flatMap(s -> Mono.just(s + "!"))` | | `filter(Predicate)` | 過濾資料,只保留符合條件的元素 | `flux.filter(s -> s.startsWith("a"))` | | `buffer(int size)` | 依大小分批收集元素,回傳 List | `flux.buffer(3)` 將元素三個一組當 List 回傳 | | `concatMap(Function)` | 按序列依次非同步轉換,維持元素順序 | `flux.concatMap(s -> Mono.just(s + "!"))` | | `flatMapSequential(Function)` | 併發非同步轉換,但保持結果順序 | `flux.flatMapSequential(...)` | | `mergeWith(Publisher)` | 合併兩個 Flux(資料交錯) | `flux.mergeWith(otherFlux)` | | `concatWith(Publisher)` | 串接兩個 Flux,先完第一個再開始第二個 | `flux.concatWith(otherFlux)` | | `take(long n)` | 取前 n 個元素 | `flux.take(5)` | | `skip(long n)` | 跳過前 n 個元素 | `flux.skip(3)` | | `distinct()` | 去除重複元素 | `flux.distinct()` | | `doOnNext(Consumer)` | 對每個元素做副作用 | `flux.doOnNext(System.out::println)` | | `doOnError(Consumer)` | 錯誤時做副作用 | `flux.doOnError(e -> log.error(e.getMessage()))` | | `onErrorResume(Function)` | 發生錯誤時切換到另一個 Publisher | `flux.onErrorResume(e -> Flux.empty())` | | `onErrorReturn(T)` | 發生錯誤時回傳預設值 | `flux.onErrorReturn(Collections.emptyList())` | | `collectList()` | 收集所有元素為 List,回傳 Mono\<List<T>> | `flux.collectList()` | | `subscribe()` | 訂閱並開始執行流 | `flux.subscribe(System.out::println)` | | `takeUntil(Predicate)` | 取元素直到條件符合停止 | `flux.takeUntil(s -> s.equals("stop"))` | --- ## 小技巧 * `flatMap` vs `concatMap` :`flatMap` 不保證順序,`concatMap` 保證元素順序 * `buffer` 可用來分批處理大量資料 * 使用 `collectList()` 可將 Flux 轉成 Mono,方便一次拿全部資料 ---
html
source
debug
Fork me on GitHub