Stomp
ActiveMQ Classic 支援 Stomp 協定和 Stomp - JMS 對應。這使得使用純 Ruby、Perl、Python 或 PHP 撰寫客戶端來使用 ActiveMQ Classic 變得容易。
請參閱 Stomp 網站以取得更多詳細資訊
規格相容性
ActiveMQ Classic v5.6 實作 Stomp v1.1 規格,除了允許訊息標頭鍵的開頭或結尾有空格外,這些空格會保留在標頭值中。在未來的版本中,情況將不再如此,應更新客戶端並檢查使用者程式碼,以確保標頭中的空格是有意存在的,而不是意外或客戶端「功能」。
為 Stomp 啟用 ActiveMQ Classic Broker
若要在 Broker 中啟用 STOMP 協定支援,請新增 URI 結構為 stomp
的傳輸連接器定義。
範例
<transportConnectors>
<transportConnector name="stomp" uri="stomp://127.0.0.1:61613"/>
</transportConnectors>
若要查看完整範例,請嘗試 此 XML。如果您將該 XML 儲存為 foo.xml
,則可以透過命令列執行 stomp,如下所示
activemq xbean:foo.xml
如需更多協助,請參閱 執行 Broker。
Stomp Wire Format
Stomp 使用基於文字的 Wire Format,可以使用以下選項進行設定。 所有選項都可以在 Broker 的傳輸繫結 URI 上設定。
參數名稱 | 預設值 | 描述 |
---|---|---|
maxDataLength |
104857600 |
可以傳送的訊息主體(內容)的最大大小。 |
maxFrameSize |
MAX_LONG |
從 ActiveMQ Classic 5.12.0 開始:可以傳送的最大框架大小。Stomp 框架包含命令、選用標頭和選用主體。可以協助防止 OOM DOS 攻擊 |
範例
<transportConnector name="stomp+ssl" uri="stomp+ssl://127.0.0.1:61612?wireFormat.maxFrameSize=1000000"/>
使用正確的前置詞!
Wire Format 選項必須具有前置詞
wireFormat.
才能生效,例如wireFormat.
maxDataLength=100000
。缺少此前置詞的選項將會被忽略。
安全性
從 ActiveMQ Classic 5.1 開始:Stomp 完全支援 ActiveMQ Classic 的安全性機制。這表示 CONNECT
命令在驗證失敗時會傳回 ERROR
STOMP 框架。此外,當您嘗試存取(讀取/寫入)特定目的地時,將會套用授權原則。如果您使用同步操作(透過使用 收據),則在未經授權的存取嘗試時,您可能會收到 ERROR
框架。在其他情況下,操作將會被捨棄,但不會通知客戶端錯誤。這適用於可能在 Broker 端發生的所有錯誤。
SSL
為了提高安全性,您可以按照以下章節的說明,透過 SSL 使用 Stomp。
透過 NIO 啟用 Stomp
從 ActiveMQ Classic 5.3 開始:為了提高可擴展性和效能,Stomp 協定可以設定為在 NIO 傳輸上執行。相較於對應的 TCP 連接器,NIO 傳輸將會使用更少執行緒。當需要支援大量佇列時,這會有所幫助。若要使用 NIO,請將傳輸連接器的 URI 結構變更為 stomp+nio
。
範例
<transportConnector name="stomp+nio" uri="stomp+nio://127.0.0.1:61612"/>
透過 SSL 啟用 Stomp
若要設定 ActiveMQ Classic 以透過 SSL 連線使用 Stomp,請將 URI 結構變更為 stomp+ssl
。
範例
<transportConnector name="stomp+ssl" uri="stomp+ssl://127.0.0.1:61612"/>
如需在 ActiveMQ Classic 中使用 SSL 的更多詳細資訊,請參閱以下文章(如何使用 SSL)。在客戶端使用 Stomp over SSL 的範例可以在 PHP Stomp 客戶端範例中找到。
心跳寬限期
STOMP 協定(1.1 或更新版本)定義了心跳的概念,作為客戶端和 Broker 可以判斷彼此之間基礎 TCP 連線的健全狀況的方法。ActiveMQ Classic 支援 STOMP 心跳,前提是客戶端使用該協定的 1.1 版(或更新版本)。
在 ActiveMQ Classic 5.9.0 之前:嚴格執行「讀取」心跳逾時(也就是從客戶端傳送到 Broker 的心跳)。換句話說,Broker 無法容忍來自客戶端的讀取心跳延遲到達。這會導致 Broker 斷定客戶端已不存在,並在客戶端未能遵守其設定的心跳設定時,關閉其客戶端連線端。
從 ActiveMQ Classic 5.9.0 開始:現在可以透過新的傳輸選項 transport.hbGracePeriodMultiplier
設定讀取心跳的逾時強制執行。
<transportConnectors>
<transportConnector name="stomp" uri="stomp://127.0.0.1:61613?transport.hbGracePeriodMultiplier=1.5"/>
</transportConnectors>
此乘數是用於計算 Broker 將對每個客戶端連線強制執行的有效讀取心跳逾時。乘數會套用至客戶端在其 CONNECT
框架中指定的讀取逾時間隔
<client specified read heart-beat interval> * <grace periodmultiplier> == <broker enforced read heart-beat timeout interval>
為了向後相容性,如果未設定寬限期乘數,則預設強制執行模式仍為嚴格,例如 transport.hbGracePeriodMultiplier=1.0
。嘗試將寬限期乘數設定為小於或等於 1.0
的值將會被自動忽略。
希望能夠容忍來自 Broker 的延遲心跳的 STOMP 客戶端,必須自行實作解決方案。
-
請查看 STOMP 規格以了解心跳的詳細資訊
-
實作此功能的 JIRA:ActiveMQ Classic 5.x 不支援 STOMP 協定支援的心跳寬限期概念
使用 Stomp 處理目的地
請注意,在將字串作為 JMS 目的地傳遞到 ActiveMQ Classic 之前,會從字串中移除 stomp 中的前置詞 /queue/
或 /topic/
。另請注意,MOM 系統中的預設分隔符號是 .
(點)。雖然 FOO.BAR
是識別佇列類型目的地的正常語法,但 Stomp 對應的語法是 /queue/FOO.BAR
請小心以
/
開始目的地如果在 Stomp 世界中使用
/queue/foo/bar
,則在 JMS 世界中,佇列會稱為foo/bar
,而不是/foo/bar
。
STOMP 中的持續訊息
STOMP 訊息預設為非持續性。若要使用持續訊息,請將以下 STOMP 標頭新增至所有
SEND
請求:persistent:true
。此預設值與 JMS 訊息的預設值相反。
使用 JMS 文字/位元組訊息和 Stomp
Stomp 是一個非常簡單的協定 - 這也是它的優點之一!因此,它不了解 JMS 訊息,例如 TextMessage
或 BytesMessage
。但是,該協定確實支援 content-length
標頭。為了在 STOMP 和 JMS 客戶端之間提供更強大的互動,ActiveMQ Classic 會根據是否包含此標頭來判斷從 Stomp 傳送到 JMS 時要建立的訊息類型。邏輯很簡單
包含 content-length 標頭 | 產生的訊息 |
---|---|
是 | BytesMessage |
否 | TextMessage |
從 JMS 傳送到 Stomp 時,也可以遵循相同的邏輯。可以撰寫 Stomp 客戶端來根據是否包含 content-length
標頭來判斷要為使用者提供的訊息結構類型。
訊息轉換
可以使用 SEND
和 SUBSCRIBE
訊息上的 transformation
訊息標頭,指示 ActiveMQ Classic 將訊息從文字轉換為所需的格式。目前,ActiveMQ Classic 隨附一個轉換器,可以將 XML/JSON 文字轉換為 Java 物件,但您也可以新增自己的轉換器。
以下是如何使用內建轉換器的快速範例(取自測試案例)
private String xmlObject = "<pojo>\n"
+ " <name>Dejan</name>\n"
+ " <city>Belgrade</city>\n"
+ "</pojo>";
public void testTransformationReceiveXMLObject() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
producer.send(message);
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-object-xml\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlObject));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
相依性
ActiveMQ Classic 使用 XStream 來滿足其轉換需求。由於它是選用的相依性,因此您必須將適當的 JAR 放入
lib/
資料夾,將其新增至 Broker 的類別路徑。此外,如果您計劃使用 JSON 轉換,則必須將 Jettison JSON 解析器新增至類別路徑。
若要建立自己的轉換器,您必須執行以下操作
-
透過實作 FrameTranslator 介面來建置您的轉換器
-
將其與適當的標頭值關聯,方法是在 JAR 的
META-INF/services/org/apache/activemq/transport/frametranslator/
資料夾中建立一個名為您想要使用的值的文件,其中將包含值class=_您的轉換器的完整類別名稱_
例如,內建轉換器包含以下值
class=org.apache.activemq.transport.stomp.XStreamFrameTranslator
在 META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml
檔案中。
偵錯
如果您想偵錯 Broker 和客戶端之間的 Stomp 通訊,您應該使用 trace
參數來設定 Stomp 連接器,如下所示
<transportConnectors>
<transportConnector name="stomp" uri="stomp://127.0.0.1:61613?trace=true"/>
</transportConnectors>
這將指示 Broker 追蹤其傳送和接收的所有封包。
此外,您必須為適當的記錄啟用追蹤。您可以透過將以下內容新增至您的 conf/log4j.properties
來實現此目的
log4j.logger.org.apache.activemq.transport.stomp=TRACE
最後,您可能希望將這些訊息保留在個別檔案中,而不是污染標準 Broker 的記錄。您可以使用以下 log4j 設定來實現此目的
log4j.appender.stomp=org.apache.log4j.RollingFileAppender
log4j.appender.stomp.file=${activemq.base}/data/stomp.log
log4j.appender.stomp.maxFileSize=1024KB
log4j.appender.stomp.maxBackupIndex=5
log4j.appender.stomp.append=true
log4j.appender.stomp.layout=org.apache.log4j.PatternLayout
log4j.appender.stomp.layout.ConversionPattern=%d \[%-15.15t\] %-5p %-30.30c{1} - %m%n
log4j.logger.org.apache.activemq.transport.stomp=TRACE, stomp
log4j.additivity.org.apache.activemq.transport.stomp=false
# Enable these two lines and disable the above two if you want the frame IO ONLY (e.g., no heart beat messages, inactivity monitor etc).
#log4j.logger.org.apache.activemq.transport.stomp.StompIO=TRACE, stomp
#log4j.additivity.org.apache.activemq.transport.stomp.StompIO=false
完成此操作後,您的所有 Stomp 封包都會記錄到 data/stomp.log
Java API
從 ActiveMQ Classic 5.2 開始:ActiveMQ Classic 隨附簡單的 Java Stomp API。請注意,提供此 API 純粹是為了測試目的,您應該始終考慮從 Java 使用標準 JMS API,而不是這個 API。以下程式碼片段提供了使用此 API 的簡單範例
StompConnection connection = new StompConnection();
connection.open("localhost", 61613);
connection.connect("system", "manager");
StompFrame connect = connection.receive();
if(!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
throw new Exception ("Not connected");
}
connection.begin("tx1");
connection.send("/queue/test", "message1", "tx1", null);
connection.send("/queue/test", "message2", "tx1", null);
connection.commit("tx1");
connection.subscribe("/queue/test", Subscribe.AckModeValues.CLIENT);
connection.begin("tx2");
StompFrame message = connection.receive();
System.out.println(message.getBody());
connection.ack(message, "tx2");
message = connection.receive();
System.out.println(message.getBody());
connection.ack(message, "tx2");
connection.commit("tx2");
connection.disconnect();
此範例是標準 ActiveMQ Classic 發行版的一部分。您可以使用以下命令從 ./example
資料夾執行它
ant stomp
適用於 JMS 訊息語意的 Stomp 擴充功能
請注意,STOMP 的設計宗旨是盡可能簡單,因此任何腳本語言/平台都可以用最少的力氣與其他平台交換訊息。STOMP 允許在每個請求上插入標頭,例如傳送和接收訊息。ActiveMQ Classic 對 Stomp 協定進行了多項擴充,以便 Stomp 客戶端能夠支援 JMS 語義。OpenWire JMS 生產者可以傳送訊息給 Stomp 消費者,而 Stomp 生產者可以傳送訊息給 OpenWire JMS 消費者。而 Stomp 對 Stomp 的配置可以使用更豐富的 JMS 訊息控制功能。
STOMP 在 SENT
訊息上支援以下標準 JMS 屬性
STOMP 標頭 | JMS 標頭 | 描述 |
---|---|---|
correlation-id |
JMSCorrelationID |
良好的消費者會將此標頭添加到他們發送的任何回應中。 |
expires |
JMSExpiration |
訊息的到期時間。 |
JMSXGroupID |
JMSXGroupID |
指定 訊息群組。 |
JMSXGroupSeq |
JMSXGroupSeq |
可選標頭,指定 訊息群組 中的序列號。 |
persistent |
JMSDeliveryMode |
訊息是否持久。 |
priority |
JMSPriority |
訊息的優先順序。 |
reply-to |
JMSReplyTo |
您應將回覆傳送到此目的地。 |
type |
JMSType |
訊息類型。 |
ActiveMQ Classic 對 STOMP 的擴充
您可以在 STOMP 命令中添加自訂標頭來配置 ActiveMQ Classic 協定。以下是一些範例
動詞 | 標頭 | 類型 | 描述 |
---|---|---|---|
CONNECT |
client-id |
字串 |
指定 JMS clientID,它與 activemq.subcriptionName 結合使用以表示一個持久訂閱者。 |
SUBSCRIBE |
activemq.dispatchAsync |
布林值 |
對於 Broker 中非持久性的主題,訊息應該從生產者線程同步還是非同步地分發?對於快速消費者,將此值設定為 false 。對於慢速消費者,將其設定為 true ,這樣分發就不會阻塞快速消費者。 |
SUBSCRIBE |
activemq.exclusive |
布林值 |
我希望成為佇列上的 獨佔消費者。 |
SUBSCRIBE |
activemq.maximumPendingMessageLimit |
整數 |
為了在非持久性主題上進行 慢速消費者處理,透過丟棄舊訊息 - 我們可以設定一個最大待處理限制,一旦慢速消費者累積到這個高水位標記,我們就開始丟棄舊訊息。 |
SUBSCRIBE |
activemq.noLocal |
布林值 |
指定是否應忽略本地傳送的訊息進行訂閱。設定為 true 以過濾掉本地傳送的訊息。 |
SUBSCRIBE |
activemq.prefetchSize |
整數 |
指定將分發給客戶端的最大待處理訊息數。一旦達到此最大值,除非客戶端確認一條訊息,否則不再分發訊息。當處理訊息可能較慢時,將其設定為較低的值 > 1,以便在多個消費者之間公平分配訊息。注意:如果您的 STOMP 客戶端是使用動態腳本語言(例如 Ruby)實作的,則此參數必須設定為 1 ,因為沒有客戶端訊息大小的概念可以調整大小。STOMP 不支援值為 0 。 |
SUBSCRIBE |
activemq.priority |
位元組 |
設定消費者的優先順序,以便可以按優先順序加權分發。 |
SUBSCRIBE |
activemq.retroactive |
布林值 |
對於非持久性主題,使此訂閱成為 追溯性 訂閱。 |
SUBSCRIBE |
activemq.subscriptionName |
字串 |
對於持久性主題訂閱,您必須在 v5.7.0 之前,在連線上指定相同的 activemq.client-id ,並在訂閱上指定相同的 activemq.subcriptionName 。注意:拼寫為 subcriptionName 而不是 subscriptionName 。這並不直觀,但這是 ActiveMQ Classic 4.x 中的實作方式。對於 ActiveMQ Classic 的 5.0 版本,subcriptionName 和 subscriptionName 都將被支援 (subcriptionName 已在 v5.6.0 中移除)。 |
SUBSCRIBE |
selector |
字串 |
使用 JMS 1.1 規格中指定的 SQL 92 語法來指定 JMS 選擇器。這允許在訂閱過程中將篩選器應用於每條訊息。 |