慢速消費者
封鎖的傳輸
當使用 TCP 時,有時網路中斷可能會導致寫入被封鎖。這可能會導致整個 Broker 凍結,而且 Socket 可能永遠無法解除封鎖。目前,我們有一個執行緒會檢查被封鎖的 Socket,使用掃描協定來偵測是否有連線寫入被封鎖超過可設定的時間。這可以運作,但是沒有辦法解除與 Socket 發送關聯的呼叫執行緒的封鎖(在非持久性主題中,它將會是發佈執行緒)。
需要檢查關閉 Socket 是否能解除發送的封鎖。
TCP 傳輸也使用 InactivityMonitor 類別作為 TransportFilter,透過在傳輸閒置時定期強制執行 KeepAliveInfo 指令來偵測失效的 Socket。因此,InactivityMonitor 可以假設如果沒有定期接收到封包,表示傳輸已失效,並產生傳輸例外。
封鎖的消費者
這與上述情況略有不同 - 消費者在處理訊息時被封鎖或非常緩慢。在客戶端方面,連線可以保存的訊息數量受到預取(對於非持久性主題而言,數千個)的限制。
慢速消費者背景說明
慢速消費者可能會在 Broker 中引起問題。以下是我們可以做的各種事情。
一般而言,慢速消費者對佇列的影響不大,因為消費者會競爭訊息;因此,慢速消費者只會取得比其他消費者少的訊息。
非持久性主題
非持久性主題是最受慢速消費者影響的情況,因為訊息不是持久的,而且訊息通常會發送到所有消費者(具有有效選擇器)。
以下是我們可以做的各種事情
- 封鎖/減慢生產者
- 丟棄慢速消費者
- 將訊息緩存到磁碟
- 為慢速消費者丟棄訊息
這些將會作為使用者可插入的策略公開。是否值得在每個目標基礎上進行一次?
持久性主題
我們可以從 RAM 中丟棄訊息,因為它們是持久的,所以我們可以很好地處理慢速消費者(假設您有足夠的磁碟空間)。
如果消費者落後太多,我們可以考慮終止消費者;但我認為這更像是一個背景操作員的問題?
持久性佇列
由於所有訊息都是持久的,因此可以將它們從記憶體中逐出。
非持久性佇列
對於佇列來說,慢速消費者實際上不是問題。但是所有消費者都很慢才是問題。在這種情況下,我們最終會封鎖生產者,直到訊息被消費。
其他選項可能是
- 將訊息緩存到磁碟
- 丟棄訊息
實作解決方案
對於持久訊息:引入不同的分發模型,其中我們每個目的地都有一個執行緒,並具有自己的記憶體分配。這將使我們對分發有更多的控制權,並允許我們為不同的目的地設定不同的優先順序。
對於非持久訊息 - 在生產者 Broker 執行緒和寫入消費者 Socket 之間引入可選的間接層級非常重要。這將允許我們插入寫入磁碟、丟棄訊息和終止被封鎖的 Socket,而不會影響 Broker 中的任何其他連線。
我們目前的預設是封鎖生產者,直到慢速消費者趕上進度(對於此處的非持久性主題)。
另一個應該可行的選項是,如果一個消費者被標記為慢速消費者,那麼我們可以丟棄傳遞給它的訊息,直到它不再是慢速消費者。如果我們有一種將 Subscription 物件標記為慢速的方法,這應該很容易做到。
隨著時間的推移,將會引入更進階的變體。這可能包括
- 僅根據慢速消費者的百分比啟用慢速消費者策略 - 例如,如果所有消費者都很慢,您可能希望封鎖發佈者,但如果只有一兩個消費者很慢,您可能希望採取某些行動
- 關閉慢速消費者
- 將有限數量的訊息寫入磁碟
- 自訂丟棄策略 - 您可能希望根據預先設定的模式或選擇器丟棄訊息
- 以上各項的組合...
慢速消費者偵測器
我們需要一種好的方法來偵測消費者是否很慢 - 以及知道消費者何時再次加速。