訊息群組是一組具有以下特性的訊息:
-
訊息群組中的訊息共享相同的群組 ID,即它們具有相同的群組識別屬性(JMS 的
JMSXGroupID
,Apache ActiveMQ Artemis 核心 API 的_AMQ_GROUP_ID
)。 -
訊息群組中的訊息始終由同一個消費者使用,即使佇列上有許多消費者。它們會將具有相同群組 ID 的所有訊息固定到同一個消費者。如果該消費者關閉,則會選擇另一個消費者,並將接收具有相同群組 ID 的所有訊息。
當您希望特定屬性值的所有訊息都由同一個消費者依序處理時,訊息群組非常有用。
例如,可能是特定股票的訂單。您可能希望任何特定股票的訂單都由同一個消費者依序處理。為此,您可以建立一個消費者池(也許每種股票一個,但少一點也可以),然後將股票名稱設定為 _AMQ_GROUP_ID 屬性的值。
這將確保特定股票的所有訊息始終由同一個消費者處理。
由於佇列的底層 FIFO 語意,分組訊息可能會影響非分組訊息的並行處理。例如,如果佇列的頭部有一塊 100 個分組訊息,後面跟著 1,000 個非分組訊息,則所有分組訊息都需要發送到適當的用戶端(以序列方式使用這些分組訊息),然後才能使用任何非分組訊息。此情境中的功能影響是,在處理所有分組訊息時,暫時中止並行訊息處理。這可能是一個效能瓶頸,因此在確定訊息群組的大小時請記住這一點,並考慮是否應該將分組訊息與非分組訊息隔離。 |
1. 使用核心 API
用於識別訊息群組的屬性名稱為 "_AMQ_GROUP_ID"
(或常數 MessageImpl.HDR_GROUP_ID
)。或者,您可以在 SessionFactory
上將 autogroup
設定為 true,這將選擇一個隨機的唯一 ID。
2. 使用 JMS
用於識別訊息群組的屬性名稱為 JMSXGroupID
。
// send 2 messages in the same group to ensure the same
// consumer will receive both
Message message = ...
message.setStringProperty("JMSXGroupID", "Group-0");
producer.send(message);
message = ...
message.setStringProperty("JMSXGroupID", "Group-0");
producer.send(message);
或者,您可以在 ActiveMQConnectonFactory
上將 autogroup
設定為 true,這將選擇一個隨機的唯一 ID。這也可以在 JNDI 環境中設定,例如 jndi.properties
。以下是使用預設可在環境中取得的 "ConnectionFactory" 連線工廠的簡單範例
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
connectionFactory.myConnectionFactory=tcp://127.0.0.1:61616?autoGroup=true
或者,您可以透過連線工廠設定群組 ID。透過此連線工廠建立的生產者傳送的所有訊息,都將在傳送的所有訊息上將 JMSXGroupID
設定為指定值。這也可以在 JNDI 環境中設定,例如 jndi.properties
。以下是使用預設可在環境中取得的 "ConnectionFactory" 連線工廠的簡單範例
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
connectionFactory.myConnectionFactory=tcp://127.0.0.1:61616?groupID=Group-0
3. 關閉訊息群組
您通常不需要關閉訊息群組,只需繼續使用即可。但是,如果您真的想關閉群組,可以加入一個負數序號。
範例
Message message = session.createTextMessage("<foo>hey</foo>");
message.setStringProperty("JMSXGroupID", "Group-0");
message.setIntProperty("JMSXGroupSeq", -1);
...
producer.send(message);
這會關閉訊息群組,因此如果將來傳送具有相同訊息群組 ID 的另一個訊息,則會將其重新指派給新的消費者。
4. 通知消費者群組所有權變更
ActiveMQ 支援在傳送給特定訊息群組消費者的第一個訊息上放置一個布林值標頭。
若要啟用此功能,您必須設定一個標頭鍵,代理程式將使用該標頭鍵來設定旗標。
在範例中,我們使用 JMSXGroupFirstForConsumer
,但它可以是您想要的任何標頭鍵值。
透過在佇列層級將 group-first-key
設定為 JMSXGroupFirstForConsumer
,每次將新群組指派給消費者時,標頭 JMSXGroupFirstForConsumer
都會在第一個訊息上設定為 true
。
<address name="foo.bar">
<multicast>
<queue name="orders1" group-first-key="JMSXGroupFirstForConsumer"/>
</multicast>
</address>
或在使用核心 JMS 用戶端時,在自動建立時,使用消費者使用的目的地建立時的位址參數。
Queue queue = session.createQueue("my.destination.name?group-first-key=JMSXGroupFirstForConsumer");
Topic topic = session.createTopic("my.destination.name?group-first-key=JMSXGroupFirstForConsumer");
此外,可以使用 address-setting
設定來預設地址下的所有佇列的預設值
<address-setting match="my.address">
<default-group-first-key>JMSXGroupFirstForConsumer</default-group-first-key>
</address-setting>
預設情況下,此功能為關閉狀態。
5. 重新平衡訊息群組
有時,在加入新的消費者之後,您可能會發現它們沒有指派任何群組,因此沒有被使用。這是因為所有群組都已指派給現有的消費者。但是,可以重新平衡群組,以便將佇列中的所有消費者指派一個或多個群組。
在重設的確切時刻,發送到原始相關消費者的訊息可能正在傳輸中,同時將同一群組的新訊息分派到新的相關消費者。 |
5.1. 手動
透過在相關佇列上調用 resetAllGroups
來使用管理 API(例如透過 Web 主控台)。
5.2. 自動
透過在佇列層級將 group-rebalance
設定為 true
,每次加入消費者時都會觸發群組的重新平衡/重設。
如上所述,當完成群組重新平衡時,您可能會處理正在傳輸中的訊息。預設情況下,代理程式會在重新平衡發生時繼續分派。為了確保在重新平衡後將新訊息分派到不同的消費者之前處理傳輸中的訊息,您可以將 group-rebalance-pause-dispatch
設定為 true
,這會在重新平衡發生時暫停分派,直到處理完所有傳輸中的訊息。
<address name="foo.bar">
<multicast>
<queue name="orders1" group-rebalance="true" group-rebalance-pause-dispatch="true"/>
</multicast>
</address>
或在使用核心 JMS 用戶端時,在自動建立時,使用消費者使用的目的地建立時的位址參數。
Queue queue = session.createQueue("my.destination.name?group-rebalance=true&group-rebalance-pause-dispatch=true");
Topic topic = session.createTopic("my.destination.name?group-rebalance=true&group-rebalance-pause-dispatch=true");
此外,可以使用 address-setting
設定來預設地址下的所有佇列的預設值
<address-setting match="my.address">
<default-group-rebalance>true</default-group-rebalance>
<default-group-rebalance-pause-dispatch>true</default-group-rebalance-pause-dispatch>
</address-setting>
預設情況下,default-group-rebalance
為 false
,表示此功能已停用/關閉。預設情況下,default-group-rebalance-pause-dispatch
為 false
,表示此功能已停用/關閉。
6. 群組儲存桶
若要處理具有有限記憶體的佇列中的群組,以更好地擴展群組,您可以啟用群組儲存桶,本質上是將群組 ID 雜湊到儲存桶中,而不是追蹤每個群組 ID。
將 group-buckets
設定為 -1
會保留預設行為,這表示佇列會追蹤每個群組,但會遭受無邊界的記憶體使用。
將 group-buckets
設定為 0
會停用佇列上的分組(0 個儲存桶)。這在多播地址上可能很有用,其中存在許多佇列,但您可能不關心其中一個佇列的排序,而寧願保持循環配置行為。
有多種方法可以設定 group-buckets
。
<address name="foo.bar">
<multicast>
<queue name="orders1" group-buckets="1024"/>
</multicast>
</address>
透過使用核心 API 指定參數 group-buckets
為 20
,在建立佇列時指定。
或在使用 JMS 用戶端時,在自動建立時,使用消費者使用的目的地建立時的位址參數。
Queue queue = session.createQueue("my.destination.name?group-buckets=1024");
Topic topic = session.createTopic("my.destination.name?group-buckets=1024");
此外,可以使用 address-setting
設定來預設地址下的所有佇列的預設值
<address-setting match="my.bucket.address">
<default-group-buckets>1024</default-group-buckets>
</address-setting>
預設情況下,default-group-buckets
為 -1
,這是為了保持與現有預設行為的相容性。
地址 萬用字元 可用於為一組地址設定群組儲存桶。
8. 集群分組
在查看設定集群分組支援的詳細資訊之前,值得檢視整個集群分組的概念。一般而言,組合集群和訊息分組是一個糟糕的選擇,因為分組(即有序)訊息的基本概念和透過集群進行水平擴展本質上彼此矛盾。
訊息分組強制執行有序的訊息使用。有序的訊息使用要求在使用群組中的下一個訊息之前,必須完全使用並確認每個訊息。這會導致序列訊息處理(即沒有並行處理)。
但是,集群的概念是水平擴展代理程式,以便透過加入可以並行處理訊息的消費者來提高訊息輸送量。但是,由於訊息群組是有序的,因此每個群組中的訊息無法並行使用,這違背了水平擴展的目的。
由於這些原因,不建議使用集群分組。
但是,如果您在考慮這些設計注意事項的情況下評估了您的整體用例,並確定集群分組仍然可行,請繼續閱讀以瞭解所有設定詳細資訊和最佳實務。
8.1. 集群分組組態
在集群中使用訊息群組會比較複雜。這是因為具有特定群組 ID 的訊息可以到達任何節點,因此每個節點都需要知道哪些群組 ID 綁定到哪個節點上的哪個消費者。處理特定群組 ID 訊息的消費者可能位於集群的不同節點上,因此每個節點都需要知道此資訊,以便可以將訊息正確路由到具有該消費者的節點。
若要解決此問題,存在一個分組處理程式的概念。每個節點都有自己的分組處理程式,當傳送具有指派的群組 ID 的訊息時,處理程式將在它們之間決定訊息應該採取的路由。
以下是每種類型處理程式的範例組態。這應在 broker.xml
中設定。
<grouping-handler name="my-grouping-handler">
<type>LOCAL</type>
<address>jms</address>
<timeout>5000</timeout>
</grouping-handler>
<grouping-handler name="my-grouping-handler">
<type>REMOTE</type>
<address>jms</address>
<timeout>5000</timeout>
</grouping-handler>
- 類型
-
支援兩種處理程式類型 -
LOCAL
和REMOTE
。每個集群應選擇 1 個節點來具有LOCAL
分組處理程式,所有其他節點都應具有REMOTE
處理程式。實際上是LOCAL
處理程式決定應使用哪個路由,所有其他REMOTE
處理程式都會與此處理程式交談。 - 位址
-
指的是集群連線及其使用的地址。有關如何設定集群,請參閱集群章節。
- 逾時
-
等待做出決定的時間長度。如果達到此逾時,則在傳送期間將會擲回例外,這可確保保持嚴格的順序。
訊息應路由至何處的決策,最初是由接收訊息的節點所提出。該節點會根據正常的叢集路由條件選擇合適的路徑,例如:循環使用可用的佇列、優先使用本機佇列,並選擇具有消費者的佇列。如果此提案被分組處理程式接受,則該節點將從此開始將訊息路由至此佇列;如果被拒絕,則將提供替代路徑,且該節點將再次無限期地路由至該佇列。所有其他節點也將路由至提案時選擇的佇列。一旦訊息到達佇列,正常的單伺服器訊息群組語意就會接管,且訊息會被釘選到該佇列上的消費者。
您可能已注意到,單一本地處理程式存在單點故障的問題。如果此節點崩潰,則將無法做出任何決策。任何發送的訊息將無法傳遞,並會拋出例外。為避免這種情況發生,可以在另一個備份節點上複製本地處理程式。只需建立您的備份節點,並使用相同的本地處理程式進行配置即可。
8.2. 叢集分組最佳實務
使用叢集分組時,應遵循一些最佳實務
-
盡可能確保您的消費者均勻分佈在不同的節點上。這只有在您定期建立和關閉消費者的情況下才會成為問題。由於訊息在釘選後總是路由到相同的佇列,因此從此佇列中移除消費者可能會導致該佇列沒有消費者,這意味著佇列將繼續接收訊息。避免關閉消費者,或確保您總是有足夠的消費者,例如,如果您有 3 個節點,則要有 3 個消費者。
-
盡可能使用持久佇列。如果一旦群組綁定到佇列,就將佇列移除,則其他節點可能仍會嘗試將訊息路由到該佇列。可以透過確保發送訊息的工作階段刪除佇列來避免這種情況。這表示當發送下一條訊息時,訊息會發送到刪除佇列的節點,這表示可以成功進行新的提案。或者,您可以直接開始使用不同的群組 ID。
-
始終確保複製具有本地分組處理程式的節點。這表示在容錯移轉時,仍然會發生分組。
-
如果您正在使用群組逾時,則遠端節點的群組逾時應小於主協調器的值的一半。這是因為這將決定應該多久更新一次上次使用時間值,以便節點之間來回發送請求給群組。
8.3. 叢集分組範例
請參閱叢集分組範例,其中顯示如何使用 ActiveMQ Artemis 叢集配置訊息群組。