CMS API 概觀
什麼是 CMS?
CMS API 是 Java 中 JMS API 的 C++ 推論,用於從分散在網路中或位於同一台機器上的用戶端發送和接收訊息。在 CMS 中,我們盡可能嘗試保持與 JMS API 的同等性,只有當 JMS 功能強烈依賴 Java 程式語言本身的功能時才會有所不同。儘管存在一些差異,但大多數差異都很小,而且 CMS 大部分都符合 JMS 規範,因此對 JMS 的運作方式有充分的了解應該會讓使用 CMS 變得更加容易。
如果您已經熟悉 JMS,首先可以查看 CMS API 文件
CMS 入門
本節涵蓋使用 CMS API 的基本知識。為了方便討論,我們假設您在此使用 ActiveMQ-CPP 連接到 ActiveMQ Broker,當然,使用 CMS,您也可以將 C++ 應用程式中的 CMS API 的其他實作連結,並連接到其他訊息服務。
CMS ConnectionFactory
您通常在 CMS API 中使用的第一個介面是 ConnectionFactory。ConnectionFactory 允許您建立 CMS Connection 物件,該物件維護與某些訊息服務(例如 ActiveMQ Broker)的連線。
獲取 CMS ConnectionFactory 實例的最簡單方法是使用所有 CMS 提供者程式庫都必須實作的靜態方法 createCMSConnectionFactory。下面的程式碼片段示範如何取得新的 ConnectionFactory
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
如您所見,createCMSConnectionFactory 採用單個字串參數,其格式為 URI,用於定義要建立的連線所要連接的位置以及應使用的協定,在上述範例中為 TCP/IP。此外,配置資訊可以編碼在 URI 中。有關可透過 URI 傳遞給 ActiveMQ-CPP 的配置參數的更多資訊,請參閱配置頁面。
一旦您建立了 ConnectionFactory,接下來要做的是使用 ConnectionFactory 建立 CMS Connection。Connection 是管理用戶端與提供者的連線的物件。下一節將介紹 CMS Connection 的使用,建立 Connection 的程式碼如下所示
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
建立時,Connection 物件會嘗試連線到 CMS 提供者,如果連線失敗,則會擲回 CMSException,其中錯誤描述儲存在其 Message 屬性中。
連線與身份驗證
createConnection 方法有幾個版本,允許您為新建立的連線指定登入參數。您最常用的版本會採用使用者名稱和密碼對,該密碼對會傳輸到 Broker 進行驗證。如果憑證無效,則會擲回 CMSException。下面的範例程式碼顯示如何在建立 Connection 物件時傳遞使用者名稱和密碼。
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection( "<USERNAME>", "<PASSWORD>") );
如果您不想將值硬式編碼到原始程式碼中,或編寫程式碼從其他位置讀取登入資料,則還有另一種選擇可以傳遞使用者名稱和密碼,您可以對傳遞給 createConnectionFactory 的 URI 進行編碼,以便 connectionFactory 在剖析 URI 時從系統環境讀取值。下面的範例示範如何使用 URI 中設定的登入資料建立連線工廠。
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616?username=${USERNAME}&password=${PASSWORD}" ) );
如您所見,讓 URI 上的值來自系統環境非常簡單。此方法適用於您可以在 URI 中指定的任何參數。
CMS 連線
CMS Connection 介面定義一個物件,該物件是用戶端與 CMS 提供者的活動連線。在大多數情況下,用戶端只會建立一個連線物件,因為它被視為重量級物件。
連線有多個用途
- 它封裝了與 JMS 提供者的開啟連線。它通常表示用戶端和提供者服務守護程式之間的開啟 TCP/IP Socket。
- 其建立是執行用戶端身份驗證的地方。
- 它可以指定唯一的用戶端識別碼。
- 它提供 ConnectionMetaData 物件。
- 它支援可選的 ExceptionListener 物件。
正如我們先前所見,CMS 連線是從 CMS ConnectionFactory 建立的。如果 ConnectionFactory 建立呼叫成功,則傳回的 Connection 物件會連接到 CMS 提供者。Connection 物件是以停止狀態建立的,在用戶端建立訊息取用者之前,不會將任何訊息傳遞到訊息取用者。通常會將連線保持在停止狀態,直到用戶端建立其打算使用的初始 Session、訊息生產者和訊息取用者集。一旦用戶端的設定階段完成,它應該呼叫連線的 start 方法,以開始從提供者接收訊息。如果未能呼叫 start 方法是 CMS 和 JMS 用戶端新用戶之間非常常見的錯誤,如果您發現沒有收到任何訊息,首先要檢查的是您是否呼叫了 start。
建立連線後,用戶端必須建立 CMS Session,才能建立訊息生產者和取用者。下面的程式碼片段將我們目前所見的內容放在一起,然後顯示如何從我們的 Connection 實例建立 CMS Session 物件,接下來的章節將更詳細地討論 CMS Session。
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
std::auto_ptr<cms::Session> session( connection->createSession() );
CMS Session
一旦您成功建立 CMS Connection,接下來您通常會使用新的 Connection 實例建立一個或多個 CMS Session 物件。Session 被定義為產生和取用訊息的單執行緒環境。
Session 有多個用途
- 它是其訊息生產者和取用者的工廠。
- 它提供提供者最佳化的訊息工廠。
- 它是 TemporaryTopics 和 TemporaryQueues 的工廠。
- 它為那些需要動態操作提供者特定目的地名稱的用戶端提供了一種建立 Queue 或 Topic 物件的方法。
- 它支援一系列單一交易,將其生產者和取用者的工作合併為原子單位。
- 它定義其所取用訊息和所產生訊息的連續順序。
- 它會保留其所取用的訊息,直到這些訊息被確認。
- 它會序列化在其訊息取用者註冊的訊息接聽程式的執行。
- 它是 QueueBrowsers 的工廠(尚未實作)。
一個 Session 可以建立和服務多個訊息生產者和取用者。
當用戶端建立 CMS Session 時,它必須指定 Session 將確認其接收和分派訊息的模式。下表總結了支援的模式。
確認模式 | 描述 |
---|---|
AUTO_ACKNOWLEDGE | 在此確認模式下,當 Session 從接收呼叫成功傳回時,或當 Session 呼叫以處理訊息的訊息接聽程式成功傳回時,Session 會自動確認用戶端收到訊息。 |
CLIENT_ACKNOWLEDGE | 在此確認模式下,用戶端透過呼叫訊息的 acknowledge 方法來確認已取用的訊息。確認已取用的訊息會確認 Session 已取用的所有訊息。當使用用戶端確認模式時,用戶端可能會在嘗試處理訊息時建立大量未確認的訊息。CMS 提供者應為管理員提供一種限制用戶端溢出的方法,以便在用戶端使用的某些資源被暫時封鎖時,不會導致用戶端資源耗盡和隨之而來的失敗。 |
DUPS_OK_ACKNOWLEDGE | 此確認模式會指示 Session 延遲確認訊息的傳遞。如果 JMS 提供者失敗,這可能會導致傳遞一些重複的訊息,因此只有可以容忍重複訊息的取用者才應使用它。使用此模式可以透過減少 Session 為防止重複而執行的工作來降低 Session 負擔。 |
SESSION_TRANSACTED | Session 已交易,並且訊息的確認在內部處理。 |
INDIVIDUAL_ACKNOWLEDGE | 確認僅適用於單一訊息。與 CLIENT_ACKNOWLEDGE(其中確認適用於該 Session 到目前為止接收的所有訊息)不同,此模式僅適用於單一訊息,允許用戶端更具選擇性地確認哪些訊息。 |
在前一節中,我們向您展示了如何建立 Session,讓我們再次看看這裡的範例
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
std::auto_ptr<cms::Session> session( connection->createSession() );
在此程式碼片段中,Session 是使用不帶參數的 createSession 建立的,這會建立一個處於 AUTO_ACKNOWLEDGE 模式的 Session。若要使用上面列出的其中一種模式建立 Session,CMS Session 介面中還有第二個 create 方法,該方法採用一個指定模式的單一參數,讓我們來看一個範例
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
std::auto_ptr<cms::Session> session( connection->createSession( cms::Session::CLIENT_ACKNOWLEDGE ) );
如您所見,這裡沒有太大差異,只需指定您想要的確認模式,您就可以建立 Session 資源,在接下來的幾個章節中,我們將逐步介紹您可以從 Session 建立的物件類型,並向您展示使用它們的基本知識。
從 CMS Session 建立的物件
在本節中,我們將介紹從 CMS Session 物件的實例建立的物件類型。
CMS 訊息
如同其名稱所示,CMS 的存在是為了發送和接收訊息,因此從介紹您可以使用 CMS 發送和接收的訊息開始是很合理的。在撰寫本文時,CMS 支援四種主要的訊息類型,未來可能會增加其他類型,但我們將堅持使用目前完全支援的類型。下表顯示訊息類型及其簡短描述,有關特定訊息類型的介面和用法的完整詳細資訊,請參閱 CMS API 文件。
訊息類型 | 描述 |
---|---|
訊息 | 訊息介面定義了最簡單的 CMS 訊息。與其他 CMS 訊息不同,訊息類型沒有主體或有效負載,但它可以使用一組屬性設定器來設定屬性,這些設定器包含 C++ 中的基本類型。訊息介面是 CMS 中所有訊息類型的根。 |
文字訊息 | 文字訊息類別攜帶一個由 C++ 字串組成的有效負載。文字訊息介面擴展了訊息介面,增加了設定和取得有效負載文字的方法,並保留了設定訊息屬性的支援。由於 Java 物件無法直接從 JMS 用戶端發送到 CMS 用戶端,因此文字訊息是將物件序列化為 XML 並將其發送到 JMS 用戶端的理想方式。 |
位元組訊息 | 位元組訊息的有效負載由一系列不間斷的位元組組成,接收者負責解釋這些位元組。位元組訊息將取得和設定位元組陣列的方法新增到標準的訊息介面方法集中。 |
映射訊息 | 映射訊息的有效負載是一組名稱/值對。名稱是 C++ 字串類型,值是 C++ 基本類型或字串。 |
串流訊息 | 串流訊息的主體由一個自描述的基本類型清單組成。串流訊息介面提供存取器方法,可以將基本類型讀取和寫入到/從訊息。當轉換不會導致資料遺失時,讀取方法允許基本類型轉換。 |
現在我們已經了解了可以建立的訊息類型,讓我們看看如何實際建立它們,並探索訊息類別中可用的一些操作。
建立訊息
您可能已經猜到,訊息是使用我們之前建立的 CMS 會期實例建立的。會期提供了建立我們上面介紹的四種訊息類型的方法。會期是建立 CMS 中定義的訊息介面之提供者實作的工廠,它知道如何配置內部資料結構,並防止用戶端直接綁定到提供者實作,這就是為什麼我們必須使用會期來建立訊息物件而不是直接建立它們的原因。讓我們看看一段程式碼片段,它建立一個文字訊息實例並在該訊息上設定一些屬性。
// Create the ConnectionFactory
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
// Create a Connection
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
// Create a new Session from our Connection
std::auto_ptr<cms::Session> session( connection->createSession() );
// Now create a TextMessage
std::auto_ptr<cms::TextMessage> textMessage( session->createTextMessage() );
// Set the payload
textMessage->setText( "Payload Text" );
// Set some Properties
textMessage->setStringProperty( "USER_NAME", "Steve" );
textMessage->setIntProperty( "USER_CODE", 42 );
從上面的程式碼可以看出,建立文字訊息與建立會期或連線實例非常相似,您只需在 CMS 會期實例中呼叫 createTextMessage,就會得到一個新的文字訊息指標,然後您可以填入文字和屬性。
CMS 目的地
如同其名稱所示,CMS 目的地介面定義了一個物件,該物件代表訊息路由到訊息代理的端點。用戶端建立目的地並將訊息傳送到它們,或等待在他們已訂閱的目的地上接收訊息。CMS 中有兩種基本類型的目的地,主題和佇列,它們有兩種子類型,臨時主題和臨時佇列。下表總結了四種不同的目的地類型。
目的地類型 | 描述 |
主題 | 在 CMS 中,主題實作發布和訂閱語意。當您發布訊息時,它會傳送到所有感興趣的訂閱者 - 因此零到多個訂閱者將收到訊息的副本。只有在代理接收到訊息時具有有效訂閱的訂閱者才會收到訊息的副本。 |
佇列 | CMS 佇列實作負載平衡器語意。單一訊息將僅由一個消費者接收。如果發送訊息時沒有可用的消費者,則會保留該訊息,直到有可用的消費者可以處理該訊息為止。如果消費者接收到訊息並且在關閉之前未確認該訊息,則該訊息將重新傳遞給另一個消費者。一個佇列可以有多個消費者,訊息會在可用的消費者之間進行負載平衡。 |
臨時主題 | 臨時主題物件是一個唯一的 Topic 物件,為連線的持續時間而建立。它是系統定義的主題,只能由建立它的連線使用。 |
臨時佇列 | 臨時佇列物件是一個唯一的佇列物件,為連線的持續時間而建立。它是系統定義的佇列,只能由建立它的連線使用。 |
現在我們已經了解了目的地類型,讓我們看看一個程式碼片段,它顯示如何建立目的地物件。下面的範例顯示如何使用現在應該很熟悉的模式來建立主題實例。
// Create the ConnectionFactory
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
// Create a Connection
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
// Create a new Session from our Connection
std::auto_ptr<cms::Session> session( connection->createSession() );
// Now create a Topic
std::auto_ptr<cms::Topic> myTopic( session->createTopic( "EXAMPLE-TOPIC" ) );
建立主題或佇列需要傳遞目的地的名稱,該名稱類似於位址,發送到“EXAMPLE-TOPIC”目的地的訊息由已訂閱到相同目的地的用戶端接收。
CMS 訊息消費者
現在我們已經介紹了如何建立訊息和目的地,我們將看看如何建立 CMS 訊息消費者。訊息消費者允許用戶端應用程式接收其他用戶端發送到主題或佇列的訊息。在我們討論如何使用消費者接收訊息之前,讓我們先看看如何建立一個消費者。
// Create the ConnectionFactory
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
// Create a Connection
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
// Create a new Session from our Connection
std::auto_ptr<cms::Session> session( connection->createSession() );
// Now create a Topic
std::auto_ptr<cms::Topic> myTopic( session->createTopic( "EXAMPLE-TOPIC" ) );
// Now create the Consumer
std::auto_ptr<cms::MessageConsumer> myConsumer( session->createConsumer( myTopic ) );
如您所見,訊息消費者是通過從 CMS 會期物件的實例呼叫 createConsumer 來建立的。訊息消費者在您建立它時會被賦予一個用於監聽訊息的目的地。一旦您建立了一個訊息消費者,就可以開始使用它來接收訊息,訊息消費者有兩種接收訊息的方法,一種是同步的,另一種是非同步的。
接收訊息的同步方法涉及呼叫消費者的接收方法。如果在沒有給定逾時的情況下,呼叫接收將會阻塞,直到在相關目的地收到訊息;如果給定的逾時時間經過且沒有新訊息到達,則會傳回 NULL。讓我們看一個使用 CMS 的簡單同步訊息輪詢迴圈範例。
while( !done ) {
std::auto_ptr<Message> message( myConsumer->receive() );
...Do Something with the message...
}
如上面的程式碼所示,我們呼叫了訊息消費者的 receive 方法,預期它會在某個時間點傳回一個新的訊息物件。此處的程式碼會阻塞,直到收到新訊息為止。這種方法假設您的程式在收到訊息之前沒有其他需要執行的操作,但是如果您的應用程式需要執行其他處理,則可以選擇其他方法。下面的程式碼範例顯示了一個輪詢迴圈,它使用訊息消費者的 receiveNoWait 方法進行輪詢並立即傳回,以便進行其他處理。
```while( !done ) {
std::auto_ptr
if( message.get() != NULL ) { …使用訊息執行某些操作… }
…在檢查是否有其他訊息之前執行其他應用程式邏輯… }
The asynchronous method involves implementing the CMS MessageListener interface and passing an instance of your implementation to the MessageConsumer's **setMessageListener** method. When a new message arrives your listener's **onMessage** method will be called by the consumer in the context of another thread to allow you to process the Message received. Below is a code snippet that demonstrates implementing the MessageListener interface.
class SimpleListener : public cms::MessageListener {
virtual void onMessage( const Message* message ) {
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
string text = "";
if( textMessage != NULL ) {
text = textMessage->getText();
} else {
text = "NOT A TEXTMESSAGE!";
}
printf( "Message Received: %s\n", text.c_str() );
} }; ```
在上面的範例中,我們建立了一個名為 SimpleListener 的新類別,當收到文字訊息時,它會列印文字訊息的內容,或列印一條訊息,指示它沒有收到預期的文字訊息。請注意,onMessage 方法接收一個指向基本訊息介面的指標,然後我們嘗試動態轉換為我們認為應該接收的類型。這允許您的程式碼在一個方法中處理多種類型的訊息。傳遞的指標由呼叫者或 onMessage 擁有,因此您不應儲存或刪除它,如果您需要保留訊息的副本,則必須通過呼叫訊息的 clone 方法來建立副本。
現在我們有了要使用的訊息監聽器實作,是時候看看如何使用我們先前建立的訊息消費者設定非同步消耗了。
SimpleListener listener;
myConsumer->setMessageListener( &listener );
就是這樣,我們現在將在 SimpleListener 實例的 onMessage 方法中接收發送到我們建立的目的地的訊息。
CMS 訊息生產者
我們已經了解了如何消費訊息,現在我們如何首先生產訊息?答案是 CMS 訊息生產者,它用於將訊息傳送到代理,以便分發給在主題或佇列上監聽訊息的各種用戶端。建立訊息生產者與建立訊息消費者非常相似,您首先建立連線、會期和目的地物件,然後使用會期建立訊息生產者。下面的程式碼片段示範了如何建立訊息生產者。
// Create the ConnectionFactory
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
// Create a Connection
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
// Create a new Session from our Connection
std::auto_ptr<cms::Session> session( connection->createSession() );
// Now create a Topic
std::auto_ptr<cms::Topic> myTopic( session->createTopic( "EXAMPLE-TOPIC" ) );
// Now create the Consumer
std::auto_ptr<cms::MessageProducer> myProducer( session->createProducer( myTopic ) );
完整範例
現在我們已經介紹了 CMS API 的大部分基礎知識,是時候看看一些完整的範例,這些範例示範了您如何在自己的應用程式中使用 CMS API。第一個範例將示範如何建立一個簡單的非同步消費者,它可以從 ActiveMQ 代理接收文字訊息物件,然後在第二個範例中,我們將看看一個簡單的生產者,它會將文字訊息物件發布到我們的消費者正在監聽的目的地。
簡單的非同步消費者
在簡單的非同步消費者範例中,我們將 CMS API 用法包裝在一個名為 SimpleAsyncConsumer 的類別中。此類別公開一個單一的建構函式,允許使用者建立一個連接到特定代理和目的地的類別實例,以及目的地是佇列還是主題。使用者還可以指定確認模式應為 CLIENT_ACKNOWLEDGE 而不是預設的 AUTO_ACKNOWLEDGE 模式。
一旦建立此類別的實例,使用者就會呼叫 runConsumer 方法以開始監聽指定的目的地。runConsumer 方法會建立與代理的連線,並以配置的確認模式啟動新的會期。建立會期後,可以建立新的消費者並將其附加到配置的目的地。由於我們想要以非同步方式監聽新訊息,因此 SimpleAsyncConsumer 會繼承自 cms::MessageListener,以便它可以將自己註冊為 runConsumer 中建立的訊息消費者的訊息監聽器。
在 runConsumer 方法回傳後,主方法會等待使用者輸入以結束程式。在應用程式執行期間收到的所有訊息,都會被分派到 SimpleAsyncConsumer 的 onMessage 方法。如果訊息是 TextMessage,其內容將會列印在螢幕上。
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
////////////////////////////////////////////////////////////////////////////////
class SimpleAsyncConsumer : public ExceptionListener,
public MessageListener,
public DefaultTransportListener {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageConsumer* consumer;
bool useTopic;
std::string brokerURI;
std::string destURI;
bool clientAck;
private:
SimpleAsyncConsumer( const SimpleAsyncConsumer& );
SimpleAsyncConsumer& operator= ( const SimpleAsyncConsumer& );
public:
SimpleAsyncConsumer( const std::string& brokerURI,
const std::string& destURI,
bool useTopic = false,
bool clientAck = false ) :
connection(NULL),
session(NULL),
destination(NULL),
consumer(NULL),
useTopic(useTopic),
brokerURI(brokerURI),
destURI(destURI),
clientAck(clientAck) {
}
virtual ~SimpleAsyncConsumer() {
this->cleanup();
}
void close() {
this->cleanup();
}
void runConsumer() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory =
new ActiveMQConnectionFactory( brokerURI );
// Create a Connection
connection = connectionFactory->createConnection();
delete connectionFactory;
ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>( connection );
if( amqConnection != NULL ) {
amqConnection->addTransportListener( this );
}
connection->start();
connection->setExceptionListener(this);
// Create a Session
if( clientAck ) {
session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
} else {
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
}
// Create the destination (Topic or Queue)
if( useTopic ) {
destination = session->createTopic( destURI );
} else {
destination = session->createQueue( destURI );
}
// Create a MessageConsumer from the Session to the Topic or Queue
consumer = session->createConsumer( destination );
consumer->setMessageListener( this );
} catch (CMSException& e) {
e.printStackTrace();
}
}
// Called from the consumer since this class is a registered MessageListener.
virtual void onMessage( const Message* message ) {
static int count = 0;
try
{
count++;
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
string text = "";
if( textMessage != NULL ) {
text = textMessage->getText();
} else {
text = "NOT A TEXTMESSAGE!";
}
if( clientAck ) {
message->acknowledge();
}
printf( "Message #%d Received: %s\n", count, text.c_str() );
} catch (CMSException& e) {
e.printStackTrace();
}
}
// If something bad happens you see it here as this class is also been
// registered as an ExceptionListener with the connection.
virtual void onException( const CMSException& ex AMQCPP_UNUSED ) {
printf("CMS Exception occurred. Shutting down client.\n");
exit(1);
}
virtual void transportInterrupted() {
std::cout << "The Connection's Transport has been Interrupted." << std::endl;
}
virtual void transportResumed() {
std::cout << "The Connection's Transport has been Restored." << std::endl;
}
private:
void cleanup(){
try {
if( connection != NULL ) {
connection->close();
}
} catch ( CMSException& e ) {
e.printStackTrace();
}
delete destination;
delete consumer;
delete session;
delete connection;
}
};
////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
activemq::library::ActiveMQCPP::initializeLibrary();
std::cout << "=====================================================\n";
std::cout << "Starting the example:" << std::endl;
std::cout << "-----------------------------------------------------\n";
// Set the URI to point to the IPAddress of your broker.
// add any optional params to the url to enable things like
// tightMarshalling or tcp logging etc. See the CMS web site for
// a full list of configuration options.
//
// https://activemq.dev.org.tw/cms/
//
std::string brokerURI =
"failover:(tcp://127.0.0.1:61616)";
//============================================================
// This is the Destination Name and URI options. Use this to
// customize where the consumer listens, to have the consumer
// use a topic or queue set the 'useTopics' flag.
//============================================================
std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1";
//============================================================
// set to true to use topics instead of queues
// Note in the code above that this causes createTopic or
// createQueue to be used in the consumer.
//============================================================
bool useTopics = false;
//============================================================
// set to true if you want the consumer to use client ack mode
// instead of the default auto ack mode.
//============================================================
bool clientAck = false;
// Create the consumer
SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );
// Start it up and it will listen forever.
consumer.runConsumer();
// Wait to exit.
std::cout << "Press 'q' to quit" << std::endl;
while( std::cin.get() != 'q') {}
// All CMS resources should be closed before the library is shutdown.
consumer.close();
std::cout << "-----------------------------------------------------\n";
std::cout << "Finished with the example." << std::endl;
std::cout << "=====================================================\n";
activemq::library::ActiveMQCPP::shutdownLibrary();
}
簡易生產者
如同簡易非同步消費者範例,簡易生產者範例將建立生產者所需的 CMS API 詳細資訊封裝到一個名為 SimpleProducer 的類別中。這個類別提供與消費者範例類似的介面。它有一個建構子,允許使用代理程式和目標的組態選項,以及要傳送到已設定目標的訊息數量來建立實例。一旦建立完成,客戶端程式碼只需呼叫 SimpleProducer 的 run 方法來發布指定數量的訊息。一旦 run 方法完成,客戶端就可以自由關閉 SimpleProducer,這會清理已配置的 CMS 資源。關閉後,應用程式就會結束。
一個簡單的訊息生產者範例
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <activemq/library/ActiveMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
using namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
////////////////////////////////////////////////////////////////////////////////
class SimpleProducer : public Runnable {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
bool useTopic;
bool clientAck;
unsigned int numMessages;
std::string brokerURI;
std::string destURI;
private:
SimpleProducer( const SimpleProducer& );
SimpleProducer& operator= ( const SimpleProducer& );
public:
SimpleProducer( const std::string& brokerURI, unsigned int numMessages,
const std::string& destURI, bool useTopic = false, bool clientAck = false ) :
connection(NULL),
session(NULL),
destination(NULL),
producer(NULL),
useTopic(useTopic),
clientAck(clientAck),
numMessages(numMessages),
brokerURI(brokerURI),
destURI(destURI) {
}
virtual ~SimpleProducer(){
cleanup();
}
void close() {
this->cleanup();
}
virtual void run() {
try {
// Create a ConnectionFactory
auto_ptr<ActiveMQConnectionFactory> connectionFactory(
new ActiveMQConnectionFactory( brokerURI ) );
// Create a Connection
try{
connection = connectionFactory->createConnection();
connection->start();
} catch( CMSException& e ) {
e.printStackTrace();
throw e;
}
// Create a Session
if( clientAck ) {
session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
} else {
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
}
// Create the destination (Topic or Queue)
if( useTopic ) {
destination = session->createTopic( destURI );
} else {
destination = session->createQueue( destURI );
}
// Create a MessageProducer from the Session to the Topic or Queue
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
// Create the Thread Id String
string threadIdStr = Long::toString( Thread::currentThread()->getId() );
// Create a messages
string text = (string)"Hello world! from thread " + threadIdStr;
for( unsigned int ix=0; ix<numMessages; ++ix ){
TextMessage* message = session->createTextMessage( text );
message->setIntProperty( "Integer", ix );
// Tell the producer to send the message
printf( "Sent message #%d from thread %s\n", ix+1, threadIdStr.c_str() );
producer->send( message );
delete message;
}
}catch ( CMSException& e ) {
e.printStackTrace();
}
}
private:
void cleanup(){
try {
if( connection != NULL ) {
connection->close();
}
} catch ( CMSException& e ) {
e.printStackTrace();
}
delete destination;
delete producer;
delete session;
delete connection;
}
};
////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
activemq::library::ActiveMQCPP::initializeLibrary();
std::cout << "=====================================================\n";
std::cout << "Starting the example:" << std::endl;
std::cout << "-----------------------------------------------------\n";
// Set the URI to point to the IPAddress of your broker.
// add any optional params to the url to enable things like
// tightMarshalling or tcp logging etc. See the CMS web site for
// a full list of configuration options.
//
// https://activemq.dev.org.tw/cms/
//
std::string brokerURI =
"failover://(tcp://127.0.0.1:61616)";
//============================================================
// Total number of messages for this producer to send.
//============================================================
unsigned int numMessages = 2000;
//============================================================
// This is the Destination Name and URI options. Use this to
// customize where the Producer produces, to have the producer
// use a topic or queue set the 'useTopics' flag.
//============================================================
std::string destURI = "TEST.FOO";
//============================================================
// set to true to use topics instead of queues
// Note in the code above that this causes createTopic or
// createQueue to be used in the producer.
//============================================================
bool useTopics = false;
// Create the producer and run it.
SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );
// Publish the given number of Messages
producer.run();
// Before exiting we ensure that all CMS resources are closed.
producer.close();
std::cout << "-----------------------------------------------------\n";
std::cout << "Finished with the example." << std::endl;
std::cout << "=====================================================\n";
activemq::library::ActiveMQCPP::shutdownLibrary();
}