慢速消費者處理
慢速消費者可能會在非持久性主題上造成問題,因為它們會迫使訊息仲介程式將舊訊息保留在 RAM 中,一旦 RAM 填滿,就會迫使訊息仲介程式減慢生產者的速度,導致快速消費者也減慢速度。我們未來可以實施的一個選項是假脫機到磁碟 - 但假脫機到磁碟也可能會減慢快速消費者的速度。
目前,我們有一種策略,可讓您設定仲介程式為消費者保留的最大符合訊息數量,除了其預先擷取緩衝區之外。一旦達到此最大值,當新訊息進入時,較舊的訊息將被丟棄。這可讓您將 RAM 保留給當前訊息,並繼續向慢速消費者傳送訊息,但會丟棄舊訊息。
待處理訊息限制策略
您可以在目的地對應上設定 PendingMessageLimitStrategy
實作類別,以便您主題命名空間的不同區域可以針對慢速消費者採用不同的處理策略。例如,您可能希望對高容量的價格使用此策略,但對於較低容量的訂單和交易,您可能不希望丟棄舊訊息。
此策略會計算要為消費者保留在 RAM 中的最大待處理訊息數量(超過其預先擷取大小)。值為零表示除了預先擷取數量外,不保留任何訊息。大於零的值將保留最多該數量的訊息,當新訊息進入時,丟棄較舊的訊息。值為 -1
表示停用訊息丟棄。
目前有兩種不同的策略實作
ConstantPendingMessageLimitStrategy
PrefetchRatePendingMessageLimitStrategy
ConstantPendingMessageLimitStrategy
此策略對所有消費者使用一個常數限制(超過其預先擷取大小)。
範例
<constantPendingMessageLimitStrategy limit="50"/>
PrefetchRatePendingMessageLimitStrategy
此策略會使用消費者預先擷取大小的乘數來計算最大待處理訊息數量。因此,舉例來說,您可以為每個消費者保留大約 2.5 倍的預先擷取計數。
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
使用預先擷取原則設定限制
JMS 用戶端有一個 預先擷取原則,您可以用它來設定持久性和非持久性佇列和主題的各種預先擷取限制。預先擷取原則也允許您在每個連線/消費者的基礎上指定 maximumPendingMessageLimit
。設定此值的一個小差異是;為了簡化與非 JMS 用戶端(例如使用 OpenWire)的操作,值為零時會被忽略;因此您可以設定的最小值為 1
。
設定逐出策略
我們有一個 MessageEvictionStrategy
,用於決定應該逐出慢速消費者上的哪個訊息。預設實作是
<oldestMessageEvictionStrategy/>
但是,您可以編寫自己的實作,以使用一些應用程式特定的方式來選擇要逐出的訊息。例如,如果您要傳送市場數據價格更新,您可能希望找到較舊的價格值,這可能不是最舊的訊息。
範例
<uniquePropertyMessageEvictionStrategy propertyName="STOCK"/>
其中 propertyName
是指定價格的 JMS 訊息屬性。
另一個選項可能是使用具有最低優先順序訊息的最舊訊息。因此,如果您有一些高優先順序訊息,請先逐出較低優先順序的訊息,即使它們是較新的訊息。
<oldestMessageWithLowestPriorityEvictionStrategy/>
範例
以下範例顯示了 ActiveMQ Classic 訊息仲介程式設定檔。請注意,在 PRICES.>
萬用字元範圍內的主題,pendingMessageLimitStrategy
屬性設定為僅為每個消費者保留大約 10
條訊息,超過其預先擷取緩衝區大小。
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="https://activemq.dev.org.tw/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
https://activemq.dev.org.tw/schema/core
https://activemq.dev.org.tw/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker xmlns="https://activemq.dev.org.tw/schema/core" persistent="false" brokerName="${brokername}">
<!-- lets define the dispatch policy -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="FOO.>">
<dispatchPolicy>
<roundRobinDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="ORDERS.>">
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<!-- 1 minutes worth -->
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="60000"/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="PRICES.>">
<!-- lets force old messages to be discarded for slow consumers -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="10"/>
</pendingMessageLimitStrategy>
<!-- 10 seconds worth -->
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="10000"/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry tempTopic="true" advisoryForConsumed="true"/>
<policyEntry tempQueue="true" advisoryForConsumed="true"/>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
</beans>
使用提示
建議您,如果您知道特定消費者將會很慢,則將其預先擷取大小設定為比快速消費者小的數值!
例如,如果您知道特定伺服器非常慢,並且您的訊息速率非常高而且您有一些非常快的消費者,則您可能需要啟用此功能,並將慢速伺服器的預先擷取設定為比快速伺服器略低。
監控慢速消費者的狀態
您也可以使用 JMX 主控台來檢視使用中訂閱的統計資訊。這可讓您在 TopicSubscriptionViewMBean
上檢視以下統計資訊
統計資訊 | 定義 |
---|---|
已丟棄 |
由於訂閱是慢速消費者,因此在訂閱生命週期中丟棄的訊息數量 |
已符合 |
目前已符合的訊息數量,並且一旦預先擷取緩衝區中有一些容量可用,就會立即將訊息分派給訂閱。因此,非零值表示此訂閱的預先擷取緩衝區已滿 |