Spring Reactive
Project Reactor
アーキテクチャ
Reactive Stream
仕様
Flux & Mono
オペレーター
- 作成
flowchart TB
F([Flux])
Cr[Create]
c[create]
j[just]
f[from]
fI[fromIterable]
err[error]
et[empty]
n[never]
F --> Cr
Cr --> c
Cr --> j
Cr --> f
Cr --> fI
Cr --> err
Cr --> et
Cr --> n
- リトライ
flowchart TB
F([Flux])
R[Retry]
r[retry]
rW[retryWhen]
rB[retryBackoff]
F --> R
R --> r
R --> rW
R --> rB
- イベント処理
flowchart TB
F([Flux])
EH[Event Handling]
dON[doOnNext]
dOE[doOnError]
dOC[doOnComplete]
h[handler]
etc[...etc.]
F --> EH
EH --> dON
EH --> dOE
EH --> dOC
EH --> h
EH --> etc
- 時間管理
flowchart TB
F([Flux])
T[Time]
t[timeout]
i[internal]
df[defer]
dl[delay]
F --> T
T --> t
T --> i
T --> df
T --> dl
- 変換 と結合
flowchart LR
F([Flux])
TC[Transform and Compose]
m[map]
fM[flatMap]
cM[concatMap]
mW[mergeWith]
zW[zipWith]
rd[reduce]
bf[buffer]
gr[group]
cl[collect]
F --> TC
TC --> m
TC --> fM
TC --> cM
TC --> mW
TC --> zW
TC --> rd
TC --> bf
TC --> gr
TC --> cl
スレッドとスケジューラ
parallel():CPU コア数 に基 づく。fromExecutorService():独自 のプール。boundedElastic():タスク数 に応 じてスレッドを調整 。single():シングルスレッド。immediate():現在 のスレッド。
flowchart LR
F([Flux])
TS[Threading and Schedulers]
p[parallel]
sO[subscribeOn]
pO[publishOn]
Sd[Schedulers]
F --> TS
TS --> p
p --> runOn
p --> sequential
TS --> sO
TS --> pO
TS --> Sd
Sd --> paralle
Sd --> fromExecutorService
Sd --> elastic
Sd --> single
Sd --> immediate
Cold vs Hot Stream
バックプレッシャー
スケジューラ
テスト
Spring WebFlux
WebClient
用語集
アーキテクチャ
モデル
バックプレッシャー
バックプレッシャー とは、発行者
が送信
するデータ量
が購読者
の処理能力
を超
える状況
のことです。これを制御
することでデータ損失
を防
ぎます。
Reactive Stream
- Project Reactor
- Reactive Stream 仕様 の実装 。
- Spring WebFlux で採用 されています。
フロー
I
II
仕様
Publisher
public interface Publisher<T> {
void subscribe(Subscriber<? super T> var1);
}Subscriber
public interface Subscriber<T> {
void onSubscribe(Subscription var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();
}Subscription
- 発行者 と購読者 を接続 します
public interface Subscription {
void request(long var1);
void cancel();
}Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}Flux & Mono
- Reactive Streams 仕様 を実装 したリアクティブタイプ。
Flux:0 から N 個 の要素 。Mono:0 から 1 個 の要素 。
メソッド
just()
シーケンスを作成 し、データストリームを宣言 します。
subscribe()
シーケンスをサブスクライブします。これによりデータストリームが開始 されます。
fromArray(), fromIterable(), fromStream()
配列 や Iterable からシーケンスを作成 します。
Flux 専用
range()
interval()
Mono 専用
fromSupplier()
fromCallable()
fromFuture()
fromRunnable()
スケジューラ
実行戦略
immediate():現在 のスレッド。single():シングルスレッド。boundedElastic():ブロッキング処理用 。parallel():並列計算用 。publishOnとsubscribeOn:実行コンテキストの切 り替 え。
Cold vs Hot Stream
Cold Publisher
サブスクライブされるまで送信 を開始 しません。新規購読 ごとにデータが生成 されます。
Hot Publisher
購読者の有無 に関 わらずデータを送信 する可能性 があります。
refCount()
パブリッシャーは他 の購読者を待機 します。
cache()
パブリッシャーが送信 したデータをキャッシュします。
WebFlux - WebClient
メソッド
retrieve()
レスポンスボディを直接取得 します。
exchangeToMono()
HTTP レスポンス(ステータス、ヘッダー、ボディ)を詳細 に制御 します。
onErrorResume()
onErrorReturn()
リトライ
- リトライ 3 回 、遅延 2 秒 、ジッター(Jitter)を適用 します。
.retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).jitter(0.75));