Spring Reactive

Project Reactor

Architecture (架構)

Reactive Stream

Specification (規範)

Flux & Mono

Operators (算子)

  • Create (建立)
  flowchart TB
    F([Flux])
    Cr[Create]
    c[create]
    j[just]
    f[from]
    fI[fromIterable]
    err[error]
    et[empty]
    n[never]
    F --> Cr
    Cr --> c
    Cr --> j
    Cr --> f
    Cr --> fI
    Cr --> err
    Cr --> et
    Cr --> n
  • Retry (重試)
  flowchart TB
    F([Flux])
    R[Retry]
    r[retry]
    rW[retryWhen]
    rB[retryBackoff]

    F --> R
    R --> r
    R --> rW
    R --> rB
  • Event Handling (事件處理)
  flowchart TB
    F([Flux])
    EH[Event Handling]
    dON[doOnNext]
    dOE[doOnError]
    dOC[doOnComplete]
    h[handler]
    etc[...etc.]

    F --> EH
    EH --> dON
    EH --> dOE
    EH --> dOC
    EH --> h
    EH --> etc
  • Time (時間控制)
  flowchart TB
    F([Flux])
    T[Time]
    t[timeout]
    i[internal]
    df[defer]
    dl[delay]

    F --> T
    T --> t
    T --> i
    T --> df
    T --> dl
  • Transform and Compose (轉換與組合)
  flowchart LR
    F([Flux])
    TC[Transform and Compose]
    m[map]
    fM[flatMap]
    cM[concatMap]
    mW[mergeWith]
    zW[zipWith]
    rd[reduce]
    bf[buffer]
    gr[group]
    cl[collect]

    F --> TC
    TC --> m
    TC --> fM
    TC --> cM
    TC --> mW
    TC --> zW
    TC --> rd
    TC --> bf
    TC --> gr
    TC --> cl
  • Threading and Schedulers (執行緒與調度器)

    1. parallel() : 根據 CPU 核心數計算可用執行緒。
    2. fromExecutorService() : 指定自定義的執行緒池。
    3. boundedElastic() : 根據任務數彈性計算執行緒數(適用於阻塞 I/O)。
    4. single() : 單一執行緒。
    5. immediate() : 當前執行緒。
  flowchart LR
    F([Flux])
    TS[Threading and Schedulers]
    p[parallel]
    sO[subscribeOn]
    pO[publishOn]
    Sd[Schedulers]

    F --> TS
    TS --> p
    p --> runOn
    p --> sequential

    TS --> sO
    TS --> pO

    TS --> Sd
    Sd --> paralle
    Sd --> fromExecutorService
    Sd --> elastic
    Sd --> single
    Sd --> immediate

Cold vs Hot Stream

Backpressure (背壓)

Schedulers

Testing (測試)

Spring WebFlux

WebClient


Glossary (術語表)


Architecture (架構)

Model (模型)

Backpressure (背壓)

背壓 是指當發布者 (Publisher/Producer) 發送的資料量超過訂閱者 (Subscriber/Consumer) 處理能力時的情況。 為了避免資料遺失,背壓機制允許消費者控制或通知發布者資料的發送速度。

Reactive Stream (反應式流)

  • Project Reactor
    • Reactive Stream 規範的實作。
    • 反應式庫。
    • Spring WebFlux 使用 Project Reactor。

Flow (流程)

I

II

Specification (規範介面)

Publisher (發布者)

public interface Publisher<T> {
	void subscribe(Subscriber<? super T> var1);
}

Subscriber (訂閱者)

public interface Subscriber<T> {
	void onSubscribe(Subscription var1);
	void onNext(T var1);
	void onError(Throwable var1);
	void onComplete();
}

Subscription (訂閱關係)

  • 連接發布者與訂閱者的橋樑
public interface Subscription {
	void request(long var1);
	void cancel();
}

Processor (處理者)

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Flux & Mono

  • 實作了 Reactive Streams 規範的反應式類型。
  • reactive-core 模組的一部分。
  • Flux: 代表 0 到 N 個元素。
  • Mono: 代表 0 到 1 個元素。

Methods (常用方法)

just()

建立序列並宣告資料流。

subscribe()

訂閱序列。只有訂閱後才會觸發資料流。

fromArray(), fromIterable(), fromStream()

從陣列、Iterable 或 Stream 物件建立序列。

Flux 專有

range()

interval()

Mono 專有

fromSupplier()

fromCallable()

fromFuture()

fromRunnable()

Schedulers (調度器)

public interface Scheduler extends Disposable {
    // ... 介面定義
}

執行策略

  • immediate(): 當前執行緒。
  • single(): 可重用的單一執行緒。
  • boundedElastic(): 適用於阻塞工作(如 JDBC),具有執行緒上限。
  • parallel(): 適用於並行計算。
  • publishOnsubscribeOn:
    • 用於切換調度器的執行上下文。
    • subscribeOn 在鏈條中通常只生效一次(最靠近源頭的)。

Cold vs Hot Stream

Cold Publisher (冷發布者)

直到有訂閱者訂閱才開始發送資料。每個新訂閱都會產生一份全新的資料。

Hot Publisher (熱發布者)

無論有無訂閱者都可能發送資料。多個訂閱者共享同一個資料來源。

refCount()

發布者會等待其他訂閱者。

cache()

快取發布者發送的資料,供後續訂閱者使用。

WebFlux - WebClient

Methods

retrieve()

直接獲取回應主體並轉換為反應式類型(如 Mono)。適用於只關心回應內容的情況。

exchangeToMono()

提供對 HTTP 回應(狀態碼、標頭、主體)的更細緻控制。

onErrorResume()

onErrorReturn()

Retry (重試機制)

  • 範例:重試 3 次,初始延遲 2 秒,並加入抖動 (Jitter)。
  • Jitter 有助於緩解多個客戶端同時重試造成的「重試風暴」。
.retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).jitter(0.75));