Spring Reactive
Project Reactor
Architecture (架構)
Reactive Stream
Specification (規範)
Flux & Mono
Operators (算子)
- Create (建立)
flowchart TB
F([Flux])
Cr[Create]
c[create]
j[just]
f[from]
fI[fromIterable]
err[error]
et[empty]
n[never]
F --> Cr
Cr --> c
Cr --> j
Cr --> f
Cr --> fI
Cr --> err
Cr --> et
Cr --> n
- Retry (重試)
flowchart TB
F([Flux])
R[Retry]
r[retry]
rW[retryWhen]
rB[retryBackoff]
F --> R
R --> r
R --> rW
R --> rB
- Event Handling (事件處理)
flowchart TB
F([Flux])
EH[Event Handling]
dON[doOnNext]
dOE[doOnError]
dOC[doOnComplete]
h[handler]
etc[...etc.]
F --> EH
EH --> dON
EH --> dOE
EH --> dOC
EH --> h
EH --> etc
- Time (時間控制)
flowchart TB
F([Flux])
T[Time]
t[timeout]
i[internal]
df[defer]
dl[delay]
F --> T
T --> t
T --> i
T --> df
T --> dl
- Transform and Compose (轉換與組合)
flowchart LR
F([Flux])
TC[Transform and Compose]
m[map]
fM[flatMap]
cM[concatMap]
mW[mergeWith]
zW[zipWith]
rd[reduce]
bf[buffer]
gr[group]
cl[collect]
F --> TC
TC --> m
TC --> fM
TC --> cM
TC --> mW
TC --> zW
TC --> rd
TC --> bf
TC --> gr
TC --> cl
Threading and Schedulers (執行緒與調度器)
parallel(): 根據 CPU 核心數計算可用執行緒。fromExecutorService(): 指定自定義的執行緒池。boundedElastic(): 根據任務數彈性計算執行緒數(適用於阻塞 I/O)。single(): 單一執行緒。immediate(): 當前執行緒。
flowchart LR
F([Flux])
TS[Threading and Schedulers]
p[parallel]
sO[subscribeOn]
pO[publishOn]
Sd[Schedulers]
F --> TS
TS --> p
p --> runOn
p --> sequential
TS --> sO
TS --> pO
TS --> Sd
Sd --> paralle
Sd --> fromExecutorService
Sd --> elastic
Sd --> single
Sd --> immediate
Cold vs Hot Stream
Backpressure (背壓)
Schedulers
Testing (測試)
Spring WebFlux
WebClient
Glossary (術語表)
Architecture (架構)
Model (模型)
Backpressure (背壓)
背壓 是指當發布者 (Publisher/Producer) 發送的資料量超過訂閱者 (Subscriber/Consumer) 處理能力時的情況。
為了避免資料遺失,背壓機制允許消費者控制或通知發布者資料的發送速度。
Reactive Stream (反應式流)
- Project Reactor
- Reactive Stream 規範的實作。
- 反應式庫。
- Spring WebFlux 使用 Project Reactor。
Flow (流程)
I
II
Specification (規範介面)
Publisher (發布者)
public interface Publisher<T> {
void subscribe(Subscriber<? super T> var1);
}Subscriber (訂閱者)
public interface Subscriber<T> {
void onSubscribe(Subscription var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();
}Subscription (訂閱關係)
- 連接發布者與訂閱者的橋樑
public interface Subscription {
void request(long var1);
void cancel();
}Processor (處理者)
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}Flux & Mono
- 實作了 Reactive Streams 規範的反應式類型。
- reactive-core 模組的一部分。
Flux: 代表 0 到 N 個元素。Mono: 代表 0 到 1 個元素。
Methods (常用方法)
just()
建立序列並宣告資料流。
subscribe()
訂閱序列。只有訂閱後才會觸發資料流。
fromArray(), fromIterable(), fromStream()
從陣列、Iterable 或 Stream 物件建立序列。
Flux 專有
range()
interval()
Mono 專有
fromSupplier()
fromCallable()
fromFuture()
fromRunnable()
Schedulers (調度器)
public interface Scheduler extends Disposable {
// ... 介面定義
}執行策略
immediate(): 當前執行緒。single(): 可重用的單一執行緒。boundedElastic(): 適用於阻塞工作(如 JDBC),具有執行緒上限。parallel(): 適用於並行計算。publishOn與subscribeOn:- 用於切換調度器的執行上下文。
subscribeOn在鏈條中通常只生效一次(最靠近源頭的)。
Cold vs Hot Stream
Cold Publisher (冷發布者)
直到有訂閱者訂閱才開始發送資料。每個新訂閱都會產生一份全新的資料。
Hot Publisher (熱發布者)
無論有無訂閱者都可能發送資料。多個訂閱者共享同一個資料來源。
refCount()
發布者會等待其他訂閱者。
cache()
快取發布者發送的資料,供後續訂閱者使用。
WebFlux - WebClient
Methods
retrieve()
直接獲取回應主體並轉換為反應式類型(如
Mono)。適用於只關心回應內容的情況。
exchangeToMono()
提供對 HTTP 回應(狀態碼、標頭、主體)的更細緻控制。
onErrorResume()
onErrorReturn()
Retry (重試機制)
- 範例:重試 3 次,初始延遲 2 秒,並加入抖動 (Jitter)。
- Jitter 有助於緩解多個客戶端同時重試造成的「重試風暴」。
.retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).jitter(0.75));