可以設定 Apache ActiveMQ Artemis 以特別處理超出設定大小的訊息。伺服器不會將這些訊息的完整內容儲存在記憶體中,而是只在佇列上保留一個指向內容(例如在檔案或資料庫表格中)的精簡物件。
核心協定和 AMQP 協定都支援此功能。
1. 設定伺服器
當使用檔案日誌時,大型訊息會儲存在伺服器上的磁碟中。組態屬性 large-messages-directory
指定大型訊息的儲存位置。
<configuration...>
<core...>
...
<large-messages-directory>data/large-messages</large-messages-directory>
...
</core>
</configuration>
預設情況下,large-messages-directory
為 data/largemessages
。
為了獲得最佳效能,我們建議使用檔案日誌,並將大型訊息目錄放在與訊息日誌或分頁目錄不同的實體磁碟區上。 |
對於 JDBC 持久性,應設定 large-message-table
。
<configuration...>
<core...>
...
<store>
<database-store>
...
<large-message-table-name>LARGE_MESSAGES_TABLE</large-message-table-name>
...
</database-store>
</store>
...
</core>
</configuration>
預設情況下,large-message-table
為 LARGE_MESSAGE_TABLE
。
預設情況下,在將最後的位元組寫入大型訊息時,所有寫入都會同步到儲存媒體。這可以透過 large-message-sync
來設定,例如:
<configuration...>
<core...>
...
<large-message-sync>true</large-message-sync>
...
</core>
</configuration>
預設情況下,large-message-sync
為 true
。
2. 設定核心用戶端
任何大於特定大小的訊息都被視為大型訊息。大型訊息將被分割並以片段形式傳送。這由 URL 參數 minLargeMessageSize
決定。
Apache ActiveMQ Artemis 訊息使用每個字元 2 個位元組進行編碼,因此如果訊息資料填滿 ASCII 字元(1 個位元組),則產生的 Apache ActiveMQ Artemis 訊息大小將大約加倍。這在計算「大型」訊息的大小時很重要,因為它在傳送之前可能看起來小於 |
預設值為 100KiB。
直接從用戶端設定傳輸將提供更多關於如何實例化核心工作階段工廠或 JMS 連線工廠的資訊。
3. 核心協定上的壓縮大型訊息
您可以選擇使用 compressLargeMessage
URL 參數以壓縮形式傳送大型訊息。
如果您將布林值 URL 參數 compressLargeMessage
指定為 true,則系統將在訊息傳輸到伺服器端時使用 ZIP 演算法壓縮訊息主體。請注意,伺服器端沒有特殊處理,所有的壓縮和解壓縮都在用戶端完成。
此行為可以透過設定一個可選參數:compressionLevel
來進一步調整。這將決定訊息主體應壓縮多少。compressionLevel
接受 -1
的整數或介於 0-9
之間的值。預設值為 -1
,對應於大約 6-7 級。
如果大型訊息的壓縮大小低於 minLargeMessageSize
,則會以一般訊息的形式傳送到伺服器。這意味著該訊息不會寫入伺服器的大型訊息資料目錄,從而減少了磁碟 I/O。
較高的 compressionLevel 表示訊息主體將被進一步壓縮,但這是以速度和計算負擔為代價的。請務必根據其特定用例調整此值。 |
4. 從核心協定串流大型訊息
Apache ActiveMQ Artemis 支援使用輸入和輸出串流 (java.lang.io
) 設定訊息的主體。
這些串流然後直接用於傳送(輸入串流)和接收(輸出串流)訊息。
接收訊息時,有兩種處理輸出串流的方法;您可以選擇在恢復輸出串流時使用 ClientMessage.saveOutputStream
方法阻塞,或者使用 ClientMessage.setOutputstream
方法非同步地將訊息寫入串流。如果您選擇後者,則必須保持消費者處於活動狀態,直到訊息完全接收完成。
您可以使用任何類型的串流。最常見的用例是傳送儲存在磁碟中的檔案,但您也可以傳送 JDBC Blob、SocketInputStream
、從 HTTPRequests
恢復的內容等。只要它實現 java.io.InputStream
來傳送訊息或 java.io.OutputStream
來接收訊息即可。
4.1. 透過核心 API 串流
下表顯示了 ClientMessage
中可用方法的清單,這些方法也可以透過使用物件屬性透過 JMS 使用。
名稱 | 描述 | JMS 等效項 |
---|---|---|
setBodyInputStream(InputStream) |
設定用於在傳送訊息時讀取訊息主體的 InputStream。 |
JMS_AMQ_InputStream |
setOutputStream(OutputStream) |
設定將接收訊息主體的 OutputStream。此方法不會阻塞。 |
JMS_AMQ_OutputStream |
saveOutputStream(OutputStream) |
將訊息的主體儲存到 |
JMS_AMQ_SaveStream |
在接收核心訊息時設定輸出串流
ClientMessage msg = consumer.receive(...);
// This will block here until the stream was transferred
msg.saveOutputStream(someOutputStream);
ClientMessage msg2 = consumer.receive(...);
// This will not wait the transfer to finish
msg2.setOutputStream(someOtherOutputStream);
在傳送核心訊息時設定輸入串流
ClientMessage msg = session.createMessage();
msg.setInputStream(dataInputStream);
另請注意,對於超過 2GiB 的訊息,getBodySize() 將返回無效值,因為這是一個整數(也公開給 JMS API)。在這些情況下,您可以使用訊息屬性 _AMQ_LARGE_SIZE。
4.2. 透過 JMS 串流
使用 JMS 時,Apache ActiveMQ Artemis 會透過設定物件屬性來對應核心 API 上的串流方法(請參閱上面的 ClientMessage API 表)。您可以使用 Message.setObjectProperty
方法來設定輸入和輸出串流。
InputStream
可以透過正在傳送的訊息上的 JMS 物件屬性 JMS_AMQ_InputStream 來定義
BytesMessage message = session.createBytesMessage();
FileInputStream fileInputStream = new FileInputStream(fileInput);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
message.setObjectProperty("JMS_AMQ_InputStream", bufferedInput);
someProducer.send(message);
OutputStream
可以透過以阻塞方式接收的訊息上的 JMS 物件屬性 JMS_AMQ_SaveStream 來設定。
BytesMessage messageReceived = (BytesMessage)messageConsumer.receive(120000);
File outputFile = new File("huge_message_received.dat");
FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
// This will block until the entire content is saved on disk
messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);
也可以使用屬性 JMS_AMQ_OutputStream 以非阻塞方式設定 OutputStream
。
// This won't wait the stream to finish. You need to keep the consumer active.
messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);
使用 JMS 時,只有在 |
4.3. 核心協定上的替代串流
如果您選擇不使用 Apache ActiveMQ Artemis 的 InputStream
或 OutputStream
功能,您仍然可以以替代方式直接存取資料。
在核心 API 上,只需像平常一樣獲取主體的位元組即可。
ClientMessage msg = consumer.receive();
byte[] bytes = new byte[1024];
for (int i = 0 ; i < msg.getBodySize(); i += bytes.length)
{
msg.getBody().readBytes(bytes);
// Whatever you want to do with the bytes
}
如果使用 JMS API,BytesMessage
和 StreamMessage
也會透明地支援它。
BytesMessage rm = (BytesMessage)cons.receive(10000);
byte data[] = new byte[1024];
for (int i = 0; i < rm.getBodyLength(); i += 1024)
{
int numberOfBytes = rm.readBytes(data);
// Do whatever you want with the data
}
5. 設定 AMQP 接收器
您可以在接收器上設定屬性 amqpMinLargeMessageSize
。
預設值為 102400 (100KBytes)。
將其設定為 -1 將禁用大型訊息支援。
將 amqpMinLargeMessageSize 設定為 -1,如果訊息大小不適合日誌,您的 AMQP 訊息可能會儲存為核心大型訊息。這是伺服器先前的語義,為了相容性而保留這種方式。 |
<acceptors>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?; ..... amqpMinLargeMessageSize=102400; .... </acceptor>
</acceptors>