## 🧠 Reactor 非阻塞邏輯的簡單說法
想像你在餐廳點餐,有兩種運作方式:
### 🍴 傳統(阻塞)模式
1. 你點餐。
2. 你站在櫃檯前一直等,直到餐做好為止(不能做別的事)。
3. 拿到餐後才繼續處理下一位客人。
🟥 缺點:客人多時效率很差,等待時間很長。
---
### ⚡ Reactor(非阻塞)模式
1. 你點餐。
2. 店員記下你的訂單 → 給你一張取餐號碼牌(相當於訂閱)。
3. 你去旁邊等,不會阻塞其他人點餐。
4. 餐點做好時,廣播叫號(事件完成 → 通知你)。
✅ 優點:可以同時處理很多客人,不會因為等一個而卡住全部。
---
## 🔁 對應到 Java Reactor
| 餐廳情境 | Reactor 概念 |
| ----- | --------------------------- |
| 點餐 | 發出請求(創建一個 Publisher) |
| 拿號碼牌等 | 建立訂閱(Subscriber) |
| 廚房做餐 | 背後執行邏輯(非同步進行) |
| 廣播叫號 | 發布資料(`onNext`/`onComplete`) |
---
## 📦 Code 範例(簡化版)
```java
Mono<String> meal = Mono.fromSupplier(() -> {
// 模擬餐點製作
return "🍔 餐點完成";
});
meal.subscribe(result -> {
System.out.println("收到通知: " + result);
});
```
### 🚫 沒有 `.block()`,所以:
* 程式不會卡在等待結果
* 你可以同時訂多份餐(創建多個 Mono/Flux),系統會並行處理它們
---
## ✅ 重點回顧
| 非阻塞是什麼? | 為什麼重要? |
| --------------- | ---------------- |
| 不會停下來等一個結果 | 可以同時處理很多事情 |
| 用事件通知取代同步等待 | 更省資源、效能更高 |
| Reactor 用流式資料模型 | 清楚表示「資料何時來、如何處理」 |
---
Java Reactor 是 Java 中用來實作 **非同步、反應式(Reactive)程式設計**的一個強大函式庫。它基於 **Reactive Streams 規範**,由 Spring 團隊所開發,屬於 [Project Reactor](https://projectreactor.io/) 的一部分,是 **Spring WebFlux 的核心基礎建設**。
---
## 🧭 為什麼需要 Java Reactor?
傳統 Java 程式是同步/阻塞的,會為每個請求開一條 Thread,如果請求數量一多就會造成資源耗盡。Reactor 採用「非阻塞 + 背壓」機制,可以讓你寫出:
* 非同步 HTTP API(WebClient)
* 非阻塞資料庫呼叫(MongoDB、R2DBC)
* 高併發、事件驅動系統(如 Kafka 消費者)
---
## 🔍 Reactor 核心類別
| 類別 | 說明 |
| --------- | ---------------- |
| `Mono<T>` | 表示 0 或 1 筆資料(單一) |
| `Flux<T>` | 表示 0 到 N 筆資料(串流) |
---
## ✅ 使用場景與用途
| 場景 | 為什麼使用 Reactor? |
| ---------------------- | -------------------------- |
| WebClient 呼叫 API | 非同步,不佔用 thread,處理大量 I/O 請求 |
| 非同步 DB 操作(Mongo/R2DBC) | 避免阻塞,提升吞吐量 |
| 串流處理(Kafka、MQ) | 消費者反應式處理,每秒萬筆訊息仍可穩定處理 |
| 後台處理/批次處理 | 彈性組裝操作鏈,支援背壓,容易測試與除錯 |
---
## 🔧 範例程式
### 1️⃣ 基本 `Mono` 與 `Flux`
```java
Mono<String> mono = Mono.just("hello");
Flux<Integer> flux = Flux.just(1, 2, 3);
mono.subscribe(System.out::println); // hello
flux.subscribe(System.out::println); // 1 2 3
```
---
### 2️⃣ 操作符:`map()`、`filter()`、`flatMap()`
```java
Flux.just("apple", "banana", "cherry")
.filter(f -> f.startsWith("b"))
.map(String::toUpperCase)
.subscribe(System.out::println); // BANANA
```
---
### 3️⃣ 非同步呼叫:`Mono.fromCallable()`
```java
Mono<String> task = Mono.fromCallable(() -> {
Thread.sleep(1000);
return "Async result";
});
task.subscribe(System.out::println); // Async result (延遲 1 秒)
```
---
### 4️⃣ 錯誤處理
```java
Mono.error(new RuntimeException("Boom!"))
.onErrorResume(e -> Mono.just("Recovered"))
.subscribe(System.out::println); // Recovered
```
---
### 5️⃣ WebClient 搭配 Mono(非同步 API 呼叫)
```java
WebClient client = WebClient.create();
client.get()
.uri("https://httpbin.org/get")
.retrieve()
.bodyToMono(String.class)
.subscribe(System.out::println);
```
---
## 🧠 Reactor 與 Blocking 的比較
| 特性 | 傳統 Blocking | Reactor (非同步) |
| ------- | ---------------- | -------------------- |
| 執行方式 | 一個請求一個 Thread | 非同步事件驅動 |
| 效能 | 多併發時效能差 | 更好擴展性、處理萬級請求量 |
| 資源利用 | CPU 常閒置,Thread 多 | Thread 少、CPU 使用率高 |
| 程式撰寫容易度 | 簡單 | 較難(需熟 map/flatMap 等) |
---
## 🎯 結論
**Reactor 適合你如果你:**
* 正在開發高併發微服務(Spring WebFlux)
* 需要非同步 HTTP 呼叫
* 想善用資源、提升應用程式吞吐量
* 用 Reactive MongoDB 或 R2DBC
---
## 🧭 簡單對比表
| 特性 | Java 8 Stream | Reactor (`Mono` / `Flux`) |
| ------------------ | ------------------------------------- | ---------------------------------------- |
| 資料流類型 | **同步**(blocking) | **非同步 + 非阻塞(reactive)** |
| 來源 | **已存在的集合或資料**(在記憶體中) | **可能是延遲產生、非同步來源(如 API)** |
| 資料流大小 | 有限(通常處理已知大小的 List) | 可為有限或無限(像 Kafka、WebSocket) |
| 背壓(Backpressure)支援 | ❌ 不支援 | ✅ 支援,可控制消費速度 |
| 結果取得方式 | 呼叫 `collect()` 等立即取得 | 透過 `subscribe()` 非同步反應 |
| 常見用途 | 資料處理、過濾、轉換(集合操作) | API 呼叫、事件流、資料庫非同步存取等 |
| 範例 | `list.stream().map(...).collect(...)` | `Flux.just(...).map(...).subscribe(...)` |
---
## 🧪 範例比較
### ✅ Java Stream(同步)
```java
List<String> list = Arrays.asList("a", "b", "c");
List<String> result = list.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
System.out.println(result); // [A, B, C]
```
> 資料一次性處理,整個 Stream 是在同一條 thread 上運作。
---
### ✅ Reactor(非同步)
```java
Flux.just("a", "b", "c")
.map(String::toUpperCase)
.subscribe(System.out::println); // 非同步輸出:A B C
```
> 你不會直接「拿到結果」,而是 `subscribe()` 處理資料「推」過來。
---
## 📌 核心差異解釋
| 觀點 | Stream | Reactor |
| ------ | ---------------------------- | -------------------------------- |
| 資料推送模式 | **pull-based**:你拉資料(自己決定何時要) | **push-based**:資料自己來(如事件、API 回應) |
| 使用場景 | 集合處理、計算 | 高併發、非同步服務、API 整合、Reactive 系統 |
| 運行時行為 | 預設同一 thread 執行 | Reactor 可支援 event loop / 排程器 |
---
## 🎯 總結
* Java Stream:**資料處理利器**,用於**同步、有限、記憶體內集合的操作**。
* Reactor:**反應式程式設計利器**,用於**非同步、非阻塞、高併發、可擴展應用**。
---
## ❓何時該選哪一個?
| 情境 | 使用技術 |
| ---------------------------- | --------- |
| 處理 List / Map 等集合資料 | `Stream` |
| 呼叫非同步 API、資料庫 | `Reactor` |
| 處理大量 I/O、WebSocket、Kafka 資料流 | `Reactor` |
| 簡單報表、內部邏輯處理 | `Stream` |
---
# doOnNext
`doOnNext` 是 Reactor (`Mono` / `Flux`) 中常用的副作用(side effect)操作符,用來**在資料流經過時執行某些動作**,例如記錄 log、調用監控、除錯… 但**不會改變資料內容或資料流結構**。
---
## 🔍 定義
```java
Mono<T> or Flux<T>
.doOnNext(Consumer<? super T> onNext)
```
當元素被「**推送**」時(**emit**),執行 `onNext` 中的程式,但不會改變資料或流程。
---
## ✅ 適用時機
| 使用情境 | 說明 |
| --------------- | -------------------------- |
| 🐞 除錯 / Log | 想要在某個中間步驟看看資料流中的值 |
| 📈 監控 / Metrics | 紀錄事件、增加 counter |
| 📂 寫入資料 / 寄送通知 | 接收到資料時執行一些非同步或副作用操作(但不改資料) |
| 🔄 檢查但不影響邏輯 | 想觀察值,但不參與轉換或過濾邏輯 |
---
## 🔧 範例說明
### 🔸 `Flux` 中使用 `doOnNext`
```java
Flux.just("a", "b", "c")
.doOnNext(s -> System.out.println("Received: " + s))
.map(String::toUpperCase)
.subscribe(System.out::println);
// 輸出:
// Received: a
// A
// Received: b
// B
// Received: c
// C
```
👉 `doOnNext` 是在 `map()` 之前執行的。它只是打印資料,但不改變它。
---
### 🔸 `Mono` 中使用
```java
Mono.just("hello")
.doOnNext(s -> System.out.println("Debug: " + s))
.subscribe(System.out::println);
// Debug: hello
// hello
```
---
## ⚠ 與 `map()` 和 `flatMap()` 的差異
| 操作符 | 是否改變資料 | 是否改變流程 | 用途 |
| ---------- | ------ | ------ | --------------- |
| `doOnNext` | ❌ 否 | ❌ 否 | 執行副作用,不改變值 |
| `map` | ✅ 是 | ❌ 否 | 資料轉換 |
| `flatMap` | ✅ 是 | ✅ 是 | 轉換為其他 Publisher |
---
## ❌ 不適用 `doOnNext` 的情況
* 如果你要**轉換資料內容**,請用 `map`。
* 如果你要**串接其他非同步流程**,請用 `flatMap`。
* 如果你要處理錯誤,請用 `onErrorX` 系列,如 `onErrorResume`。
---
## ✅ 最佳實務建議
* 把 `doOnNext` 用在「**觀察 / 除錯 / 副作用處理**」的節點。
* 生產環境中使用時,建議搭配監控記錄(如 Micrometer、Prometheus 指標累計)。
* 不要在 `doOnNext` 中處理會影響業務邏輯的事情(如真正寫入資料庫)。
---
# [Mono](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html)
這裡整理了 Java Reactor `Mono` 最常用且實用的函式,方便你快速查閱與學習:
---
## Mono 常用函式總覽
| 函式名稱 | 功能說明 | 範例 |
| ------------------------- | -------------------------------------- | ----------------------------------------------------------------- |
| `just(T data)` | 建立一個包含單一元素的 Mono | `Mono.just("hello")` |
| `empty()` | 建立一個不發出任何元素,只有完成信號的 Mono | `Mono.empty()` |
| `fromCallable()` | 延遲執行 Callable,產生 Mono | `Mono.fromCallable(() -> expensiveOp())` |
| `map(Function)` | 轉換 Mono 內的資料 | `Mono.just("a").map(String::toUpperCase)` |
| `flatMap(Function)` | 非同步轉換,返回另一個 Mono,並展平 | `Mono.just(1).flatMap(i -> Mono.just(i * 2))` |
| `filter(Predicate)` | 過濾資料,符合條件才繼續 | `Mono.just(10).filter(i -> i > 5)` |
| `doOnNext(Consumer)` | 對資料進行副作用操作(例如 log) | `mono.doOnNext(System.out::println)` |
| `doOnError(Consumer)` | 錯誤發生時執行副作用 | `mono.doOnError(e -> log.error(e.getMessage()))` |
| `doOnSuccess(Consumer)` | Mono 成功時執行副作用 | `mono.doOnSuccess(s -> System.out.println("完成"))` |
| `onErrorResume(Function)` | 發生錯誤時切換到另一個 Mono | `mono.onErrorResume(e -> Mono.just("default"))` |
| `onErrorReturn(T)` | 發生錯誤時回傳預設值 | `mono.onErrorReturn("default")` |
| `then()` | 忽略前面結果,執行下一個 Mono,回傳其結果 | `mono.then(Mono.just("next"))` |
| `thenEmpty(Publisher)` | 忽略前面結果,執行一個空 Publisher(通常是 Mono<Void>) | `mono.thenEmpty(Mono.empty())` |
| `block()` | 阻塞取得 Mono 中的資料(慎用,破壞非同步流程) | `String s = mono.block();` |
| `subscribe()` | 開始執行 Mono,並可傳入處理方法 | `mono.subscribe(System.out::println, Throwable::printStackTrace)` |
| `timeout(Duration)` | 超時控制,如果超過時間就拋錯 | `mono.timeout(Duration.ofSeconds(1))` |
---
## 小提醒
* `map` 用於同步轉換,`flatMap` 用於異步轉換(回傳 Mono)
* `doOnNext` / `doOnError` 是副作用,不改變資料流
* `then()` / `thenEmpty()` 常用於串接不需資料回傳的流程
---
# [Flux](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html)
這是 Java Reactor `Flux` 的常用函式清單,跟 `Mono` 類似,但多了支援多筆資料流的特性:
---
## Flux 常用函式總覽
| 函式名稱 | 功能說明 | 範例 |
| ----------------------------- | ------------------------------- | ------------------------------------------------ |
| `just(T...)` | 建立一個包含多個元素的 Flux | `Flux.just("a", "b", "c")` |
| `fromIterable(Iterable)` | 從 Iterable 產生 Flux | `Flux.fromIterable(list)` |
| `range(int start, int count)` | 產生一段整數序列 | `Flux.range(1, 5)` 產生 1,2,3,4,5 |
| `map(Function)` | 對每個元素進行同步轉換 | `flux.map(String::toUpperCase)` |
| `flatMap(Function)` | 非同步轉換,每個元素映射到另一個 Publisher,結果展平 | `flux.flatMap(s -> Mono.just(s + "!"))` |
| `filter(Predicate)` | 過濾資料,只保留符合條件的元素 | `flux.filter(s -> s.startsWith("a"))` |
| `buffer(int size)` | 依大小分批收集元素,回傳 List | `flux.buffer(3)` 將元素三個一組當 List 回傳 |
| `concatMap(Function)` | 按序列依次非同步轉換,維持元素順序 | `flux.concatMap(s -> Mono.just(s + "!"))` |
| `flatMapSequential(Function)` | 併發非同步轉換,但保持結果順序 | `flux.flatMapSequential(...)` |
| `mergeWith(Publisher)` | 合併兩個 Flux(資料交錯) | `flux.mergeWith(otherFlux)` |
| `concatWith(Publisher)` | 串接兩個 Flux,先完第一個再開始第二個 | `flux.concatWith(otherFlux)` |
| `take(long n)` | 取前 n 個元素 | `flux.take(5)` |
| `skip(long n)` | 跳過前 n 個元素 | `flux.skip(3)` |
| `distinct()` | 去除重複元素 | `flux.distinct()` |
| `doOnNext(Consumer)` | 對每個元素做副作用 | `flux.doOnNext(System.out::println)` |
| `doOnError(Consumer)` | 錯誤時做副作用 | `flux.doOnError(e -> log.error(e.getMessage()))` |
| `onErrorResume(Function)` | 發生錯誤時切換到另一個 Publisher | `flux.onErrorResume(e -> Flux.empty())` |
| `onErrorReturn(T)` | 發生錯誤時回傳預設值 | `flux.onErrorReturn(Collections.emptyList())` |
| `collectList()` | 收集所有元素為 List,回傳 Mono\<List<T>> | `flux.collectList()` |
| `subscribe()` | 訂閱並開始執行流 | `flux.subscribe(System.out::println)` |
| `takeUntil(Predicate)` | 取元素直到條件符合停止 | `flux.takeUntil(s -> s.equals("stop"))` |
---
## 小技巧
* `flatMap` vs `concatMap` :`flatMap` 不保證順序,`concatMap` 保證元素順序
* `buffer` 可用來分批處理大量資料
* 使用 `collectList()` 可將 Flux 轉成 Mono,方便一次拿全部資料
---