RabbitMQ

Ack / Nack / Reject 實作與最佳實務

basicAck

用途:表示「此訊息成功被消費並處理完成」。

語法channel.basicAck(deliveryTag, multiple)

參數說明
deliveryTag訊息的唯一識別碼(由 RabbitMQ 指派)
multiple若設為 true,代表一次 ack 所有小於等於 deliveryTag 的訊息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

Best Practice

  • 通常建議在業務邏輯成功後再 Ack
  • 若用 Spring AMQP,可用 ackMode = MANUAL 搭配 channel.basicAck()
  • 避免在 try 外 ack,以免出錯時訊息丟失

basicNack

用途:明確告訴 broker「處理失敗」,可選擇是否重新投遞。

語法channel.basicNack(deliveryTag, multiple, requeue)

參數說明
requeue = true重新放回 queue(通常會造成重複消費)
requeue = false丟棄或轉入 DLX(死信佇列)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

Best Practice

  • 若業務可重試,設定 requeue = true
  • 若發現資料異常(如格式錯誤、不可恢復),建議 requeue = false → 讓 DLX 處理
  • 避免無限 requeue(造成訊息風暴),建議加 retry 機制或 DLX 限制次數

basicReject

用途:拒絕單一訊息(不能一次拒絕多個)。

語法channel.basicReject(deliveryTag, requeue)

channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

Best Practice

  • 功能與 basicNack 類似,但只能針對單一訊息
  • 通常 basicNack 功能較完整,建議統一使用 basicNack

建議的實作模板

@RabbitListener(queues = "task.queue", ackMode = "MANUAL")
public void onMessage(Message message, Channel channel) throws IOException {
    long tag = message.getMessageProperties().getDeliveryTag();
    try {
        String body = new String(message.getBody());
        // 處理業務邏輯
        processTask(body);

        // 成功 → Ack
        channel.basicAck(tag, false);
    } catch (BusinessException e) {
        // 可重試的錯誤
        channel.basicNack(tag, false, true);
    } catch (Exception e) {
        // 不可恢復錯誤 → 丟入死信佇列
        channel.basicReject(tag, false);
    }
}

帶 Retry 機制的版本:

@RabbitListener(queues = "task.queue", ackMode = "MANUAL")
public void onMessage(Message message, Channel channel, CustomMessage payload) throws IOException {
    long tag = message.getMessageProperties().getDeliveryTag();
    Map<String, Object> headers = message.getMessageProperties().getHeaders();
    int retryCount = (int) headers.getOrDefault("x-retry-count", 0);

    try {
        // 使用業務層 payload 做邏輯
        processBusinessLogic(payload);

        // 成功 → Ack
        channel.basicAck(tag, false);
    } catch (TemporaryException e) {
        // 臨時錯誤 → Retry 機制
        if (retryCount < 3) {
            Message newMessage = MessageBuilder
                .withBody(message.getBody())
                .copyHeaders(headers)
                .setHeader("x-retry-count", retryCount + 1)
                .build();

            rabbitTemplate.convertAndSend(
                message.getMessageProperties().getReceivedExchange(),
                message.getMessageProperties().getReceivedRoutingKey(),
                newMessage
            );
        } else {
            log.error("Message {} failed 3 times: {}", payload.getTaskId(), e.getMessage());
        }
        channel.basicAck(tag, false);
    } catch (Exception e) {
        // 非臨時性錯誤 → 丟棄
        log.error("Fatal error on message: {}", payload, e);
        channel.basicReject(tag, false);
    }
}

常用 Class / Method / Property 一覽表

類別 / 元件屬性 / 方法說明
ChannelbasicAck(long tag, boolean multiple)手動確認訊息成功處理
ChannelbasicNack(long tag, boolean multiple, boolean requeue)手動拒絕(可選是否重回佇列)
ChannelbasicReject(long tag, boolean requeue)拒絕單一訊息
ChannelbasicConsume(...)開始從 queue 消費訊息
MessagegetBody()取得訊息內容(byte[])
MessagegetMessageProperties()取得訊息屬性物件
MessagePropertiesgetDeliveryTag()取得 delivery tag(訊息唯一標識)
MessagePropertiesgetHeaders()自定義 headers(metadata)
MessagePropertiesgetCorrelationId()用於 Request-Response 模式
MessagePropertiesgetConsumerQueue()訊息所屬佇列
@RabbitListenerackMode設定 Ack 模式:AUTOMANUALNONE
AcknowledgeModeAUTO / MANUAL / NONE消費者確認模式
AmqpRejectAndDontRequeueException例外類別拒絕並不重新入列的標準例外
RabbitTemplateconvertAndSend(exchange, routingKey, message)發送訊息至指定 Exchange
SimpleMessageListenerContainersetAcknowledgeMode(AcknowledgeMode.MANUAL)設定手動 ack 模式

Ack 模式比較

模式行為適用場景
AUTO成功處理後自動 ack,丟例外會自動 nack 並 requeue業務邏輯簡單
MANUAL需自行呼叫 basicAck / basicNack複雜業務、需精準控制重試
NONE不回覆 ack,訊息可能重複非常少見,除非測試用

Exchange Patterns

部分內容由 LLM 生成,尚未經過人工驗證。

RabbitMQ 透過 Exchange 決定訊息如何路由到 Queue。Producer 不直接寫入 Queue,而是將訊息發送至 Exchange,再由 Exchange 依類型與 Binding 規則轉發。

Direct Exchange

精確比對 routing key,訊息只送往 binding key 完全相同的 Queue。

  flowchart LR
    P[Producer] -->|routing_key=error| E[Direct Exchange]
    E -->|binding_key=error| Q1[error.queue]
    E -->|binding_key=info| Q2[info.queue]
    Q1 --> C1[Consumer A]
    Q2 --> C2[Consumer B]
@Bean
DirectExchange directExchange() {
    return new DirectExchange("logs.direct");
}

@Bean
Binding bindingError(Queue errorQueue, DirectExchange directExchange) {
    return BindingBuilder.bind(errorQueue).to(directExchange).with("error");
}

Fanout Exchange

廣播:忽略 routing key,訊息複製發送至所有 binding Queue。

  flowchart LR
    P[Producer] --> E[Fanout Exchange]
    E --> Q1[queue-1]
    E --> Q2[queue-2]
    E --> Q3[queue-3]
    Q1 --> C1[Consumer A]
    Q2 --> C2[Consumer B]
    Q3 --> C3[Consumer C]
@Bean
FanoutExchange fanoutExchange() {
    return new FanoutExchange("logs.fanout");
}

@Bean
Binding bindingFanout(Queue queue, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(queue).to(fanoutExchange);
}

Topic Exchange

Routing key 支援萬用字元,實現彈性的訂閱模式。

符號說明
*匹配一個單字段(以 . 分隔)
#匹配零或多個單字段
  flowchart LR
    P[Producer] -->|"logs.error.db"| E[Topic Exchange]
    E -->|"logs.error.*"| Q1[error.queue]
    E -->|"logs.#"| Q2[all-logs.queue]
    Q1 --> C1[Consumer A]
    Q2 --> C2[Consumer B]

logs.error.db 同時符合 logs.error.*logs.#,兩個 Queue 都會收到。

@Bean
TopicExchange topicExchange() {
    return new TopicExchange("logs.topic");
}

@Bean
Binding bindingError(Queue errorQueue, TopicExchange topicExchange) {
    return BindingBuilder.bind(errorQueue).to(topicExchange).with("logs.error.*");
}

@Bean
Binding bindingAll(Queue allLogsQueue, TopicExchange topicExchange) {
    return BindingBuilder.bind(allLogsQueue).to(topicExchange).with("logs.#");
}

Headers Exchange

message headers 屬性路由,完全忽略 routing key。Binding 時設定 x-match

x-match說明
all所有指定 headers 都必須符合(AND)
any任一 header 符合即可(OR)
  flowchart LR
    P[Producer] -->|"format=pdf, type=report"| E[Headers Exchange]
    E -->|"x-match=all, format=pdf, type=report"| Q1[pdf-report.queue]
    E -->|"x-match=any, format=xml"| Q2[xml.queue]
    Q1 --> C1[Consumer A]
    Q2 --> C2[Consumer B]
@Bean
HeadersExchange headersExchange() {
    return new HeadersExchange("docs.headers");
}

@Bean
Binding bindingPdfReport(Queue pdfQueue, HeadersExchange headersExchange) {
    Map<String, Object> headers = new HashMap<>();
    headers.put("format", "pdf");
    headers.put("type", "report");
    return BindingBuilder.bind(pdfQueue).to(headersExchange).whereAll(headers).match();
}

實務建議(Best Practice Checklist)

類別建議
重試控制使用 Spring RetryTemplate 或自訂 retry 次數,避免無限 requeue
死信佇列 (DLX)設定死信交換機處理無法消費的訊息
Idempotency在消費邏輯中確保重複訊息不造成副作用
Logging & Tracing記錄 deliveryTagmessageIdcorrelationId 方便追蹤
批次 Ack可用 basicAck(tag, true) 提升效能(小心資料一致性)
監控使用 RabbitMQ Management Plugin 監控 unacked 狀態