慢速消費者

開發人員 > 開發人員指南 > 設計文件 > 慢速消費者

封鎖的傳輸

當使用 TCP 時,有時網路中斷可能會導致寫入被封鎖。這可能會導致整個 Broker 凍結,而且 Socket 可能永遠無法解除封鎖。目前,我們有一個執行緒會檢查被封鎖的 Socket,使用掃描協定來偵測是否有連線寫入被封鎖超過可設定的時間。這可以運作,但是沒有辦法解除與 Socket 發送關聯的呼叫執行緒的封鎖(在非持久性主題中,它將會是發佈執行緒)。

需要檢查關閉 Socket 是否能解除發送的封鎖。

TCP 傳輸也使用 InactivityMonitor 類別作為 TransportFilter,透過在傳輸閒置時定期強制執行 KeepAliveInfo 指令來偵測失效的 Socket。因此,InactivityMonitor 可以假設如果沒有定期接收到封包,表示傳輸已失效,並產生傳輸例外。

封鎖的消費者

這與上述情況略有不同 - 消費者在處理訊息時被封鎖或非常緩慢。在客戶端方面,連線可以保存的訊息數量受到預取(對於非持久性主題而言,數千個)的限制。

慢速消費者背景說明

慢速消費者可能會在 Broker 中引起問題。以下是我們可以做的各種事情。

一般而言,慢速消費者對佇列的影響不大,因為消費者會競爭訊息;因此,慢速消費者只會取得比其他消費者少的訊息。

非持久性主題

非持久性主題是最受慢速消費者影響的情況,因為訊息不是持久的,而且訊息通常會發送到所有消費者(具有有效選擇器)。

以下是我們可以做的各種事情

  • 封鎖/減慢生產者
  • 丟棄慢速消費者
  • 將訊息緩存到磁碟
  • 為慢速消費者丟棄訊息

這些將會作為使用者可插入的策略公開。是否值得在每個目標基礎上進行一次?

持久性主題

我們可以從 RAM 中丟棄訊息,因為它們是持久的,所以我們可以很好地處理慢速消費者(假設您有足夠的磁碟空間)。
如果消費者落後太多,我們可以考慮終止消費者;但我認為這更像是一個背景操作員的問題?

持久性佇列

由於所有訊息都是持久的,因此可以將它們從記憶體中逐出。

非持久性佇列

對於佇列來說,慢速消費者實際上不是問題。但是所有消費者都很慢才是問題。在這種情況下,我們最終會封鎖生產者,直到訊息被消費。

其他選項可能是

  • 將訊息緩存到磁碟
  • 丟棄訊息

實作解決方案

對於持久訊息:引入不同的分發模型,其中我們每個目的地都有一個執行緒,並具有自己的記憶體分配。這將使我們對分發有更多的控制權,並允許我們為不同的目的地設定不同的優先順序。

對於非持久訊息 - 在生產者 Broker 執行緒和寫入消費者 Socket 之間引入可選的間接層級非常重要。這將允許我們插入寫入磁碟、丟棄訊息和終止被封鎖的 Socket,而不會影響 Broker 中的任何其他連線。

我們目前的預設是封鎖生產者,直到慢速消費者趕上進度(對於此處的非持久性主題)。

另一個應該可行的選項是,如果一個消費者被標記為慢速消費者,那麼我們可以丟棄傳遞給它的訊息,直到它不再是慢速消費者。如果我們有一種將 Subscription 物件標記為慢速的方法,這應該很容易做到。

隨著時間的推移,將會引入更進階的變體。這可能包括

  • 僅根據慢速消費者的百分比啟用慢速消費者策略 - 例如,如果所有消費者都很慢,您可能希望封鎖發佈者,但如果只有一兩個消費者很慢,您可能希望採取某些行動
  • 關閉慢速消費者
  • 將有限數量的訊息寫入磁碟
  • 自訂丟棄策略 - 您可能希望根據預先設定的模式或選擇器丟棄訊息
  • 以上各項的組合...

慢速消費者偵測器

我們需要一種好的方法來偵測消費者是否很慢 - 以及知道消費者何時再次加速。

Apache、ActiveMQ、Apache ActiveMQ、Apache 羽毛標誌和 Apache ActiveMQ 專案標誌是 The Apache Software Foundation 的商標。 版權所有 © 2024,The Apache Software Foundation。 根據 Apache License 2.0 授權。