RabbitMQ
Ack / Nack / Reject の実装とベストプラクティス
basicAck
用途 :「このメッセージは正常 に消費 され処理 が完了 した」ことを示 す。
構文
:channel.basicAck(deliveryTag, multiple)
| パラメータ | 説明 |
|---|---|
deliveryTag | メッセージの一意 識別子 (RabbitMQ が割 り当 て) |
multiple | true の場合
、deliveryTag 以下
のすべてのメッセージを一括
ack |
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);ベストプラクティス
通常 、ビジネスロジックが成功 した後 に Ack することを推奨
Spring AMQP を使用 する場合 、
ackMode = MANUALとchannel.basicAck()を組 み合 わせるtry 外 で ack しないようにし、エラー時 のメッセージ損失 を防 ぐ
basicNack
用途 :broker に「処理 失敗 」を明示的 に伝 え、再配信 するかどうかを選択 できる。
構文
:channel.basicNack(deliveryTag, multiple, requeue)
| パラメータ | 説明 |
|---|---|
requeue = true | キューに戻 す(通常 、重複 消費 が発生 ) |
requeue = false | 破棄 または DLX(デッドレターキュー)に転送 |
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);ベストプラクティス
ビジネスが再試行 可能 な場合 、
requeue = trueを設定データ異常 (フォーマットエラー、回復 不可能 )を検出 した場合 、
requeue = false→ DLX で処理無限 requeue(メッセージストーム)を避 け、retry 機構 や DLX で回数 を制限
basicReject
用途 :単一 のメッセージを拒否 (複数 を一度 に拒否 できない)。
構文
:channel.basicReject(deliveryTag, requeue)
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);ベストプラクティス
機能 は
basicNackに類似 しているが、単一 メッセージのみ対象通常 、
basicNackの方 が機能 が豊富 なため、basicNackの使用 を推奨
推奨の実装テンプレート
@RabbitListener(queues = "task.queue", ackMode = "MANUAL")
public void onMessage(Message message, Channel channel) throws IOException {
long tag = message.getMessageProperties().getDeliveryTag();
try {
String body = new String(message.getBody());
// ビジネスロジックを処理
processTask(body);
// 成功 → Ack
channel.basicAck(tag, false);
} catch (BusinessException e) {
// 再試行可能なエラー
channel.basicNack(tag, false, true);
} catch (Exception e) {
// 回復不可能なエラー → デッドレターキューへ
channel.basicReject(tag, false);
}
}Retry 機構 付 きバージョン:
@RabbitListener(queues = "task.queue", ackMode = "MANUAL")
public void onMessage(Message message, Channel channel, CustomMessage payload) throws IOException {
long tag = message.getMessageProperties().getDeliveryTag();
Map<String, Object> headers = message.getMessageProperties().getHeaders();
int retryCount = (int) headers.getOrDefault("x-retry-count", 0);
try {
// ビジネス層 payload でロジックを実行
processBusinessLogic(payload);
// 成功 → Ack
channel.basicAck(tag, false);
} catch (TemporaryException e) {
// 一時的なエラー → Retry 機構
if (retryCount < 3) {
Message newMessage = MessageBuilder
.withBody(message.getBody())
.copyHeaders(headers)
.setHeader("x-retry-count", retryCount + 1)
.build();
rabbitTemplate.convertAndSend(
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
newMessage
);
} else {
log.error("Message {} failed 3 times: {}", payload.getTaskId(), e.getMessage());
}
channel.basicAck(tag, false);
} catch (Exception e) {
// 非一時的なエラー → 破棄
log.error("Fatal error on message: {}", payload, e);
channel.basicReject(tag, false);
}
}よく使う Class / Method / Property 一覧表
| クラス / コンポーネント | 属性 / メソッド | 説明 |
|---|---|---|
Channel | basicAck(long tag, boolean multiple) | 手動 でメッセージ処理 成功 を確認 |
Channel | basicNack(long tag, boolean multiple, boolean requeue) | 手動 拒否 (キューに戻 すか選択 可能 ) |
Channel | basicReject(long tag, boolean requeue) | 単一 メッセージを拒否 |
Channel | basicConsume(...) | キューからメッセージの消費 を開始 |
Message | getBody() | メッセージ内容 を取得 (byte[]) |
Message | getMessageProperties() | メッセージ属性 オブジェクトを取得 |
MessageProperties | getDeliveryTag() | delivery tag(メッセージ一意 識別子 )を取得 |
MessageProperties | getHeaders() | カスタム headers(metadata) |
MessageProperties | getCorrelationId() | Request-Response パターン用 |
MessageProperties | getConsumerQueue() | メッセージが属 するキュー |
@RabbitListener | ackMode | Ack モードを設定
:AUTO、MANUAL、NONE |
AcknowledgeMode | AUTO / MANUAL / NONE | コンシューマー確認 モード |
AmqpRejectAndDontRequeueException | 例外 クラス | 拒否 して再 キューしない標準 例外 |
RabbitTemplate | convertAndSend(exchange, routingKey, message) | 指定 の Exchange にメッセージを送信 |
SimpleMessageListenerContainer | setAcknowledgeMode(AcknowledgeMode.MANUAL) | 手動 ack モードを設定 |
Ack モード比較
| モード | 動作 | 適用 シナリオ |
|---|---|---|
AUTO | 処理 成功 後 に自動 ack、例外 を投 げると自動 nack して requeue | ビジネスロジックが単純 |
MANUAL | basicAck / basicNack を自分
で呼
び出
す必要
あり | 複雑 なビジネス、正確 なリトライ制御 が必要 |
NONE | ack を返 さない、メッセージが重複 する可能性 あり | 非常 に稀 、テスト用 のみ |
実務上の推奨事項(ベストプラクティスチェックリスト)
| カテゴリ | 推奨 |
|---|---|
| リトライ制御 | Spring RetryTemplate またはカスタム retry 回数 を使用 し、無限 requeue を避 ける |
| デッドレターキュー (DLX) | 消費 できないメッセージを処理 するデッドレターエクスチェンジを設定 |
| 冪等性 | 消費 ロジックで重複 メッセージが副作用 を起 こさないことを確認 |
| ロギング & トレーシング | deliveryTag、messageId、correlationId を記録
して追跡
を容易
に |
| 一括 Ack | basicAck(tag, true) でパフォーマンス向上
(データ整合性
に注意
) |
| モニタリング | RabbitMQ Management Plugin で unacked 状態 を監視 |