Apache ActiveMQ Artemis 支援 AMQP 1.0 規範。預設情況下,有設定 acceptor 元素以在連接埠 616165672 上接受 AMQP 連線。

請參閱「協定和互通性」章節,以取得設定 AMQP acceptor 的詳細資訊。

您可以使用任何相容於 AMQP 1.0 的客戶端。

簡短列表包含

1. 範例

我們在 Artemis 範例中提供了一些範例。

  • .NET

    • ./examples/protocols/amqp/dotnet

  • ProtonCPP

    • ./examples/protocols/amqp/proton-cpp

    • ./examples/protocols/amqp/proton-clustered-cpp

  • Ruby

    • ./examples/protocols/amqp/proton-ruby

  • Java(使用 qpid JMS 客戶端)

    • ./examples/protocols/amqp/queue

  • 攔截器

    • ./examples/features/standard/interceptor-amqp

    • ./examples/features/standard/broker-plugin

2. 訊息轉換

在傳送和接收 AMQP 時,訊息代理程式不會執行任何訊息轉換為其他協定。

但是,如果您的訊息要由 AMQP JMS 客戶端接收,您必須遵循 JMS 對應慣例。如果您傳送的訊息主體類型不在此規範中識別,則 AMQP 和任何其他協定之間的轉換會使其成為二進位訊息。如果您打算跨協定或語言,請務必遵循這些慣例,尤其是在訊息主體上。

相容性設定允許將 AMQP 佇列(JMS 持久和共用訂閱)的命名慣例與 CORE 對齊。由於向後相容性的原因,您需要透過訊息代理程式設定明確啟用此功能。

amqp-use-core-subscription-naming
  • true - 使用與 CORE 對齊的佇列命名慣例。

  • false (預設) - 使用較舊的命名慣例。

3. 攔截和變更訊息

由於以下幾個原因,我們不建議在伺服器端變更訊息:

  • AMQP 訊息應為不可變的

  • 該訊息將不是使用者傳送的原始訊息

  • AMQP 可以對訊息進行簽署。簽名將被破壞。

  • 為了效能考量。我們盡量不重新編碼(甚至解碼)訊息。

如果無論這些建議如何,您仍然需要並想要攔截和變更 AMQP 訊息,請查看上述攔截器範例。

4. AMQP 和安全性

Apache ActiveMQ Artemis 伺服器接受 PLAIN、ANONYMOUS 和 GSSAPI SASL 機制。這些機制在訊息代理程式的 安全性基礎架構上實作。

5. AMQP 和目的地

如果 AMQP 連結是動態的,則會建立暫時佇列,並且遠端來源或遠端目標位址將設定為暫時佇列的名稱。如果連結不是動態的,則遠端目標或來源的位址將用於佇列。如果佇列不存在,只要設定允許,就會自動建立。

6. AMQP 和多播位址(主題)

雖然 AMQP 沒有「主題」的概念,但仍然可以將 AMQP 消費者或接收者視為訂閱,而不僅僅是佇列上的消費者。依預設,任何附加到僅啟用 multicast 的位址的接收連結都將被視為訂閱,並建立相應的訂閱佇列。如果 Terminus Durability 為 UNSETTLED_STATECONFIGURATION,則佇列將設為持久性(類似於 JMS 持久訂閱),並給予一個由容器 ID 和連結名稱組成的名稱,例如 my-container-id:my-link-name。如果 Terminus Durability 設定為 NONE,則會建立揮發性 multicast 佇列。

7. AMQP 和協調 - 處理交易

AMQP 連結的目標也可以是協調器。協調器用於處理交易。如果使用協調器,則基礎伺服器工作階段將會被交易,並將透過協調器回滾或提交。

AMQP 允許每個工作階段使用多個交易 (amqp:multi-txns-per-ssn),但是在此版本的 Apache ActiveMQ Artemis 中,每個工作階段僅支援單一交易。

8. AMQP 排程訊息傳遞

AMQP 訊息可以提供排程資訊,以控制訊息最早在未來傳遞的時間。此資訊是透過將訊息註解新增到已傳送的訊息中來提供的。

有兩種不同的訊息註解可用於排程稍後傳遞的訊息

x-opt-delivery-time

指定的值必須是正長整數,對應於訊息應該可供傳遞的時間(以毫秒為單位)。

x-opt-delivery-delay

指定的值必須是正長整數,對應於訊息代理程式接收到給定訊息後,應該在多少毫秒後才能供傳遞。

如果同一個訊息中同時存在這兩個註解,則訊息代理程式將優先使用更明確的 x-opt-delivery-time 值。

9. DLQ 和過期轉移

AMQP 訊息在轉移到 DLQ 或 ExpiryQueue 之前會被複製,並且在此過程中會收到屬性和註解。

訊息代理程式還會保留一個僅限內部使用的屬性(稱為額外屬性),該屬性不會向客戶端公開,並且這些屬性也會在此過程中填入。

以下是 AMQP 訊息在轉移時將收到的註解和屬性名稱列表

註解名稱 內部屬性名稱 描述

x-opt-ORIG-MESSAGE-ID

_AMQ_ORIG_MESSAGE_ID

轉移之前的原始訊息 ID

x-opt-ACTUAL-EXPIRY

_AMQ_ACTUAL_EXPIRY

過期發生的時間。自 Epoch 時間起的毫秒數

x-opt-ORIG-QUEUE

_AMQ_ORIG_QUEUE

轉移之前的原始佇列名稱

x-opt-ORIG-ADDRESS

_AMQ_ORIG_ADDRESS

轉移之前的原始位址名稱

10. 依訊息註解篩選

如果您在註解名稱之前使用字首「m.」,則可以依訊息註解進行篩選。

例如,如果您想篩選傳送到特定目的地的訊息,您可以根據以下內容建立篩選器

ConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:5672");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
javax.jms.Queue queue = session.createQueue("my-DLQ");
MessageConsumer consumer = session.createConsumer(queue, "\"m.x-opt-ORIG-ADDRESS\"='ORIGINAL_PLACE'");
Message message = consumer.receive();

訊息代理程式將設定內部屬性。如果您打算在 DLQ 或過期後進行篩選,您可以選擇內部屬性名稱

// Replace the consumer creation on the previous example:
MessageConsumer consumer = session.createConsumer(queue, "_AMQ_ORIG_ADDRESS='ORIGINAL_PLACE'");

11. 設定 AMQP 空閒逾時

可以透過在 acceptor 上設定屬性 amqpIdleTimeout(以毫秒為單位)來設定 AMQP 伺服器的空閒逾時。

這將使伺服器向客戶端傳送一個 AMQP 框架 open,其中包含您設定的逾時 / 2。

因此,如果您將 AMQP 空閒逾時設定為 60000,則伺服器會告知客戶端每 30,000 毫秒傳送框架。

<acceptor name="amqp">.... ;amqpIdleTimeout=<configured-timeout>; ..... </acceptor>

11.1. 停用保持連線檢查

如果您設定 amqpIdleTimeout=0,則會告知客戶端不要向伺服器傳送保持連線封包。在這種情況下,您將依賴 TCP 來判斷何時需要關閉 Socket。

<acceptor name="amqp">.... ;amqpIdleTimeout=0; ..... </acceptor>

這包含設定 amqpIdleTimeout 的實際範例

<acceptor name="amqp">tcp://0.0.0.0:5672?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;directDeliver=false;batchDelay=10</acceptor>

12. Web Sockets

Apache ActiveMQ Artemis 也支援透過 Web Sockets 的 AMQP。支援 Web Sockets 的現代網路瀏覽器可以傳送和接收 AMQP 訊息。

透過正常的 AMQP acceptor 支援透過 Web Sockets 的 AMQP

<acceptor name="amqp-ws-acceptor">tcp://127.0.0.1:5672?protocols=AMQP</acceptor>

使用此設定,Apache ActiveMQ Artemis 將在連接埠 5672 上接受透過 Web Sockets 的 AMQP 連線。然後,網路瀏覽器可以使用 Web Socket 連線到 ws://<伺服器>:5672 來傳送和接收 AMQP 訊息。