訊息群組
訊息群組
訊息群組是對 獨佔消費者 功能的增強。它們提供:
- 保證單一佇列中相關訊息處理的順序。
- 在多個消費者之間進行訊息處理的負載平衡。
- 如果 JVM 關閉,可高可用性/自動故障轉移到其他消費者。
因此,邏輯上訊息群組就像是並行的 獨佔消費者。訊息不是全部傳送到單一消費者,而是使用標準 JMS 標頭 JMSXGroupID
來定義訊息屬於哪個訊息群組。然後,訊息群組功能會確保相同訊息群組的所有訊息都會被傳送到相同的 JMS 消費者 - 只要該消費者保持運作。一旦消費者停止運作,就會選擇另一個消費者。
另一種解釋訊息群組的方式是,它提供了訊息在消費者之間的黏性負載平衡;其中 JMSXGroupID
有點像是 HTTP 會期 ID 或 Cookie 值,而訊息代理程式則扮演著 HTTP 負載平衡器的角色。
範例使用案例
假設我們正在開發某種訂單撮合系統,人們可以在其中買賣東西(股票、股份、線上投注等)。您希望有消費者可以撮合不同項目(股票/投注)的買入和賣出,以便為了效能而在 RAM 中保留資料集的子集。因此,將 JMSXGroupID
設定為 MSFT
、IBM
、 SUNW
等,以使用股票代碼來定義訊息群組。(它可以是任何字串;或許可以結合交易簿、交易所以及日期等等 - 群組 ID 越具體,您就能以更高的並行度執行)。假設我們正在買賣 MSFT
、IBM
、 SUNW
股票;訊息群組功能保證所有 MSFT
訊息都將由同一個消費者依序處理;IBM
和 SUNW
也是如此。
訊息群組的運作方式
當訊息被分派給消費者時,會檢查 JMSXGroupID
。如果存在,則代理程式會檢查是否有消費者擁有該訊息群組。由於可能存在大量的訊息群組,因此使用雜湊儲存桶,而不是實際的 JMSXGroupID
字串。
如果沒有消費者與訊息群組關聯,則會選擇一個消費者。該 JMS MessageConsumer
將接收所有具有相同 JMSXGroupID
值的後續訊息,直到:
- 消費者關閉(或建立消費者的客戶端關閉等)。
- 有人透過傳送一個具有負值的
JMSXGroupSeq
的訊息來關閉訊息群組(請參閱下文以取得更多詳細資訊)。
注意:如同訊息選擇器比對一樣,基於 JMSXGroupID
的分組會在分派記憶體中的訊息之前發生。使用預設的 maxPageSize
選項,如果並非所有訊息都適合放在記憶體中,則大量以一個群組為目標的訊息積壓可能會阻止接收其他群組的訊息。您可以如下所示變更目的地的預設 maxPageSize
設定:
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" maxPageSize="1000"/>
</policyEntries>
</policyMap>
</destinationPolicy>
使用訊息群組
您只需要變更 JMS 生產者,以使用您選擇的某個 String
值填寫 JMSXGroupID
訊息標頭。
範例
Mesasge message = session.createTextMessage("<foo>hey</foo>");
message.setStringProperty("JMSXGroupID", "IBM\_NASDAQ\_20/4/05");
...
producer.send(message);
關閉訊息群組
您通常不需要關閉訊息群組;繼續使用即可。但是,如果您真的想關閉群組,可以新增一個負的序列號。
範例
Mesasge message = session.createTextMessage("<foo>hey</foo>");
message.setStringProperty("JMSXGroupID", "IBM\_NASDAQ\_20/4/05");
message.setIntProperty("JMSXGroupSeq", -1);
...
producer.send(message);
這樣就會關閉訊息群組,因此如果未來傳送另一個具有相同訊息群組 ID 的訊息,則會將其重新指派給新的消費者。
含意
訊息群組表示您可獲得在消費者叢集之間進行訊息網格處理的能力,同時具備可靠性、自動故障轉移、負載平衡,而且還可以對訊息的處理進行排序。因此,它是兩全其美的方法。但是,使用上述範例,訊息群組實際上所做的是使用使用者可定義的分割策略(即 JMSXGroupID
值)將您的工作負載分割給不同的消費者。
這種方式的巧妙之處在於,您可以做一些很棒的事情,例如使用大量的 RAM 快取;在 MSFT
消費者中的 RAM 中保留 MSFT
的順序;在 IBM
消費者中的 RAM 中保留 IBM
的訂單 - 由於訊息代理程式正在為您進行分割,因此您不必依賴具有快取間同步和鎖定的分散式快取來利用快取功能。
最棒的是,對於應用程式開發人員來說,它看起來像一個簡單的單一消費者世界,您可以在其中處理訊息並執行您的工作;讓代理程式為您完成所有繁瑣的工作
- 分割流量
- 跨消費者對訊息群組進行負載平衡
- 隨著消費者的來來去去,自動將群組故障轉移到不同的消費者
總而言之;如果排序或每個訊息快取和同步對您而言在任何方面都很重要,那麼我們強烈建議您使用訊息群組來分割您的流量。
取得訊息群組擁有權變更的通知
ActiveMQ Classic 支援一個名為 JMSXGroupFirstForConsumer
的布林標頭。此標頭會設定在針對特定訊息群組傳送到消費者的第一個訊息上。
如果 JMS 連線使用 failover:
,而且發生暫時性網路錯誤,導致連線與代理程式中斷,然後在稍後重新連線,則會在 JMS 客戶端的底層建立新的消費者執行個體,這會導致可能會為同一個訊息群組設定另一個具有此標頭的訊息。
範例
String groupId = message.getStringProperty("JMSXGroupId");
if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) {
// flush cache for groupId
}
清除快取以確保在面對網路錯誤時的一致狀態。
新增消費者
如果您在代理程式中已經有現有訊息,然後在稍後階段新增消費者,最好延遲訊息分派的開始,直到所有消費者都存在(或至少給予他們足夠的時間訂閱)。如果您不這樣做,第一個消費者可能會取得所有訊息群組,而且所有訊息都會被分派給它。您可以透過使用 consumersBeforeDispatchStarts
和 timeBeforeDispatchStarts
目的地原則 來達成此目的。
當 consumersBeforeDispatchStarts 和 timeBeforeDispatchStarts 都設定為大於零的值時,一旦存在所需的消費者數量或 timeBeforeDispatchStarts 超時到期,就會開始分派。如果僅設定 consumersBeforeDispatchStarts,則消費者連線的逾時時間為 1 秒。如果所有消費者都斷線,則訊息分派延遲會在下一個消費者連線時再次套用。
以下是延遲分派 200ms
的目的地原則範例
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" timeBeforeDispatchStarts="200"/>
</policyEntries>
</policyMap>
</destinationPolicy>
以下程式碼片段示範如何在分派開始之前等待兩個消費者(或兩秒)
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="2000"/>
</policyEntries>
</policyMap>
</destinationPolicy>
如同 適當的測試案例 所示,在分派之前加入一個短暫的暫停時間或設定最少的消費者數量,可確保訊息群組的平均分佈。
如果您因為任何原因而需要手動重新平衡訊息群組,可以透過在對應佇列的 JMX MBean 上執行 removeAllMessageGroups
作業來達成此目的。
記憶體耗用、負載平衡、複雜度等相互競爭的需求。
名為 CachedMessageGroupMap
的預設行為會限制在 LRU 快取中使用 1024 個訊息群組,可能無法符合您對訊息順序的期望。CachedMessageGroupMap
具有有限的記憶體使用量,但僅追蹤最多 1024 個(或配置的最大大小)群組,然後遺失追蹤任何比最新 1024 個群組還舊的群組。如此一來,如果群組多於最大值,則最舊群組的順序將會遺失。
通常,使用者會關閉群組,以便在記憶體中保留的集合可以保持在配置的限制之下。在 AMQ-6851 中提供了一些有用的討論
為了防止此限制,您可以使用 MessageGroupHashBucket
或 SimpleMessageGroupMap
。它們透過將每個群組與消費者關聯來運作。
SimpleMessageGroupMap
會追蹤每個群組,但會遇到無限制的記憶體使用問題。
以下程式碼片段示範如何啟用它
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">">
<messageGroupMapFactory>
<simpleMessageGroupMapFactory/>
</messageGroupMapFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
MessageGroupHashBucked
會追蹤每個群組,並具有有限的記憶體使用量。以下程式碼片段示範如何啟用它
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">">
<messageGroupMapFactory>
<messageGroupHashBucked cachedSize=1024 />
</messageGroupMapFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>