1. 範例
我們在 Artemis 範例中提供了一些範例。
-
.NET
-
./examples/protocols/amqp/dotnet
-
-
ProtonCPP
-
./examples/protocols/amqp/proton-cpp
-
./examples/protocols/amqp/proton-clustered-cpp
-
-
Ruby
-
./examples/protocols/amqp/proton-ruby
-
-
Java(使用 qpid JMS 客戶端)
-
./examples/protocols/amqp/queue
-
-
攔截器
-
./examples/features/standard/interceptor-amqp
-
./examples/features/standard/broker-plugin
-
2. 訊息轉換
在傳送和接收 AMQP 時,訊息代理程式不會執行任何訊息轉換為其他協定。
但是,如果您的訊息要由 AMQP JMS 客戶端接收,您必須遵循 JMS 對應慣例。如果您傳送的訊息主體類型不在此規範中識別,則 AMQP 和任何其他協定之間的轉換會使其成為二進位訊息。如果您打算跨協定或語言,請務必遵循這些慣例,尤其是在訊息主體上。
相容性設定允許將 AMQP 佇列(JMS 持久和共用訂閱)的命名慣例與 CORE 對齊。由於向後相容性的原因,您需要透過訊息代理程式設定明確啟用此功能。
- amqp-use-core-subscription-naming
-
-
true
- 使用與 CORE 對齊的佇列命名慣例。 -
false
(預設) - 使用較舊的命名慣例。
-
3. 攔截和變更訊息
由於以下幾個原因,我們不建議在伺服器端變更訊息:
-
AMQP 訊息應為不可變的
-
該訊息將不是使用者傳送的原始訊息
-
AMQP 可以對訊息進行簽署。簽名將被破壞。
-
為了效能考量。我們盡量不重新編碼(甚至解碼)訊息。
如果無論這些建議如何,您仍然需要並想要攔截和變更 AMQP 訊息,請查看上述攔截器範例。
4. AMQP 和安全性
Apache ActiveMQ Artemis 伺服器接受 PLAIN、ANONYMOUS 和 GSSAPI SASL 機制。這些機制在訊息代理程式的 安全性基礎架構上實作。
5. AMQP 和目的地
如果 AMQP 連結是動態的,則會建立暫時佇列,並且遠端來源或遠端目標位址將設定為暫時佇列的名稱。如果連結不是動態的,則遠端目標或來源的位址將用於佇列。如果佇列不存在,只要設定允許,就會自動建立。
6. AMQP 和多播位址(主題)
雖然 AMQP 沒有「主題」的概念,但仍然可以將 AMQP 消費者或接收者視為訂閱,而不僅僅是佇列上的消費者。依預設,任何附加到僅啟用 multicast
的位址的接收連結都將被視為訂閱,並建立相應的訂閱佇列。如果 Terminus Durability 為 UNSETTLED_STATE
或 CONFIGURATION
,則佇列將設為持久性(類似於 JMS 持久訂閱),並給予一個由容器 ID 和連結名稱組成的名稱,例如 my-container-id:my-link-name
。如果 Terminus Durability 設定為 NONE
,則會建立揮發性 multicast
佇列。
7. AMQP 和協調 - 處理交易
AMQP 連結的目標也可以是協調器。協調器用於處理交易。如果使用協調器,則基礎伺服器工作階段將會被交易,並將透過協調器回滾或提交。
AMQP 允許每個工作階段使用多個交易 ( |
8. AMQP 排程訊息傳遞
AMQP 訊息可以提供排程資訊,以控制訊息最早在未來傳遞的時間。此資訊是透過將訊息註解新增到已傳送的訊息中來提供的。
有兩種不同的訊息註解可用於排程稍後傳遞的訊息
- x-opt-delivery-time
-
指定的值必須是正長整數,對應於訊息應該可供傳遞的時間(以毫秒為單位)。
- x-opt-delivery-delay
-
指定的值必須是正長整數,對應於訊息代理程式接收到給定訊息後,應該在多少毫秒後才能供傳遞。
如果同一個訊息中同時存在這兩個註解,則訊息代理程式將優先使用更明確的 x-opt-delivery-time
值。
9. DLQ 和過期轉移
AMQP 訊息在轉移到 DLQ 或 ExpiryQueue 之前會被複製,並且在此過程中會收到屬性和註解。
訊息代理程式還會保留一個僅限內部使用的屬性(稱為額外屬性),該屬性不會向客戶端公開,並且這些屬性也會在此過程中填入。
以下是 AMQP 訊息在轉移時將收到的註解和屬性名稱列表
註解名稱 | 內部屬性名稱 | 描述 |
---|---|---|
|
|
轉移之前的原始訊息 ID |
|
|
過期發生的時間。自 Epoch 時間起的毫秒數 |
|
|
轉移之前的原始佇列名稱 |
|
|
轉移之前的原始位址名稱 |
10. 依訊息註解篩選
如果您在註解名稱之前使用字首「m.」,則可以依訊息註解進行篩選。
例如,如果您想篩選傳送到特定目的地的訊息,您可以根據以下內容建立篩選器
ConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:5672");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
javax.jms.Queue queue = session.createQueue("my-DLQ");
MessageConsumer consumer = session.createConsumer(queue, "\"m.x-opt-ORIG-ADDRESS\"='ORIGINAL_PLACE'");
Message message = consumer.receive();
訊息代理程式將設定內部屬性。如果您打算在 DLQ 或過期後進行篩選,您可以選擇內部屬性名稱
// Replace the consumer creation on the previous example:
MessageConsumer consumer = session.createConsumer(queue, "_AMQ_ORIG_ADDRESS='ORIGINAL_PLACE'");
11. 設定 AMQP 空閒逾時
可以透過在 acceptor 上設定屬性 amqpIdleTimeout(以毫秒為單位)來設定 AMQP 伺服器的空閒逾時。
這將使伺服器向客戶端傳送一個 AMQP 框架 open,其中包含您設定的逾時 / 2。
因此,如果您將 AMQP 空閒逾時設定為 60000,則伺服器會告知客戶端每 30,000 毫秒傳送框架。
<acceptor name="amqp">.... ;amqpIdleTimeout=<configured-timeout>; ..... </acceptor>
11.1. 停用保持連線檢查
如果您設定 amqpIdleTimeout=0,則會告知客戶端不要向伺服器傳送保持連線封包。在這種情況下,您將依賴 TCP 來判斷何時需要關閉 Socket。
<acceptor name="amqp">.... ;amqpIdleTimeout=0; ..... </acceptor>
這包含設定 amqpIdleTimeout 的實際範例
<acceptor name="amqp">tcp://0.0.0.0:5672?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;directDeliver=false;batchDelay=10</acceptor>
12. Web Sockets
Apache ActiveMQ Artemis 也支援透過 Web Sockets 的 AMQP。支援 Web Sockets 的現代網路瀏覽器可以傳送和接收 AMQP 訊息。
透過正常的 AMQP acceptor 支援透過 Web Sockets 的 AMQP
<acceptor name="amqp-ws-acceptor">tcp://127.0.0.1:5672?protocols=AMQP</acceptor>
使用此設定,Apache ActiveMQ Artemis 將在連接埠 5672
上接受透過 Web Sockets 的 AMQP 連線。然後,網路瀏覽器可以使用 Web Socket 連線到 ws://<伺服器>:5672
來傳送和接收 AMQP 訊息。