RabbitMQ
Ack / Nack / Reject 實作與最佳實務
basicAck
用途:表示「此訊息成功被消費並處理完成」。
語法:channel.basicAck(deliveryTag, multiple)
| 參數 | 說明 |
|---|---|
deliveryTag | 訊息的唯一識別碼(由 RabbitMQ 指派) |
multiple | 若設為 true,代表一次 ack 所有小於等於 deliveryTag 的訊息 |
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);Best Practice
- 通常建議在業務邏輯成功後再 Ack
- 若用 Spring AMQP,可用
ackMode = MANUAL搭配channel.basicAck() - 避免在 try 外 ack,以免出錯時訊息丟失
basicNack
用途:明確告訴 broker「處理失敗」,可選擇是否重新投遞。
語法:channel.basicNack(deliveryTag, multiple, requeue)
| 參數 | 說明 |
|---|---|
requeue = true | 重新放回 queue(通常會造成重複消費) |
requeue = false | 丟棄或轉入 DLX(死信佇列) |
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);Best Practice
- 若業務可重試,設定
requeue = true - 若發現資料異常(如格式錯誤、不可恢復),建議
requeue = false→ 讓 DLX 處理 - 避免無限 requeue(造成訊息風暴),建議加 retry 機制或 DLX 限制次數
basicReject
用途:拒絕單一訊息(不能一次拒絕多個)。
語法:channel.basicReject(deliveryTag, requeue)
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);Best Practice
- 功能與
basicNack類似,但只能針對單一訊息 - 通常
basicNack功能較完整,建議統一使用basicNack
建議的實作模板
@RabbitListener(queues = "task.queue", ackMode = "MANUAL")
public void onMessage(Message message, Channel channel) throws IOException {
long tag = message.getMessageProperties().getDeliveryTag();
try {
String body = new String(message.getBody());
// 處理業務邏輯
processTask(body);
// 成功 → Ack
channel.basicAck(tag, false);
} catch (BusinessException e) {
// 可重試的錯誤
channel.basicNack(tag, false, true);
} catch (Exception e) {
// 不可恢復錯誤 → 丟入死信佇列
channel.basicReject(tag, false);
}
}帶 Retry 機制的版本:
@RabbitListener(queues = "task.queue", ackMode = "MANUAL")
public void onMessage(Message message, Channel channel, CustomMessage payload) throws IOException {
long tag = message.getMessageProperties().getDeliveryTag();
Map<String, Object> headers = message.getMessageProperties().getHeaders();
int retryCount = (int) headers.getOrDefault("x-retry-count", 0);
try {
// 使用業務層 payload 做邏輯
processBusinessLogic(payload);
// 成功 → Ack
channel.basicAck(tag, false);
} catch (TemporaryException e) {
// 臨時錯誤 → Retry 機制
if (retryCount < 3) {
Message newMessage = MessageBuilder
.withBody(message.getBody())
.copyHeaders(headers)
.setHeader("x-retry-count", retryCount + 1)
.build();
rabbitTemplate.convertAndSend(
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
newMessage
);
} else {
log.error("Message {} failed 3 times: {}", payload.getTaskId(), e.getMessage());
}
channel.basicAck(tag, false);
} catch (Exception e) {
// 非臨時性錯誤 → 丟棄
log.error("Fatal error on message: {}", payload, e);
channel.basicReject(tag, false);
}
}常用 Class / Method / Property 一覽表
| 類別 / 元件 | 屬性 / 方法 | 說明 |
|---|---|---|
Channel | basicAck(long tag, boolean multiple) | 手動確認訊息成功處理 |
Channel | basicNack(long tag, boolean multiple, boolean requeue) | 手動拒絕(可選是否重回佇列) |
Channel | basicReject(long tag, boolean requeue) | 拒絕單一訊息 |
Channel | basicConsume(...) | 開始從 queue 消費訊息 |
Message | getBody() | 取得訊息內容(byte[]) |
Message | getMessageProperties() | 取得訊息屬性物件 |
MessageProperties | getDeliveryTag() | 取得 delivery tag(訊息唯一標識) |
MessageProperties | getHeaders() | 自定義 headers(metadata) |
MessageProperties | getCorrelationId() | 用於 Request-Response 模式 |
MessageProperties | getConsumerQueue() | 訊息所屬佇列 |
@RabbitListener | ackMode | 設定 Ack 模式:AUTO、MANUAL、NONE |
AcknowledgeMode | AUTO / MANUAL / NONE | 消費者確認模式 |
AmqpRejectAndDontRequeueException | 例外類別 | 拒絕並不重新入列的標準例外 |
RabbitTemplate | convertAndSend(exchange, routingKey, message) | 發送訊息至指定 Exchange |
SimpleMessageListenerContainer | setAcknowledgeMode(AcknowledgeMode.MANUAL) | 設定手動 ack 模式 |
Ack 模式比較
| 模式 | 行為 | 適用場景 |
|---|---|---|
AUTO | 成功處理後自動 ack,丟例外會自動 nack 並 requeue | 業務邏輯簡單 |
MANUAL | 需自行呼叫 basicAck / basicNack | 複雜業務、需精準控制重試 |
NONE | 不回覆 ack,訊息可能重複 | 非常少見,除非測試用 |
Exchange Patterns
部分內容由 LLM 生成,尚未經過人工驗證。
RabbitMQ 透過 Exchange 決定訊息如何路由到 Queue。Producer 不直接寫入 Queue,而是將訊息發送至 Exchange,再由 Exchange 依類型與 Binding 規則轉發。
Direct Exchange
精確比對 routing key,訊息只送往 binding key 完全相同的 Queue。
flowchart LR
P[Producer] -->|routing_key=error| E[Direct Exchange]
E -->|binding_key=error| Q1[error.queue]
E -->|binding_key=info| Q2[info.queue]
Q1 --> C1[Consumer A]
Q2 --> C2[Consumer B]
@Bean
DirectExchange directExchange() {
return new DirectExchange("logs.direct");
}
@Bean
Binding bindingError(Queue errorQueue, DirectExchange directExchange) {
return BindingBuilder.bind(errorQueue).to(directExchange).with("error");
}Fanout Exchange
廣播:忽略 routing key,訊息複製發送至所有 binding Queue。
flowchart LR
P[Producer] --> E[Fanout Exchange]
E --> Q1[queue-1]
E --> Q2[queue-2]
E --> Q3[queue-3]
Q1 --> C1[Consumer A]
Q2 --> C2[Consumer B]
Q3 --> C3[Consumer C]
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("logs.fanout");
}
@Bean
Binding bindingFanout(Queue queue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}Topic Exchange
Routing key 支援萬用字元,實現彈性的訂閱模式。
| 符號 | 說明 |
|---|---|
* | 匹配一個單字段(以 . 分隔) |
# | 匹配零或多個單字段 |
flowchart LR
P[Producer] -->|"logs.error.db"| E[Topic Exchange]
E -->|"logs.error.*"| Q1[error.queue]
E -->|"logs.#"| Q2[all-logs.queue]
Q1 --> C1[Consumer A]
Q2 --> C2[Consumer B]
logs.error.db同時符合logs.error.*與logs.#,兩個 Queue 都會收到。
@Bean
TopicExchange topicExchange() {
return new TopicExchange("logs.topic");
}
@Bean
Binding bindingError(Queue errorQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(errorQueue).to(topicExchange).with("logs.error.*");
}
@Bean
Binding bindingAll(Queue allLogsQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(allLogsQueue).to(topicExchange).with("logs.#");
}Headers Exchange
依 message headers 屬性路由,完全忽略 routing key。Binding 時設定 x-match:
x-match 值 | 說明 |
|---|---|
all | 所有指定 headers 都必須符合(AND) |
any | 任一 header 符合即可(OR) |
flowchart LR
P[Producer] -->|"format=pdf, type=report"| E[Headers Exchange]
E -->|"x-match=all, format=pdf, type=report"| Q1[pdf-report.queue]
E -->|"x-match=any, format=xml"| Q2[xml.queue]
Q1 --> C1[Consumer A]
Q2 --> C2[Consumer B]
@Bean
HeadersExchange headersExchange() {
return new HeadersExchange("docs.headers");
}
@Bean
Binding bindingPdfReport(Queue pdfQueue, HeadersExchange headersExchange) {
Map<String, Object> headers = new HashMap<>();
headers.put("format", "pdf");
headers.put("type", "report");
return BindingBuilder.bind(pdfQueue).to(headersExchange).whereAll(headers).match();
}實務建議(Best Practice Checklist)
| 類別 | 建議 |
|---|---|
| 重試控制 | 使用 Spring RetryTemplate 或自訂 retry 次數,避免無限 requeue |
| 死信佇列 (DLX) | 設定死信交換機處理無法消費的訊息 |
| Idempotency | 在消費邏輯中確保重複訊息不造成副作用 |
| Logging & Tracing | 記錄 deliveryTag、messageId、correlationId 方便追蹤 |
| 批次 Ack | 可用 basicAck(tag, true) 提升效能(小心資料一致性) |
| 監控 | 使用 RabbitMQ Management Plugin 監控 unacked 狀態 |