Stomp

連線能力 > 協定 > Stomp

ActiveMQ Classic 支援 Stomp 協定和 Stomp - JMS 對應。這使得使用純 RubyPerlPythonPHP 撰寫客戶端來使用 ActiveMQ Classic 變得容易。

請參閱 Stomp 網站以取得更多詳細資訊

規格相容性

ActiveMQ Classic v5.6 實作 Stomp v1.1 規格,除了允許訊息標頭鍵的開頭或結尾有空格外,這些空格會保留在標頭值中。在未來的版本中,情況將不再如此,應更新客戶端並檢查使用者程式碼,以確保標頭中的空格是有意存在的,而不是意外或客戶端「功能」。

為 Stomp 啟用 ActiveMQ Classic Broker

若要在 Broker 中啟用 STOMP 協定支援,請新增 URI 結構為 stomp 的傳輸連接器定義。

範例

<transportConnectors>
   <transportConnector name="stomp" uri="stomp://127.0.0.1:61613"/>
</transportConnectors>

若要查看完整範例,請嘗試 此 XML。如果您將該 XML 儲存為 foo.xml,則可以透過命令列執行 stomp,如下所示

activemq xbean:foo.xml

如需更多協助,請參閱 執行 Broker

Stomp Wire Format

Stomp 使用基於文字的 Wire Format,可以使用以下選項進行設定。 所有選項都可以在 Broker 的傳輸繫結 URI 上設定。

參數名稱 預設值 描述
maxDataLength 104857600 可以傳送的訊息主體(內容)的最大大小。
maxFrameSize MAX_LONG 從 ActiveMQ Classic 5.12.0 開始:可以傳送的最大框架大小。Stomp 框架包含命令、選用標頭和選用主體。可以協助防止 OOM DOS 攻擊

範例

<transportConnector name="stomp+ssl" uri="stomp+ssl://127.0.0.1:61612?wireFormat.maxFrameSize=1000000"/>

使用正確的前置詞!

Wire Format 選項必須具有前置詞 wireFormat. 才能生效,例如 wireFormat.maxDataLength=100000。缺少此前置詞的選項將會被忽略。

安全性

從 ActiveMQ Classic 5.1 開始:Stomp 完全支援 ActiveMQ Classic 的安全性機制。這表示 CONNECT 命令在驗證失敗時會傳回 ERROR STOMP 框架。此外,當您嘗試存取(讀取/寫入)特定目的地時,將會套用授權原則。如果您使用同步操作(透過使用 收據),則在未經授權的存取嘗試時,您可能會收到 ERROR 框架。在其他情況下,操作將會被捨棄,但不會通知客戶端錯誤。這適用於可能在 Broker 端發生的所有錯誤。

SSL

為了提高安全性,您可以按照以下章節的說明,透過 SSL 使用 Stomp。

透過 NIO 啟用 Stomp

從 ActiveMQ Classic 5.3 開始:為了提高可擴展性和效能,Stomp 協定可以設定為在 NIO 傳輸上執行。相較於對應的 TCP 連接器,NIO 傳輸將會使用更少執行緒。當需要支援大量佇列時,這會有所幫助。若要使用 NIO,請將傳輸連接器的 URI 結構變更為 stomp+nio

範例

<transportConnector name="stomp+nio" uri="stomp+nio://127.0.0.1:61612"/>

透過 SSL 啟用 Stomp

若要設定 ActiveMQ Classic 以透過 SSL 連線使用 Stomp,請將 URI 結構變更為 stomp+ssl

範例

<transportConnector name="stomp+ssl" uri="stomp+ssl://127.0.0.1:61612"/>

如需在 ActiveMQ Classic 中使用 SSL 的更多詳細資訊,請參閱以下文章(如何使用 SSL)。在客戶端使用 Stomp over SSL 的範例可以在 PHP Stomp 客戶端範例中找到。

心跳寬限期

STOMP 協定(1.1 或更新版本)定義了心跳的概念,作為客戶端和 Broker 可以判斷彼此之間基礎 TCP 連線的健全狀況的方法。ActiveMQ Classic 支援 STOMP 心跳,前提是客戶端使用該協定的 1.1 版(或更新版本)。

在 ActiveMQ Classic 5.9.0 之前:嚴格執行「讀取」心跳逾時(也就是從客戶端傳送到 Broker 的心跳)。換句話說,Broker 無法容忍來自客戶端的讀取心跳延遲到達。這會導致 Broker 斷定客戶端已不存在,並在客戶端未能遵守其設定的心跳設定時,關閉其客戶端連線端。

從 ActiveMQ Classic 5.9.0 開始:現在可以透過新的傳輸選項 transport.hbGracePeriodMultiplier 設定讀取心跳的逾時強制執行。

<transportConnectors>
   <transportConnector name="stomp" uri="stomp://127.0.0.1:61613?transport.hbGracePeriodMultiplier=1.5"/>
</transportConnectors>

此乘數是用於計算 Broker 將對每個客戶端連線強制執行的有效讀取心跳逾時。乘數會套用至客戶端在其 CONNECT 框架中指定的讀取逾時間隔

<client specified read heart-beat interval> * <grace periodmultiplier> == <broker enforced read heart-beat timeout interval>

為了向後相容性,如果未設定寬限期乘數,則預設強制執行模式仍為嚴格,例如 transport.hbGracePeriodMultiplier=1.0。嘗試將寬限期乘數設定為小於或等於 1.0 的值將會被自動忽略。

希望能夠容忍來自 Broker 的延遲心跳的 STOMP 客戶端,必須自行實作解決方案。

使用 Stomp 處理目的地

請注意,在將字串作為 JMS 目的地傳遞到 ActiveMQ Classic 之前,會從字串中移除 stomp 中的前置詞 /queue/ 或 /topic/。另請注意,MOM 系統中的預設分隔符號是 .(點)。雖然 FOO.BAR 是識別佇列類型目的地的正常語法,但 Stomp 對應的語法是 /queue/FOO.BAR

請小心以 / 開始目的地

如果在 Stomp 世界中使用 /queue/foo/bar,則在 JMS 世界中,佇列會稱為 foo/bar,而不是 /foo/bar

STOMP 中的持續訊息

STOMP 訊息預設為非持續性。若要使用持續訊息,請將以下 STOMP 標頭新增至所有 SEND 請求:persistent:true。此預設值與 JMS 訊息的預設值相反。

使用 JMS 文字/位元組訊息和 Stomp

Stomp 是一個非常簡單的協定 - 這也是它的優點之一!因此,它不了解 JMS 訊息,例如 TextMessageBytesMessage。但是,該協定確實支援 content-length 標頭。為了在 STOMP 和 JMS 客戶端之間提供更強大的互動,ActiveMQ Classic 會根據是否包含此標頭來判斷從 Stomp 傳送到 JMS 時要建立的訊息類型。邏輯很簡單

包含 content-length 標頭 產生的訊息
BytesMessage
TextMessage

從 JMS 傳送到 Stomp 時,也可以遵循相同的邏輯。可以撰寫 Stomp 客戶端來根據是否包含 content-length 標頭來判斷要為使用者提供的訊息結構類型。

訊息轉換

可以使用 SENDSUBSCRIBE 訊息上的 transformation 訊息標頭,指示 ActiveMQ Classic 將訊息從文字轉換為所需的格式。目前,ActiveMQ Classic 隨附一個轉換器,可以將 XML/JSON 文字轉換為 Java 物件,但您也可以新增自己的轉換器。

以下是如何使用內建轉換器的快速範例(取自測試案例)

private String xmlObject = "<pojo>\n" 
        + "  <name>Dejan</name>\n"
        + "  <city>Belgrade</city>\n" 
        + "</pojo>";

public void testTransformationReceiveXMLObject() throws Exception {
	
    MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
    ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
    producer.send(message);
	
    String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
    stompConnection.sendFrame(frame);

    frame = stompConnection.receiveFrame();
    assertTrue(frame.startsWith("CONNECTED"));

    frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-object-xml\n\n" + Stomp.NULL;
    stompConnection.sendFrame(frame);
    
    frame = stompConnection.receiveFrame();

    assertTrue(frame.trim().endsWith(xmlObject));
    
    frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
    stompConnection.sendFrame(frame);
}     

相依性

ActiveMQ Classic 使用 XStream 來滿足其轉換需求。由於它是選用的相依性,因此您必須將適當的 JAR 放入 lib/ 資料夾,將其新增至 Broker 的類別路徑。此外,如果您計劃使用 JSON 轉換,則必須將 Jettison JSON 解析器新增至類別路徑。

若要建立自己的轉換器,您必須執行以下操作

  1. 透過實作 FrameTranslator 介面來建置您的轉換器

  2. 將其與適當的標頭值關聯,方法是在 JAR 的 META-INF/services/org/apache/activemq/transport/frametranslator/ 資料夾中建立一個名為您想要使用的值的文件,其中將包含值 class=_您的轉換器的完整類別名稱_

例如,內建轉換器包含以下值

class=org.apache.activemq.transport.stomp.XStreamFrameTranslator

META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml 檔案中。

偵錯

如果您想偵錯 Broker 和客戶端之間的 Stomp 通訊,您應該使用 trace 參數來設定 Stomp 連接器,如下所示

<transportConnectors>
   <transportConnector name="stomp" uri="stomp://127.0.0.1:61613?trace=true"/>
</transportConnectors>

這將指示 Broker 追蹤其傳送和接收的所有封包。

此外,您必須為適當的記錄啟用追蹤。您可以透過將以下內容新增至您的 conf/log4j.properties 來實現此目的

log4j.logger.org.apache.activemq.transport.stomp=TRACE

最後,您可能希望將這些訊息保留在個別檔案中,而不是污染標準 Broker 的記錄。您可以使用以下 log4j 設定來實現此目的

log4j.appender.stomp=org.apache.log4j.RollingFileAppender
log4j.appender.stomp.file=${activemq.base}/data/stomp.log
log4j.appender.stomp.maxFileSize=1024KB
log4j.appender.stomp.maxBackupIndex=5
log4j.appender.stomp.append=true
log4j.appender.stomp.layout=org.apache.log4j.PatternLayout
log4j.appender.stomp.layout.ConversionPattern=%d \[%-15.15t\] %-5p %-30.30c{1} - %m%n

log4j.logger.org.apache.activemq.transport.stomp=TRACE, stomp
log4j.additivity.org.apache.activemq.transport.stomp=false

# Enable these two lines and disable the above two if you want the frame IO ONLY (e.g., no heart beat messages, inactivity monitor etc).
#log4j.logger.org.apache.activemq.transport.stomp.StompIO=TRACE, stomp
#log4j.additivity.org.apache.activemq.transport.stomp.StompIO=false

完成此操作後,您的所有 Stomp 封包都會記錄到 data/stomp.log

Java API

從 ActiveMQ Classic 5.2 開始:ActiveMQ Classic 隨附簡單的 Java Stomp API。請注意,提供此 API 純粹是為了測試目的,您應該始終考慮從 Java 使用標準 JMS API,而不是這個 API。以下程式碼片段提供了使用此 API 的簡單範例

StompConnection connection = new StompConnection();
connection.open("localhost", 61613);
		
connection.connect("system", "manager");
StompFrame connect = connection.receive();

if(!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
	throw new Exception ("Not connected");
}
		
connection.begin("tx1");
connection.send("/queue/test", "message1", "tx1", null);
connection.send("/queue/test", "message2", "tx1", null);
connection.commit("tx1");
	
connection.subscribe("/queue/test", Subscribe.AckModeValues.CLIENT);
	
connection.begin("tx2");
	
StompFrame message = connection.receive();
System.out.println(message.getBody());
connection.ack(message, "tx2");
	
message = connection.receive();
System.out.println(message.getBody());
connection.ack(message, "tx2");
	
connection.commit("tx2");
		
connection.disconnect();

此範例是標準 ActiveMQ Classic 發行版的一部分。您可以使用以下命令從 ./example 資料夾執行它

ant stomp

適用於 JMS 訊息語意的 Stomp 擴充功能

請注意,STOMP 的設計宗旨是盡可能簡單,因此任何腳本語言/平台都可以用最少的力氣與其他平台交換訊息。STOMP 允許在每個請求上插入標頭,例如傳送和接收訊息。ActiveMQ Classic 對 Stomp 協定進行了多項擴充,以便 Stomp 客戶端能夠支援 JMS 語義。OpenWire JMS 生產者可以傳送訊息給 Stomp 消費者,而 Stomp 生產者可以傳送訊息給 OpenWire JMS 消費者。而 Stomp 對 Stomp 的配置可以使用更豐富的 JMS 訊息控制功能。

STOMP 在 SENT 訊息上支援以下標準 JMS 屬性

STOMP 標頭 JMS 標頭 描述
correlation-id JMSCorrelationID 良好的消費者會將此標頭添加到他們發送的任何回應中。
expires JMSExpiration 訊息的到期時間。
JMSXGroupID JMSXGroupID 指定 訊息群組
JMSXGroupSeq JMSXGroupSeq 可選標頭,指定 訊息群組 中的序列號。
persistent JMSDeliveryMode 訊息是否持久。
priority JMSPriority 訊息的優先順序。
reply-to JMSReplyTo 您應將回覆傳送到此目的地。
type JMSType 訊息類型。

ActiveMQ Classic 對 STOMP 的擴充

您可以在 STOMP 命令中添加自訂標頭來配置 ActiveMQ Classic 協定。以下是一些範例

動詞 標頭 類型 描述
CONNECT client-id 字串 指定 JMS clientID,它與 activemq.subcriptionName 結合使用以表示一個持久訂閱者。
SUBSCRIBE activemq.dispatchAsync 布林值 對於 Broker 中非持久性的主題,訊息應該從生產者線程同步還是非同步地分發?對於快速消費者,將此值設定為 false。對於慢速消費者,將其設定為 true,這樣分發就不會阻塞快速消費者。
SUBSCRIBE activemq.exclusive 布林值 我希望成為佇列上的 獨佔消費者
SUBSCRIBE activemq.maximumPendingMessageLimit 整數 為了在非持久性主題上進行 慢速消費者處理,透過丟棄舊訊息 - 我們可以設定一個最大待處理限制,一旦慢速消費者累積到這個高水位標記,我們就開始丟棄舊訊息。
SUBSCRIBE activemq.noLocal 布林值 指定是否應忽略本地傳送的訊息進行訂閱。設定為 true 以過濾掉本地傳送的訊息。
SUBSCRIBE activemq.prefetchSize 整數 指定將分發給客戶端的最大待處理訊息數。一旦達到此最大值,除非客戶端確認一條訊息,否則不再分發訊息。當處理訊息可能較慢時,將其設定為較低的值 > 1,以便在多個消費者之間公平分配訊息。注意:如果您的 STOMP 客戶端是使用動態腳本語言(例如 Ruby)實作的,則此參數必須設定為 1,因為沒有客戶端訊息大小的概念可以調整大小。STOMP 不支援值為 0
SUBSCRIBE activemq.priority 位元組 設定消費者的優先順序,以便可以按優先順序加權分發。
SUBSCRIBE activemq.retroactive 布林值 對於非持久性主題,使此訂閱成為 追溯性 訂閱。
SUBSCRIBE activemq.subscriptionName 字串 對於持久性主題訂閱,您必須在 v5.7.0 之前,在連線上指定相同的 activemq.client-id,並在訂閱上指定相同的 activemq.subcriptionName注意:拼寫為 subcriptionName 而不是 subscriptionName。這並不直觀,但這是 ActiveMQ Classic 4.x 中的實作方式。對於 ActiveMQ Classic 的 5.0 版本,subcriptionNamesubscriptionName 都將被支援 (subcriptionName 已在 v5.6.0 中移除)。
SUBSCRIBE selector 字串 使用 JMS 1.1 規格中指定的 SQL 92 語法來指定 JMS 選擇器。這允許在訂閱過程中將篩選器應用於每條訊息。

Apache、ActiveMQ、Apache ActiveMQ、Apache 羽毛標誌和 Apache ActiveMQ 專案標誌是 The Apache Software Foundation 的商標。 版權所有 © 2024,The Apache Software Foundation。根據 Apache 授權 2.0 授權。