Apache ActiveMQ ™ -- 處理諮詢訊息
ActiveMQ 支援諮詢訊息,讓您可以使用一般的 CMS 訊息來監看系統。目前您可以使用諮詢訊息來做幾件事:
- 查看消費者、生產者和連線的啟動和停止
- 查看暫時目的地的建立和銷毀
- 取得主題和佇列上過期訊息的通知
- 觀察代理程式將訊息傳送到沒有消費者的目的地。
- 查看連線的啟動和停止
諮詢訊息可以被視為某種管理通道,您可以在其中收到有關 JMS 提供者上發生的事情,以及生產者、消費者和目的地發生的事情的資訊。若要深入瞭解代理程式的諮詢訊息支援,請參閱這篇文章。
開始之前
本教學假設讀者對 CMS API 有基本的了解,並且知道如何使用 ActiveMQ-CPP 程式庫建構基本應用程式。如果您不確定如何使用 CMS API,那麼您應該先閱讀CMS API 概觀。本教學是針對 ActiveMQ-CPP 3.0 發行版本的 API 所撰寫,雖然可以使用舊的 ActiveMQ-CPP 2.x 用戶端來處理諮詢訊息,但可能會有部分程式碼差異,本文中不會顯示。
訂閱諮詢主題
要接收諮詢訊息,您首先需要訂閱提供所需諮詢訊息的主題。您可以像訂閱任何其他目的地一樣訂閱這些主題,訣竅是使用您想要使用之主題的正確名稱。讓我們先來看幾個可用的類型(這不是完整集合)。
諮詢主題 | 描述 | 屬性 | 資料結構 |
---|---|---|---|
ActiveMQ.Advisory.Connection | 連線啟動和停止訊息 | ||
ActiveMQ.Advisory.Producer.Queue | 佇列上生產者啟動和停止訊息 | 字串=’producerCount’ - 生產者數量 | ProducerInfo |
ActiveMQ.Advisory.Producer.Topic | 主題上生產者啟動和停止訊息 | 字串=’producerCount’ - 生產者數量 | ProducerInfo |
ActiveMQ.Advisory.Consumer.Queue | 佇列上消費者啟動和停止訊息 | 字串=’consumerCount’ - 消費者數量 | ConsumerInfo |
ActiveMQ.Advisory.Consumer.Topic | 主題上消費者啟動和停止訊息 | 字串=’consumerCount’ - 消費者數量 | ConsumerInfo |
現在,看看上面的清單,讓我們選擇一個主題,並找出如何建立 CMS 主題來訂閱,以便接收諮詢訊息。使用 Java 用戶端,我們可以使用公用程式類別 AdvisorySupport 來建立目的地,但目前 ActiveMQ-CPP 沒有提供此類公用程式類別,希望我們能在未來的版本中看到新增(當然,隨時歡迎貢獻!)。
如果我們有一個名為 TOPIC.FOO 的主題,而且我們想知道何時有生產者訂閱該主題,我們需要建立一個主題物件,其名稱為 ActiveMQ.Advisory.Producer.Topic.TOPIC.FOO,才能接收我們感興趣的諮詢訊息。我們知道這一點,因為我們可以查看上面的表格,並看到只要生產者開始或停止在主題上發布訊息,就會通知 ActiveMQ.Advisory.Producer.Topic,而且我們也知道我們的主題名為 TOPIC.FOO,因此將它們加在一起會得到我們諮詢主題的名稱,我們也知道這一點,因為我們看過 AdvisorySupport.java 類別,而且,這不算作弊。以下是使用 CMS 會話建立主題的程式碼片段:
std::auto_ptr<cms::Topic> advisories( session->createTopic(
"ActiveMQ.Advisory.Producer.Topic.TOPIC.FOO" ) );
一旦我們為想要監聽的諮詢訊息建立主題,我們只需要建立一個消費者來監聽它們,下面的程式碼片段示範了這一點
std::auto_ptr<cms::MessageConsumer> consumer;
consumer.reset( session->createConsumer( advisories.get() ) );
consumer->setMessageListener( this );
如您所見,訂閱諮詢主題和訂閱 CMS 中任何其他目的地之間沒有區別。在上面的範例中,我們註冊為非同步監聽器,您也可以使用一般的阻塞式 receive 方法,但我們更喜歡這種方法。
處理傳入的諮詢訊息
每個諮詢的訊息類型都是 'Advisory',並且有一些預定義的訊息屬性,要檢查 CMS 訊息是否為這種類型,您會在訊息物件上呼叫 getCMSType 方法。在某些情況下,您會知道您唯一會收到的訊息是諮詢訊息,因為您的用戶端只訂閱諮詢主題,其他時候您可能會將多個 MessageConsumer 連線到同一個 MessageListener,在這種情況下,您必須檢查訊息類型。一旦您知道您正在處理諮詢訊息,那麼您就可以開始檢查它,以確定它是什麼類型的訊息,並提取對您的應用程式有意義的資料。
許多諮詢訊息將有意義的資料儲存在訊息屬性中,例如,消費者啟動/停止諮詢訊息包含一個以 consumerCount 為鍵值的元素,該元素會填入主題或佇列上目前作用中的消費者數量。讓我們看看一個程式碼片段,該程式碼片段會檢查 onMessage 回呼中接收到的訊息,以查看是否為諮詢訊息,如果是,則會對其採取行動
void AdvisoryProducer::onMessage( const cms::Message* message ) {
if( message->getCMSType() == "Advisory" ) {
std::cout << "Received an Advisory Message!" << std::endl;
if( message->propertyExists( "consumerCount" ) ) {
std::string consumerCount = message->getStringProperty( "consumerCount" );
std::cout << "Number of Consumers = " << consumerCount << std::endl;
// Do Something Meaningful here....
}
} else {
std::cout << "Received a Non-Advisory Message!" << std::endl;
}
}
完整範例:只有在有消費者時才會產生訊息的生產者
現在您已經了解諮詢訊息處理的基本知識,是時候向您展示一個完整的範例,示範如何使用諮詢訊息。下列程式碼顯示一個類別標頭和來源檔案,該類別標頭和來源檔案實作了一個基本的 CMS 生產者,該生產者只有在有作用中的消費者時,才會將心跳訊息傳送到名為 HEART-BEAT-CHANNEL 的主題,否則會保持閒置。
#ifndef _ACTIVEMQCPP_EXAMPLES_ADVISORIES_ADVISORYPRODUCER_H_
#define _ACTIVEMQCPP_EXAMPLES_ADVISORIES_ADVISORYPRODUCER_H_
#include <string>
#include <memory>
#include <cms/Session.h>
#include <cms/MessageProducer.h>
#include <cms/MessageConsumer.h>
#include <cms/MessageListener.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
namespace activemqcpp {
namespace examples {
namespace advisories {
/**
* A sample Producer that will only send Messages on its Topic when it has
* received an advisory indicating that there is an active MessageConsumer
* on the Topic. Once another message comes in indicating that there is no
* longer a consumer then this Producer stops producing again.
*
* @since 3.0
*/
class AdvisoryProducer : public decaf::lang::Runnable,
public cms::MessageListener {
private:
volatile bool consumerOnline;
volatile bool shutdown;
decaf::util::concurrent::CountDownLatch shutdownLatch;
cms::Session* session;
std::auto_ptr<cms::MessageConsumer> consumer;
std::auto_ptr<cms::MessageProducer> producer;
public:
AdvisoryProducer( cms::Session* session );
virtual ~AdvisoryProducer();
/**
* Shut down the processing that occurs in the Run method.
*/
void stop();
/**
* Run the producer code.
*/
virtual void run();
/**
* Async Message callback.
*/
virtual void onMessage( const cms::Message* message );
};
}}}
#endif /* _ACTIVEMQCPP_EXAMPLES_ADVISORIES_ADVISORYPRODUCER_H_ */
AdvisoryProducer 來源檔案
#include "AdvisoryProducer.h"
#include <cms/Topic.h>
#include <cms/Message.h>
#include <cms/TextMessage.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/Integer.h>
using namespace std;
using namespace activemqcpp;
using namespace activemqcpp::examples;
using namespace activemqcpp::examples::advisories;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
AdvisoryProducer::AdvisoryProducer( cms::Session* session ) : shutdownLatch(1) {
if( session == NULL ) {
throw NullPointerException(
__FILE__, __LINE__, "Session Object passed was Null." );
}
std::auto_ptr<cms::Topic> destination( session->createTopic(
"HEART-BEAT-CHANNEL" ) );
std::auto_ptr<cms::Topic> advisories( session->createTopic(
"ActiveMQ.Advisory.Consumer.Topic.HEART-BEAT-CHANNEL" ) );
this->shutdown = false;
this->consumerOnline = false;
this->session = session;
this->producer.reset( session->createProducer( destination.get() ) );
this->consumer.reset( session->createConsumer( advisories.get() ) );
this->consumer->setMessageListener( this );
}
////////////////////////////////////////////////////////////////////////////////
AdvisoryProducer::~AdvisoryProducer() {
}
////////////////////////////////////////////////////////////////////////////////
void AdvisoryProducer::stop() {
this->shutdown = true;
this->shutdownLatch.await( 3000 );
}
////////////////////////////////////////////////////////////////////////////////
void AdvisoryProducer::run() {
while( !this->shutdown ) {
if( this->consumerOnline ) {
std::auto_ptr<cms::TextMessage> message(
this->session->createTextMessage( "Alive" ) );
this->producer->send( message.get() );
Thread::sleep( 1000 );
}
}
this->shutdownLatch.countDown();
}
////////////////////////////////////////////////////////////////////////////////
void AdvisoryProducer::onMessage( const cms::Message* message ) {
if( message->getCMSType() == "Advisory" ) {
std::cout << "Received an Advisory Message!" << std::endl;
if( message->propertyExists( "consumerCount" ) ) {
std::string consumerCount = message->getStringProperty( "consumerCount" );
std::cout << "Number of Consumers = " << consumerCount << std::endl;
this->consumerOnline = Integer::parseInt( consumerCount ) > 0 ? true : false;
}
} else {
std::cout << "Received a Non-Advisory Message!" << std::endl;
}
}
進階主題處理諮詢中的命令物件
如果您閱讀本教學開始時提到的 ActiveMQ 諮詢訊息文章,那麼您就會知道某些諮詢訊息可能包含內嵌的命令物件。如果您沒有閱讀那篇文章,那麼本節將會非常令人困惑,所以請先閱讀它。我們可以使用一些方法在 CMS 中存取這些命令物件,這表示我們可以充分利用諮詢訊息功能。
所有諮詢訊息都會以基本 ActiveMQMessage 的形式傳送到您的用戶端。ActiveMQ-CPP 中的基礎類型階層與 ActiveMQ 相同,因此您在諮詢文章中看到的內嵌命令物件的名稱是相同的,而且它們大多包含相同的資訊,雖然有時候資訊的編碼方式更符合 C++ 的風格,或者不符合 C++ 的風格,這取決於您的觀點。
為了示範如何存取命令物件,讓我們嘗試建立一個用戶端應用程式,該應用程式會監聽代理程式的諮詢訊息,這些諮詢訊息表示已建立或銷毀暫時目的地。每當建立或銷毀相應的暫時目的地時,代理程式都會將諮詢訊息發佈到「ActiveMQ.Advisory.TempTopic」和「ActiveMQ.Advisory.TempQueue」主題,而命令物件的類型將為 DestinationInfo。DestinationInfo 物件包含一個描述相關目的地的 Destination 物件,以及一個 Operation Type 值,指示該命令是建立命令還是銷毀命令。首先,讓我們看看如何訂閱此諮詢主題
訂閱複合諮詢主題
std::auto_ptr<cms::Topic> advisories( session->createTopic(
"ActiveMQ.Advisory.TempTopic,ActiveMQ.Advisory.TempQueue" ) );
std::auto_ptr<cms::MessageConsumer> consumer;
consumer.reset( session->createConsumer( advisories.get() ) );
consumer->setMessageListener( this );
如您在上面的程式碼片段中所見,我們只是建立一個新的主題物件,其名稱是我們想要訂閱的兩個主題的組合,這會允許我們單一的 MessageConsumer 執行個體接收暫時主題和暫時佇列諮詢。和以前一樣,我們也會建立一個 MessageConsumer,並將我們類別的執行個體註冊為非同步監聽器。現在剩下要做的就是實作 MessageListener 介面的 onMessage 方法,讓我們現在看看該程式碼
處理具有內嵌命令物件的諮詢訊息
////////////////////////////////////////////////////////////////////////////////
void TempDestinationAdvisoryConsumer::onMessage( const cms::Message* message ) {
if( message->getCMSType() == "Advisory" ) {
std::cout << "Received an Advisory Message!" << std::endl;
const ActiveMQMessage* amqMessage =
dynamic_cast<const ActiveMQMessage*>( message );
if( amqMessage != NULL && amqMessage->getDataStructure() != NULL ) {
std::cout << "Advisory Message contains a Command Object!" << std::endl;
try {
Pointer<DestinationInfo> info =
amqMessage->getDataStructure().dynamicCast<DestinationInfo>();
unsigned char operationType = info->getOperationType();
if( operationType == ActiveMQConstants::DESTINATION_REMOVE_OPERATION ) {
std::cout << "Temporary Destination {"
<< info->getDestination()->getPhysicalName()
<< "} Removed."
<< std::endl;
} else if( operationType == ActiveMQConstants::DESTINATION_ADD_OPERATION ) {
std::cout << "Temporary Destination {"
<< info->getDestination()->getPhysicalName()
<< "} Added."
<< std::endl;
} else {
std::cout << "ERROR: I have no Idea what just happened!"
<< std::endl;
}
} catch( ClassCastException& ex ) {
std::cout << "ERROR: Expected the Command to be a DestinationInfo, "
<< "it wasn't so PANIC!!"
<< std::endl;
}
}
} else {
std::cout << "Received a Non-Advisory Message!" << std::endl;
}
}
幸好上述程式碼看起來比實際情況更複雜,讓我們現在更仔細地逐步了解,以了解正在發生的事情
取得 ActiveMQMessage 物件
if( message->getCMSType() == "Advisory" ) {
std::cout << "Received an Advisory Message!" << std::endl;
const ActiveMQMessage* amqMessage =
dynamic_cast<const ActiveMQMessage*>( message );
... Other scary code comes next...
else {
std::cout << "Received a Non-Advisory Message!" << std::endl;
}
我們需要做的第一件事是檢查我們是否收到諮詢訊息,ActiveMQ 會將訊息類型編碼為「Advisory」,這很容易。我們在這裡技術上不需要這樣做,因為我們的消費者只監聽諮詢主題,但檢查一下不是個壞主意。一旦我們知道它是諮詢訊息,我們就知道該訊息指標應該是 ActiveMQMessage 類型,它偽裝成通用的 cms::Message,因此我們使用 dynamic_cast 來轉換它。現在我們已經轉換為 ActiveMQMessage,接下來是什麼?讓我們來看看
檢查是否有內嵌的命令物件
if( amqMessage != NULL && amqMessage->getDataStructure() != NULL ) {
std::cout << "Advisory Message contains a Command Object!" << std::endl;
每個 ActiveMQMessage 衍生物件都有一個名為 getDataStructure 的方法,可以用於各種有用的事情,在這裡,我們嘗試查看此訊息中是否包含命令物件,而且您猜對了,getDataStructure 方法會告訴我們是否有。如果有,那麼我們可以繼續檢查是否有 DestinationInfo 物件
取得 DestinationInfo 物件
try {
Pointer<DestinationInfo> info =
amqMessage->getDataStructure().dynamicCast<DestinationInfo>();
unsigned char operationType = info->getOperationType();
if( operationType == ActiveMQConstants::DESTINATION_REMOVE_OPERATION ) {
std::cout << "Temporary Destination {"
<< info->getDestination()->getPhysicalName()
<< "} Removed."
<< std::endl;
} else if( operationType == ActiveMQConstants::DESTINATION_ADD_OPERATION ) {
std::cout << "Temporary Destination {"
<< info->getDestination()->getPhysicalName()
<< "} Added."
<< std::endl;
} else {
std::cout << "ERROR: I have no Idea what just happened!"
<< std::endl;
}
} catch( ClassCastException& ex ) {
std::cout << "ERROR: Expected the Command to be a DestinationInfo, "
<< "it wasn't so PANIC!!"
<< std::endl;
}
您可能首先問這個程式碼片段的問題是「什麼是指標的東西?」,那是一個執行緒安全的智慧型指標,由 ActiveMQ-CPP 內部用來管理構成 cms::Message 物件的所有指標。我們建立一個指標的執行個體
完整範例:監聽臨時 Destination 的建立和銷毀的消費者
我們的客戶端應用程式的完整程式碼如下所示,您也可以在原始碼發行版本的 examples 資料夾中找到此程式碼以及一個建立臨時 Topic 和臨時 Queue 的簡單客戶端。
TempDestinationAdvisoryConsumer 標頭檔
#ifndef _ACTIVEMQCPP_EXAMPLES_ADVISORIES_TEMPDESTINATIONADVISORYCONSUMER_H_
#define _ACTIVEMQCPP_EXAMPLES_ADVISORIES_TEMPDESTINATIONADVISORYCONSUMER_H_
#include <string>
#include <memory>
#include <cms/Session.h>
#include <cms/MessageProducer.h>
#include <cms/MessageConsumer.h>
#include <cms/MessageListener.h>
#include <decaf/lang/Runnable.h>
namespace activemqcpp {
namespace examples {
namespace advisories {
/**
* Monitors a Broker for Temporary Topic creation and destruction.
*
* @since 3.0
*/
class TempDestinationAdvisoryConsumer : public cms::MessageListener {
private:
cms::Session* session;
std::auto_ptr<cms::MessageConsumer> consumer;
public:
TempDestinationAdvisoryConsumer( cms::Session* session );
virtual ~TempDestinationAdvisoryConsumer();
/**
* Async Message callback.
*/
virtual void onMessage( const cms::Message* message );
};
}}}
#endif /* _ACTIVEMQCPP_EXAMPLES_ADVISORIES_TEMPDESTINATIONADVISORYCONSUMER_H_ */
TempDestinationAdvisoryConsumer 原始碼檔案
#include "TempDestinationAdvisoryConsumer.h"
#include <cms/Topic.h>
#include <cms/Message.h>
#include <cms/TextMessage.h>
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/commands/ActiveMQMessage.h>
#include <activemq/commands/DestinationInfo.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/exceptions/ClassCastException.h>
#include <decaf/lang/Integer.h>
using namespace std;
using namespace activemqcpp;
using namespace activemqcpp::examples;
using namespace activemqcpp::examples::advisories;
using namespace activemq;
using namespace activemq::commands;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
TempDestinationAdvisoryConsumer::TempDestinationAdvisoryConsumer( cms::Session* session ) {
if( session == NULL ) {
throw NullPointerException(
__FILE__, __LINE__, "Session Object passed was Null." );
}
std::auto_ptr<cms::Topic> advisories( session->createTopic(
"ActiveMQ.Advisory.TempTopic,ActiveMQ.Advisory.TempQueue" ) );
this->session = session;
this->consumer.reset( session->createConsumer( advisories.get() ) );
this->consumer->setMessageListener( this );
}
////////////////////////////////////////////////////////////////////////////////
TempDestinationAdvisoryConsumer::~TempDestinationAdvisoryConsumer() {
}
////////////////////////////////////////////////////////////////////////////////
void TempDestinationAdvisoryConsumer::onMessage( const cms::Message* message ) {
if( message->getCMSType() == "Advisory" ) {
std::cout << "Received an Advisory Message!" << std::endl;
const ActiveMQMessage* amqMessage =
dynamic_cast<const ActiveMQMessage*>( message );
if( amqMessage != NULL && amqMessage->getDataStructure() != NULL ) {
std::cout << "Advisory Message contains a Command Object!" << std::endl;
try {
Pointer<DestinationInfo> info =
amqMessage->getDataStructure().dynamicCast<DestinationInfo>();
unsigned char operationType = info->getOperationType();
if( operationType == ActiveMQConstants::DESTINATION_REMOVE_OPERATION ) {
std::cout << "Temporary Destination {"
<< info->getDestination()->getPhysicalName()
<< "} Removed."
<< std::endl;
} else if( operationType == ActiveMQConstants::DESTINATION_ADD_OPERATION ) {
std::cout << "Temporary Destination {"
<< info->getDestination()->getPhysicalName()
<< "} Added."
<< std::endl;
} else {
std::cout << "ERROR: I have no Idea what just happened!"
<< std::endl;
}
} catch( ClassCastException& ex ) {
std::cout << "ERROR: Expected the Command to be a DestinationInfo, "
<< "it wasn't so PANIC!!"
<< std::endl;
}
}
} else {
std::cout << "Received a Non-Advisory Message!" << std::endl;
}
}