雖然 Apache ActiveMQ Artemis 提供了與 JMS 無關的訊息傳遞 API,但許多使用者會更習慣使用 JMS。
JMS 是一個非常受歡迎的訊息傳遞 API 標準,大多數訊息系統都提供 JMS API。如果您完全不熟悉 JMS,我們建議您參考 Oracle JMS 教學 - 完整的 JMS 教學超出本指南的範圍。
Apache ActiveMQ Artemis 也附帶了許多範例,其中許多範例示範了 JMS API 的用法。一個好的起點是試用簡單的 JMS 佇列和主題範例,但我們也為 JMS API 的許多其他部分提供了範例。範例的完整描述可在範例中找到。
在本節中,我們將逐步介紹配置伺服器以使用 JMS 並建立簡單 JMS 程式的主要步驟。我們還將展示如何配置和使用 JNDI,以及如何在不使用任何 JNDI 的情況下將 JMS 與 Apache ActiveMQ Artemis 一起使用。
1. 簡單的訂購系統
在本章中,我們將使用一個非常簡單的訂購系統作為範例。這是一個有些牽強的範例,因為它非常簡單,但它可以示範設定和使用 JMS 的最基本概念。
我們將有一個名為 OrderQueue
的單一 JMS 佇列,我們將有一個 MessageProducer
將訂單訊息傳送到佇列,以及一個 MessageConsumer
從佇列中接收訂單訊息。
該佇列將是一個 durable
佇列,即它將在伺服器重新啟動或當機後仍然存在。我們還希望預先部署佇列,即在伺服器配置中指定佇列,以便自動建立它,而無需我們從用戶端明確建立它。
2. JNDI
JMS 規範建立了一個慣例,即管理物件(即 JMS 佇列、主題和連線工廠實例)透過 JNDI API 提供。經紀人可以自由地以他們認為合適的方式實作 JNDI,假設實作符合 API。Apache ActiveMQ Artemis 沒有 JNDI 伺服器。相反,它使用用戶端 JNDI 實作,該實作依賴於在環境中設定的特殊屬性來建構適當的 JMS 物件。換句話說,沒有物件儲存在 Apache ActiveMQ Artemis 伺服器上的 JNDI 中,而是根據提供的組態在用戶端上簡單地實例化它們。讓我們看看不同類型的管理物件以及如何配置它們。
當 Apache ActiveMQ Artemis 以獨立模式執行時,以下配置屬性是絕對必要的。當 Apache ActiveMQ Artemis 集成到應用程式伺服器(例如 Wildfly)時,應用程式伺服器本身幾乎肯定會提供具有自己屬性的 JNDI 用戶端。 |
2.1. ConnectionFactory JNDI
用戶端使用 JMS 連線工廠來建立與伺服器的連線。它知道它正在連線的伺服器的位置,以及許多其他配置參數。
以下是一個簡單的 JNDI 環境範例,供用戶端查找連線工廠以存取 Apache ActiveMQ Artemis 的嵌入式實例
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
connectionFactory.invmConnectionFactory=vm://0
在此實例中,我們建立了一個繫結到 invmConnectionFactory
的連線工廠,任何帶有字首 connectionFactory.
的項目都會建立一個連線工廠。
在某些情況下,可能有多個伺服器實例在特定的 JVM 中執行。在這種情況下,每個伺服器通常都有一個具有唯一伺服器 ID 的 InVM 接收器。使用 JMS 和 JNDI 的用戶端可以透過為每個伺服器指定一個連線工廠來解決這個問題,如下所示
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
connectionFactory.invmConnectionFactory0=vm://0
connectionFactory.invmConnectionFactory1=vm://1
connectionFactory.invmConnectionFactory2=vm://2
以下是所有支援的 URL 方案的列表
-
vm
-
tcp
-
udp
-
jgroups
大多數用戶端不會連線到嵌入式代理程式。用戶端最常跨網路連線到遠端代理程式。以下是一個簡單範例,說明用戶端如何設定連線工廠以連線到在 myhost:5445 上執行的遠端代理程式
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
connectionFactory.ConnectionFactory=tcp://myhost:5445
在上面的範例中,用戶端使用 tcp
方案作為提供者 URL。用戶端也可以在 URL 中指定多個逗號分隔的主機:埠組合(例如 (tcp://remote-host1:5445,remote-host2:5445)
)。無論 URL 中有一個還是多個主機:埠組合,它們都被視為基礎連線的初始連線器。
也支援 udp
方案,它應使用與 ActiveMQ Artemis 伺服器上配置的相應 broadcast-group
中的 group-address
和 group-port
相符的主機:埠組合。
每個方案都有一組特定的屬性,可以使用傳統的 URL 查詢字串格式(例如 scheme://host:port?key1=value1&key2=value2
)來設定這些屬性,以自訂基礎傳輸機制。例如,如果用戶端想要使用 TCP 和 SSL 連線到遠端伺服器,它將建立一個連線工廠,如下所示:tcp://remote-host:5445?ssl-enabled=true
。
tcp
方案的所有可用屬性都在有關 Netty 傳輸的文檔中描述。
請注意,如果您使用的是 tcp
方案和多個地址,則可以將查詢應用於所有 URL 或僅應用於個別連線器,因此當您有
-
(tcp://remote-host1:5445?httpEnabled=true,remote-host2:5445?httpEnabled=true)?clientID=1234
那麼 httpEnabled
屬性僅在個別連線器上設定,而 clientId
設定在實際的連線工廠上。在整個 URI 上設定的任何特定於連線器的屬性都將應用於所有連線器。
udp
方案支援 4 個屬性
- localAddress
-
如果您在同一台機器上執行多個網路介面,則可能需要指定探索群組僅在特定介面上偵聽。為此,您可以使用此參數指定介面位址。
- localPort
-
如果您想要指定資料報套接字繫結到的本機埠,可以在這裡指定它。通常,您只需使用 -1 的預設值,表示應使用匿名埠。此參數始終與
localAddress
一起指定。 - refreshTimeout
-
這是探索群組在收到來自特定伺服器的最後一次廣播後,從其列表中移除該伺服器的連線器對項目之前等待的時間段。您通常會將此值設定為比廣播群組上的 broadcast-period 高得多的值,否則由於時間上的細微差異,伺服器可能會間歇性地從列表中消失,即使它們仍在廣播。此參數是可選的,預設值為 10000 毫秒(10 秒)。
- discoveryInitialWaitTimeout
-
如果連線工廠在建立後立即使用,則它可能沒有足夠的時間從叢集中的所有節點接收廣播。在首次使用時,連線工廠將確保在建立第一個連線之前等待此長時間。此參數的預設值為 10000 毫秒。
最後,支援 jgroups
方案,它為伺服器探索提供了 udp
方案的替代方案。URL 模式是 jgroups://channelName?file=jgroups-xml-conf-filename
,其中 jgroups-xml-conf-filename
指的是類別路徑上包含 JGroups 配置的 XML 檔案。channelName
是給予建立的 jgroups 通道的名稱。
refreshTimeout
和 discoveryInitialWaitTimeout
屬性與 udp
一樣受到支援。
預設連線工廠的預設類型是 javax.jms.ConnectionFactory
或 jakarta.jms.ConnectionFactory
,具體取決於您使用的用戶端。可以透過設定類型來變更此設定,如下所示
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://127.0.0.1:5445?type=CF
在此範例中,它仍然設定為預設值,下面顯示了可以設定的類型清單。
2.1.1. 連線工廠類型配置
提供的介面將取決於您是使用 JMS 還是 Jakarta Messaging 用戶端實作。
type | interface |
---|---|
CF(預設) |
|
XA_CF |
|
QUEUE_CF |
|
QUEUE_XA_CF |
|
TOPIC_CF |
|
TOPIC_XA_CF |
|
2.2. Destination JNDI
JMS 目的地通常也透過 JNDI 查找。與連線工廠一樣,可以使用 JNDI 環境中的特殊屬性來配置目的地。屬性名稱應遵循以下模式:queue.<jndi-binding>
或 topic.<jndi-binding>
。屬性值應為 Apache ActiveMQ Artemis 伺服器託管的佇列名稱。例如,如果伺服器具有如下配置的 JMS 佇列
<address name="OrderQueue">
<queue name="OrderQueue"/>
</address>
如果用戶端想要將此佇列繫結到 "queues/OrderQueue",則 JNDI 屬性將配置如下
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://myhost:5445
queue.queues/OrderQueue=OrderQueue
也可以查找尚未在 JNDI 環境中明確配置的 JMS 目的地。可以使用查找字串中的 dynamicQueues/
或 dynamicTopics/
來實現此目的。例如,如果用戶端想要查找上述 "OrderQueue",只需使用字串 "dynamicQueues/OrderQueue" 即可。請注意,dynamicQueues/
或 dynamicTopics/
後面的文字必須與伺服器上的目的地名稱完全對應。
2.3. 程式碼
以下是範例的程式碼
首先,我們將建立一個 JNDI 初始環境,從中查找我們的 JMS 物件。如果上述屬性在 jndi.properties
中設定,並且它在類別路徑上,則任何新的、空的 InitialContext
都將使用這些屬性初始化
InitialContext ic = new InitialContext();
//Now we'll look up the connection factory from which we can create
//connections to myhost:5445:
ConnectionFactory cf = (ConnectionFactory)ic.lookup("ConnectionFactory");
//And look up the Queue:
Queue orderQueue = (Queue)ic.lookup("queues/OrderQueue");
//Next we create a JMS connection using the connection factory:
Connection connection = cf.createConnection();
//And we create a non transacted JMS Session, with AUTO\_ACKNOWLe.g. //acknowledge mode:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//We create a MessageProducer that will send orders to the queue:
MessageProducer producer = session.createProducer(orderQueue);
//And we create a MessageConsumer which will consume orders from the
//queue:
MessageConsumer consumer = session.createConsumer(orderQueue);
//We make sure we start the connection, or delivery won't occur on it:
connection.start();
//We create a simple TextMessage and send it:
TextMessage message = session.createTextMessage("This is an order");
producer.send(message);
//And we consume the message:
TextMessage receivedMessage = (TextMessage)consumer.receive();
System.out.println("Got order: " + receivedMessage.getText());
就這麼簡單。如需各種可用的 JMS 範例,請參閱範例。
警告
請注意,JMS 連線、會期、生產者和消費者皆設計為可重複使用。
為每個您生產或消費的訊息建立新的連線、會期、生產者和消費者是一種反模式。 如果您這樣做,您的應用程式效能將會非常差。 這將在效能調整章節 效能調整 中進一步討論。
3. 不使用 JNDI 直接實例化 JMS 資源
雖然從 JNDI 查詢 JMS 管理物件(即 JMS 佇列、主題和 ConnectionFactory 實例)是一種非常常見的 JMS 使用模式,但在某些情況下,您可能會想「我為什麼需要 JNDI? 我為什麼不能直接實例化這些物件?」
使用 Apache ActiveMQ Artemis,您可以完全做到這一點。 Apache ActiveMQ Artemis 支援直接實例化 JMS 佇列、主題和 ConnectionFactory 實例,因此您完全不必使用 JNDI。
有關直接實例化的完整範例,請查看範例的 JMS 章節下的 直接實例化 JMS 物件 範例。
這是我們的簡單範例,已重寫為完全不使用 JNDI
我們透過 ActiveMQJMSClient Utility 類別建立 JMS ConnectionFactory 物件,請注意,我們需要提供連線參數並指定我們正在使用的傳輸方式,有關連接器的更多資訊,請參閱 設定傳輸。
TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName());
ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,transportConfiguration);
//We also create the JMS Queue object via the ActiveMQJMSClient Utility
//class:
Queue orderQueue = ActiveMQJMSClient.createQueue("OrderQueue");
//Next we create a JMS connection using the connection factory:
Connection connection = cf.createConnection();
//And we create a non transacted JMS Session, with AUTO\_ACKNOWLe.g. //acknowledge mode:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//We create a MessageProducer that will send orders to the queue:
MessageProducer producer = session.createProducer(orderQueue);
//And we create a MessageConsumer which will consume orders from the
//queue:
MessageConsumer consumer = session.createConsumer(orderQueue);
//We make sure we start the connection, or delivery won't occur on it:
connection.start();
//We create a simple TextMessage and send it:
TextMessage message = session.createTextMessage("This is an order");
producer.send(message);
//And we consume the message:
TextMessage receivedMessage = (TextMessage)consumer.receive();
System.out.println("Got order: " + receivedMessage.getText());
4. 設定客戶端 ID
這代表 JMS 客戶端的客戶端 ID,且建立持久訂閱時需要此 ID。 您可以在連線工廠上設定此 ID,並透過 clientId
元素進行設定。 由此連線工廠建立的任何連線都將以此設定為其客戶端 ID。
5. 設定 DUPS_OK 的批次大小
當 JMS 確認模式設定為 DUPS_OK
時,可以設定消費者以批次方式而非一次一個的方式傳送確認,進而節省寶貴的頻寬。 這可以透過連線工廠透過 dupsOkBatchSize
元素進行設定,並以位元組為單位設定。 預設值為 1024 * 1024 位元組 = 1 MiB。
6. 設定交易批次大小
當在交易中接收訊息時,可以設定消費者以批次方式而非個別方式傳送確認,進而節省寶貴的頻寬。 這可以在連線工廠上透過 transactionBatchSize
元素進行設定,並以位元組為單位設定。 預設值為 1024 * 1024。
7. 設定目的地快取
許多框架(例如 Spring)會在每次操作時按名稱解析目的地,這可能會導致效能問題以及對代理程式的額外呼叫,在目的地(位址)是永久性代理程式端的情況下,例如它們由平台或操作團隊管理。 使用 cacheDestinations
元素,您可以切換目的地快取以提高效能並減少對代理程式的呼叫。 如果目的地(位址)不是永久性代理程式端(例如動態建立/刪除),則不應使用此功能。