RabbitMQ
Ack / Nack / Reject 實作與最佳實務
basicAck
用途:表示「此訊息成功被消費並處理完成」。
語法:channel.basicAck(deliveryTag, multiple)
| 參數 | 說明 |
|---|---|
deliveryTag | 訊息的唯一識別碼(由 RabbitMQ 指派) |
multiple | 若設為 true,代表一次 ack 所有小於等於 deliveryTag 的訊息 |
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);Best Practice
- 通常建議在業務邏輯成功後再 Ack
- 若用 Spring AMQP,可用
ackMode = MANUAL搭配channel.basicAck() - 避免在 try 外 ack,以免出錯時訊息丟失
basicNack
用途:明確告訴 broker「處理失敗」,可選擇是否重新投遞。
語法:channel.basicNack(deliveryTag, multiple, requeue)
| 參數 | 說明 |
|---|---|
requeue = true | 重新放回 queue(通常會造成重複消費) |
requeue = false | 丟棄或轉入 DLX(死信佇列) |
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);Best Practice
- 若業務可重試,設定
requeue = true - 若發現資料異常(如格式錯誤、不可恢復),建議
requeue = false→ 讓 DLX 處理 - 避免無限 requeue(造成訊息風暴),建議加 retry 機制或 DLX 限制次數
basicReject
用途:拒絕單一訊息(不能一次拒絕多個)。
語法:channel.basicReject(deliveryTag, requeue)
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);Best Practice
- 功能與
basicNack類似,但只能針對單一訊息 - 通常
basicNack功能較完整,建議統一使用basicNack
建議的實作模板
@RabbitListener(queues = "task.queue", ackMode = "MANUAL")
public void onMessage(Message message, Channel channel) throws IOException {
long tag = message.getMessageProperties().getDeliveryTag();
try {
String body = new String(message.getBody());
// 處理業務邏輯
processTask(body);
// 成功 → Ack
channel.basicAck(tag, false);
} catch (BusinessException e) {
// 可重試的錯誤
channel.basicNack(tag, false, true);
} catch (Exception e) {
// 不可恢復錯誤 → 丟入死信佇列
channel.basicReject(tag, false);
}
}帶 Retry 機制的版本:
@RabbitListener(queues = "task.queue", ackMode = "MANUAL")
public void onMessage(Message message, Channel channel, CustomMessage payload) throws IOException {
long tag = message.getMessageProperties().getDeliveryTag();
Map<String, Object> headers = message.getMessageProperties().getHeaders();
int retryCount = (int) headers.getOrDefault("x-retry-count", 0);
try {
// 使用業務層 payload 做邏輯
processBusinessLogic(payload);
// 成功 → Ack
channel.basicAck(tag, false);
} catch (TemporaryException e) {
// 臨時錯誤 → Retry 機制
if (retryCount < 3) {
Message newMessage = MessageBuilder
.withBody(message.getBody())
.copyHeaders(headers)
.setHeader("x-retry-count", retryCount + 1)
.build();
rabbitTemplate.convertAndSend(
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
newMessage
);
} else {
log.error("Message {} failed 3 times: {}", payload.getTaskId(), e.getMessage());
}
channel.basicAck(tag, false);
} catch (Exception e) {
// 非臨時性錯誤 → 丟棄
log.error("Fatal error on message: {}", payload, e);
channel.basicReject(tag, false);
}
}常用 Class / Method / Property 一覽表
| 類別 / 元件 | 屬性 / 方法 | 說明 |
|---|---|---|
Channel | basicAck(long tag, boolean multiple) | 手動確認訊息成功處理 |
Channel | basicNack(long tag, boolean multiple, boolean requeue) | 手動拒絕(可選是否重回佇列) |
Channel | basicReject(long tag, boolean requeue) | 拒絕單一訊息 |
Channel | basicConsume(...) | 開始從 queue 消費訊息 |
Message | getBody() | 取得訊息內容(byte[]) |
Message | getMessageProperties() | 取得訊息屬性物件 |
MessageProperties | getDeliveryTag() | 取得 delivery tag(訊息唯一標識) |
MessageProperties | getHeaders() | 自定義 headers(metadata) |
MessageProperties | getCorrelationId() | 用於 Request-Response 模式 |
MessageProperties | getConsumerQueue() | 訊息所屬佇列 |
@RabbitListener | ackMode | 設定 Ack 模式:AUTO、MANUAL、NONE |
AcknowledgeMode | AUTO / MANUAL / NONE | 消費者確認模式 |
AmqpRejectAndDontRequeueException | 例外類別 | 拒絕並不重新入列的標準例外 |
RabbitTemplate | convertAndSend(exchange, routingKey, message) | 發送訊息至指定 Exchange |
SimpleMessageListenerContainer | setAcknowledgeMode(AcknowledgeMode.MANUAL) | 設定手動 ack 模式 |
Ack 模式比較
| 模式 | 行為 | 適用場景 |
|---|---|---|
AUTO | 成功處理後自動 ack,丟例外會自動 nack 並 requeue | 業務邏輯簡單 |
MANUAL | 需自行呼叫 basicAck / basicNack | 複雜業務、需精準控制重試 |
NONE | 不回覆 ack,訊息可能重複 | 非常少見,除非測試用 |
實務建議(Best Practice Checklist)
| 類別 | 建議 |
|---|---|
| 重試控制 | 使用 Spring RetryTemplate 或自訂 retry 次數,避免無限 requeue |
| 死信佇列 (DLX) | 設定死信交換機處理無法消費的訊息 |
| Idempotency | 在消費邏輯中確保重複訊息不造成副作用 |
| Logging & Tracing | 記錄 deliveryTag、messageId、correlationId 方便追蹤 |
| 批次 Ack | 可用 basicAck(tag, true) 提升效能(小心資料一致性) |
| 監控 | 使用 RabbitMQ Management Plugin 監控 unacked 狀態 |