Apache ActiveMQ ™ -- 處理諮詢訊息

ActiveMQ 支援諮詢訊息,讓您可以使用一般的 CMS 訊息來監看系統。目前您可以使用諮詢訊息來做幾件事:

  • 查看消費者、生產者和連線的啟動和停止
  • 查看暫時目的地的建立和銷毀
  • 取得主題和佇列上過期訊息的通知
  • 觀察代理程式將訊息傳送到沒有消費者的目的地。
  • 查看連線的啟動和停止

諮詢訊息可以被視為某種管理通道,您可以在其中收到有關 JMS 提供者上發生的事情,以及生產者、消費者和目的地發生的事情的資訊。若要深入瞭解代理程式的諮詢訊息支援,請參閱這篇文章

開始之前

本教學假設讀者對 CMS API 有基本的了解,並且知道如何使用 ActiveMQ-CPP 程式庫建構基本應用程式。如果您不確定如何使用 CMS API,那麼您應該先閱讀CMS API 概觀。本教學是針對 ActiveMQ-CPP 3.0 發行版本的 API 所撰寫,雖然可以使用舊的 ActiveMQ-CPP 2.x 用戶端來處理諮詢訊息,但可能會有部分程式碼差異,本文中不會顯示。

訂閱諮詢主題

要接收諮詢訊息,您首先需要訂閱提供所需諮詢訊息的主題。您可以像訂閱任何其他目的地一樣訂閱這些主題,訣竅是使用您想要使用之主題的正確名稱。讓我們先來看幾個可用的類型(這不是完整集合)。

諮詢主題 描述 屬性 資料結構
ActiveMQ.Advisory.Connection 連線啟動和停止訊息    
ActiveMQ.Advisory.Producer.Queue 佇列上生產者啟動和停止訊息 字串=’producerCount’ - 生產者數量 ProducerInfo
ActiveMQ.Advisory.Producer.Topic 主題上生產者啟動和停止訊息 字串=’producerCount’ - 生產者數量 ProducerInfo
ActiveMQ.Advisory.Consumer.Queue 佇列上消費者啟動和停止訊息 字串=’consumerCount’ - 消費者數量 ConsumerInfo
ActiveMQ.Advisory.Consumer.Topic 主題上消費者啟動和停止訊息 字串=’consumerCount’ - 消費者數量 ConsumerInfo

現在,看看上面的清單,讓我們選擇一個主題,並找出如何建立 CMS 主題來訂閱,以便接收諮詢訊息。使用 Java 用戶端,我們可以使用公用程式類別 AdvisorySupport 來建立目的地,但目前 ActiveMQ-CPP 沒有提供此類公用程式類別,希望我們能在未來的版本中看到新增(當然,隨時歡迎貢獻!)。

如果我們有一個名為 TOPIC.FOO 的主題,而且我們想知道何時有生產者訂閱該主題,我們需要建立一個主題物件,其名稱為 ActiveMQ.Advisory.Producer.Topic.TOPIC.FOO,才能接收我們感興趣的諮詢訊息。我們知道這一點,因為我們可以查看上面的表格,並看到只要生產者開始或停止在主題上發布訊息,就會通知 ActiveMQ.Advisory.Producer.Topic,而且我們也知道我們的主題名為 TOPIC.FOO,因此將它們加在一起會得到我們諮詢主題的名稱,我們也知道這一點,因為我們看過 AdvisorySupport.java 類別,而且,這不算作弊。以下是使用 CMS 會話建立主題的程式碼片段:

std::auto_ptr<cms::Topic> advisories( session->createTopic(
    "ActiveMQ.Advisory.Producer.Topic.TOPIC.FOO" ) );

一旦我們為想要監聽的諮詢訊息建立主題,我們只需要建立一個消費者來監聽它們,下面的程式碼片段示範了這一點

std::auto_ptr<cms::MessageConsumer> consumer;
consumer.reset( session->createConsumer( advisories.get() ) );
consumer->setMessageListener( this );

如您所見,訂閱諮詢主題和訂閱 CMS 中任何其他目的地之間沒有區別。在上面的範例中,我們註冊為非同步監聽器,您也可以使用一般的阻塞式 receive 方法,但我們更喜歡這種方法。

處理傳入的諮詢訊息

每個諮詢的訊息類型都是 'Advisory',並且有一些預定義的訊息屬性,要檢查 CMS 訊息是否為這種類型,您會在訊息物件上呼叫 getCMSType 方法。在某些情況下,您會知道您唯一會收到的訊息是諮詢訊息,因為您的用戶端只訂閱諮詢主題,其他時候您可能會將多個 MessageConsumer 連線到同一個 MessageListener,在這種情況下,您必須檢查訊息類型。一旦您知道您正在處理諮詢訊息,那麼您就可以開始檢查它,以確定它是什麼類型的訊息,並提取對您的應用程式有意義的資料。

許多諮詢訊息將有意義的資料儲存在訊息屬性中,例如,消費者啟動/停止諮詢訊息包含一個以 consumerCount 為鍵值的元素,該元素會填入主題或佇列上目前作用中的消費者數量。讓我們看看一個程式碼片段,該程式碼片段會檢查 onMessage 回呼中接收到的訊息,以查看是否為諮詢訊息,如果是,則會對其採取行動

void AdvisoryProducer::onMessage( const cms::Message* message ) {

   if( message->getCMSType() == "Advisory" ) {

       std::cout << "Received an Advisory Message!" << std::endl;

       if( message->propertyExists( "consumerCount" ) ) {

           std::string consumerCount = message->getStringProperty( "consumerCount" );
           std::cout << "Number of Consumers = " << consumerCount << std::endl;

           // Do Something Meaningful here....
       }

   } else {
       std::cout << "Received a Non-Advisory Message!" << std::endl;
   }
}

完整範例:只有在有消費者時才會產生訊息的生產者

現在您已經了解諮詢訊息處理的基本知識,是時候向您展示一個完整的範例,示範如何使用諮詢訊息。下列程式碼顯示一個類別標頭和來源檔案,該類別標頭和來源檔案實作了一個基本的 CMS 生產者,該生產者只有在有作用中的消費者時,才會將心跳訊息傳送到名為 HEART-BEAT-CHANNEL 的主題,否則會保持閒置。

#ifndef _ACTIVEMQCPP_EXAMPLES_ADVISORIES_ADVISORYPRODUCER_H_
#define _ACTIVEMQCPP_EXAMPLES_ADVISORIES_ADVISORYPRODUCER_H_

#include <string>
#include <memory>

#include <cms/Session.h>
#include <cms/MessageProducer.h>
#include <cms/MessageConsumer.h>
#include <cms/MessageListener.h>

#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>

namespace activemqcpp {
namespace examples {
namespace advisories {

    /**
     * A sample Producer that will only send Messages on its Topic when it has
     * received an advisory indicating that there is an active MessageConsumer
     * on the Topic.  Once another message comes in indicating that there is no
     * longer a consumer then this Producer stops producing again.
     *
     * @since 3.0
     */
    class AdvisoryProducer : public decaf::lang::Runnable,
                             public cms::MessageListener {
    private:

        volatile bool consumerOnline;
        volatile bool shutdown;
        decaf::util::concurrent::CountDownLatch shutdownLatch;

        cms::Session* session;
        std::auto_ptr<cms::MessageConsumer> consumer;
        std::auto_ptr<cms::MessageProducer> producer;

    public:

        AdvisoryProducer( cms::Session* session );
        virtual ~AdvisoryProducer();

        /**
         * Shut down the processing that occurs in the Run method.
         */
        void stop();

        /**
         * Run the producer code.
         */
        virtual void run();

        /**
         * Async Message callback.
         */
        virtual void onMessage( const cms::Message* message );

    };

}}}

#endif /* _ACTIVEMQCPP_EXAMPLES_ADVISORIES_ADVISORYPRODUCER_H_ */

AdvisoryProducer 來源檔案

#include "AdvisoryProducer.h"

#include <cms/Topic.h>
#include <cms/Message.h>
#include <cms/TextMessage.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/Integer.h>

using namespace std;
using namespace activemqcpp;
using namespace activemqcpp::examples;
using namespace activemqcpp::examples::advisories;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;

////////////////////////////////////////////////////////////////////////////////
AdvisoryProducer::AdvisoryProducer( cms::Session* session ) : shutdownLatch(1) {

    if( session == NULL ) {
        throw NullPointerException(
            __FILE__, __LINE__, "Session Object passed was Null." );
    }

    std::auto_ptr<cms::Topic> destination( session->createTopic(
        "HEART-BEAT-CHANNEL" ) );
    std::auto_ptr<cms::Topic> advisories( session->createTopic(
        "ActiveMQ.Advisory.Consumer.Topic.HEART-BEAT-CHANNEL" ) );

    this->shutdown = false;
    this->consumerOnline = false;

    this->session = session;
    this->producer.reset( session->createProducer( destination.get() ) );
    this->consumer.reset( session->createConsumer( advisories.get() ) );
    this->consumer->setMessageListener( this );
}

////////////////////////////////////////////////////////////////////////////////
AdvisoryProducer::~AdvisoryProducer() {
}

////////////////////////////////////////////////////////////////////////////////
void AdvisoryProducer::stop() {
    this->shutdown = true;
    this->shutdownLatch.await( 3000 );
}

////////////////////////////////////////////////////////////////////////////////
void AdvisoryProducer::run() {

    while( !this->shutdown ) {

        if( this->consumerOnline ) {

            std::auto_ptr<cms::TextMessage> message(
                this->session->createTextMessage( "Alive" ) );

            this->producer->send( message.get() );

            Thread::sleep( 1000 );
        }
    }

    this->shutdownLatch.countDown();
}

////////////////////////////////////////////////////////////////////////////////
void AdvisoryProducer::onMessage( const cms::Message* message ) {

    if( message->getCMSType() == "Advisory" ) {

        std::cout << "Received an Advisory Message!" << std::endl;

        if( message->propertyExists( "consumerCount" ) ) {

            std::string consumerCount = message->getStringProperty( "consumerCount" );
            std::cout << "Number of Consumers = " << consumerCount << std::endl;
            this->consumerOnline = Integer::parseInt( consumerCount ) > 0 ? true : false;
        }

    } else {
        std::cout << "Received a Non-Advisory Message!" << std::endl;
    }
}

進階主題處理諮詢中的命令物件

如果您閱讀本教學開始時提到的 ActiveMQ 諮詢訊息文章,那麼您就會知道某些諮詢訊息可能包含內嵌的命令物件。如果您沒有閱讀那篇文章,那麼本節將會非常令人困惑,所以請先閱讀它。我們可以使用一些方法在 CMS 中存取這些命令物件,這表示我們可以充分利用諮詢訊息功能。

所有諮詢訊息都會以基本 ActiveMQMessage 的形式傳送到您的用戶端。ActiveMQ-CPP 中的基礎類型階層與 ActiveMQ 相同,因此您在諮詢文章中看到的內嵌命令物件的名稱是相同的,而且它們大多包含相同的資訊,雖然有時候資訊的編碼方式更符合 C++ 的風格,或者不符合 C++ 的風格,這取決於您的觀點。

為了示範如何存取命令物件,讓我們嘗試建立一個用戶端應用程式,該應用程式會監聽代理程式的諮詢訊息,這些諮詢訊息表示已建立或銷毀暫時目的地。每當建立或銷毀相應的暫時目的地時,代理程式都會將諮詢訊息發佈到「ActiveMQ.Advisory.TempTopic」和「ActiveMQ.Advisory.TempQueue」主題,而命令物件的類型將為 DestinationInfo。DestinationInfo 物件包含一個描述相關目的地的 Destination 物件,以及一個 Operation Type 值,指示該命令是建立命令還是銷毀命令。首先,讓我們看看如何訂閱此諮詢主題

訂閱複合諮詢主題

std::auto_ptr<cms::Topic> advisories( session->createTopic(
    "ActiveMQ.Advisory.TempTopic,ActiveMQ.Advisory.TempQueue" ) );

std::auto_ptr<cms::MessageConsumer> consumer;
consumer.reset( session->createConsumer( advisories.get() ) );
consumer->setMessageListener( this );

如您在上面的程式碼片段中所見,我們只是建立一個新的主題物件,其名稱是我們想要訂閱的兩個主題的組合,這會允許我們單一的 MessageConsumer 執行個體接收暫時主題和暫時佇列諮詢。和以前一樣,我們也會建立一個 MessageConsumer,並將我們類別的執行個體註冊為非同步監聽器。現在剩下要做的就是實作 MessageListener 介面的 onMessage 方法,讓我們現在看看該程式碼

處理具有內嵌命令物件的諮詢訊息

////////////////////////////////////////////////////////////////////////////////
void TempDestinationAdvisoryConsumer::onMessage( const cms::Message* message ) {

    if( message->getCMSType() == "Advisory" ) {

        std::cout << "Received an Advisory Message!" << std::endl;

        const ActiveMQMessage* amqMessage =
            dynamic_cast<const ActiveMQMessage*>( message );

        if( amqMessage != NULL && amqMessage->getDataStructure() != NULL ) {
            std::cout << "Advisory Message contains a Command Object!" << std::endl;

            try {

                Pointer<DestinationInfo> info =
                    amqMessage->getDataStructure().dynamicCast<DestinationInfo>();

                unsigned char operationType = info->getOperationType();

                if( operationType == ActiveMQConstants::DESTINATION_REMOVE_OPERATION ) {
                    std::cout << "Temporary Destination {"
                              << info->getDestination()->getPhysicalName()
                              << "} Removed."
                              << std::endl;
                } else if( operationType == ActiveMQConstants::DESTINATION_ADD_OPERATION ) {
                    std::cout << "Temporary Destination {"
                              << info->getDestination()->getPhysicalName()
                              << "} Added."
                              << std::endl;
                } else {
                    std::cout << "ERROR: I have no Idea what just happened!"
                              << std::endl;
                }

            } catch( ClassCastException& ex ) {
                std::cout << "ERROR: Expected the Command to be a DestinationInfo, "
                          << "it wasn't so PANIC!!"
                          << std::endl;
            }
        }

    } else {
        std::cout << "Received a Non-Advisory Message!" << std::endl;
    }
}

幸好上述程式碼看起來比實際情況更複雜,讓我們現在更仔細地逐步了解,以了解正在發生的事情

取得 ActiveMQMessage 物件

if( message->getCMSType() == "Advisory" ) {

    std::cout << "Received an Advisory Message!" << std::endl;

    const ActiveMQMessage* amqMessage =
        dynamic_cast<const ActiveMQMessage*>( message );

    ... Other scary code comes next...

else {
    std::cout << "Received a Non-Advisory Message!" << std::endl;
}

我們需要做的第一件事是檢查我們是否收到諮詢訊息,ActiveMQ 會將訊息類型編碼為「Advisory」,這很容易。我們在這裡技術上不需要這樣做,因為我們的消費者只監聽諮詢主題,但檢查一下不是個壞主意。一旦我們知道它是諮詢訊息,我們就知道該訊息指標應該是 ActiveMQMessage 類型,它偽裝成通用的 cms::Message,因此我們使用 dynamic_cast 來轉換它。現在我們已經轉換為 ActiveMQMessage,接下來是什麼?讓我們來看看

檢查是否有內嵌的命令物件

if( amqMessage != NULL && amqMessage->getDataStructure() != NULL ) {
    std::cout << "Advisory Message contains a Command Object!" << std::endl;

每個 ActiveMQMessage 衍生物件都有一個名為 getDataStructure 的方法,可以用於各種有用的事情,在這裡,我們嘗試查看此訊息中是否包含命令物件,而且您猜對了,getDataStructure 方法會告訴我們是否有。如果有,那麼我們可以繼續檢查是否有 DestinationInfo 物件

取得 DestinationInfo 物件

try {

    Pointer<DestinationInfo> info =
        amqMessage->getDataStructure().dynamicCast<DestinationInfo>();

    unsigned char operationType = info->getOperationType();

    if( operationType == ActiveMQConstants::DESTINATION_REMOVE_OPERATION ) {
        std::cout << "Temporary Destination {"
                  << info->getDestination()->getPhysicalName()
                  << "} Removed."
                  << std::endl;
    } else if( operationType == ActiveMQConstants::DESTINATION_ADD_OPERATION ) {
        std::cout << "Temporary Destination {"
                  << info->getDestination()->getPhysicalName()
                  << "} Added."
                  << std::endl;
     } else {
        std::cout << "ERROR: I have no Idea what just happened!"
                  << std::endl;
     }

} catch( ClassCastException& ex ) {
    std::cout << "ERROR: Expected the Command to be a DestinationInfo, "
              << "it wasn't so PANIC!!"
              << std::endl;
}

您可能首先問這個程式碼片段的問題是「什麼是指標的東西?」,那是一個執行緒安全的智慧型指標,由 ActiveMQ-CPP 內部用來管理構成 cms::Message 物件的所有指標。我們建立一個指標的執行個體如果 `dynamicCast` 方法能夠進行轉換,則此類型將會是指向我們的 `DestinationInfo` 命令的指標;如果無法轉換,則會拋出 `ClassCastException` 異常。一旦我們取得 `DestinationInfo` 指標,我們就可以檢索命令的操作類型,然後將其與 `ActiveMQConstants` 中的常數進行比較,以了解對 Destination 執行的操作。只有兩種操作類型,即新增和移除,但是由於 `DestinationInfo` 物件將操作類型值編碼為 unsigned char,因此我們提供了一個回退情況,以便提醒我們出現該錯誤。我們現在幾乎完成了,剩下的就是輸出發生了什麼,並讓使用者知道 Destination 的名稱,`Destination` 類別中的 **getPhysicalName** 方法會告訴我們。您也可以使用 Destination 物件找出 Destination 是 Topic 還是 Queue,我們將其留給讀者作為練習。

完整範例:監聽臨時 Destination 的建立和銷毀的消費者

我們的客戶端應用程式的完整程式碼如下所示,您也可以在原始碼發行版本的 examples 資料夾中找到此程式碼以及一個建立臨時 Topic 和臨時 Queue 的簡單客戶端。

TempDestinationAdvisoryConsumer 標頭檔

#ifndef _ACTIVEMQCPP_EXAMPLES_ADVISORIES_TEMPDESTINATIONADVISORYCONSUMER_H_
#define _ACTIVEMQCPP_EXAMPLES_ADVISORIES_TEMPDESTINATIONADVISORYCONSUMER_H_

#include <string>
#include <memory>

#include <cms/Session.h>
#include <cms/MessageProducer.h>
#include <cms/MessageConsumer.h>
#include <cms/MessageListener.h>

#include <decaf/lang/Runnable.h>

namespace activemqcpp {
namespace examples {
namespace advisories {

    /**
     * Monitors a Broker for Temporary Topic creation and destruction.
     *
     * @since 3.0
     */
    class TempDestinationAdvisoryConsumer : public cms::MessageListener {
    private:

        cms::Session* session;
        std::auto_ptr<cms::MessageConsumer> consumer;

    public:

        TempDestinationAdvisoryConsumer( cms::Session* session );
        virtual ~TempDestinationAdvisoryConsumer();

        /**
         * Async Message callback.
         */
        virtual void onMessage( const cms::Message* message );

    };

}}}

#endif /* _ACTIVEMQCPP_EXAMPLES_ADVISORIES_TEMPDESTINATIONADVISORYCONSUMER_H_ */

TempDestinationAdvisoryConsumer 原始碼檔案

#include "TempDestinationAdvisoryConsumer.h"

#include <cms/Topic.h>
#include <cms/Message.h>
#include <cms/TextMessage.h>
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/commands/ActiveMQMessage.h>
#include <activemq/commands/DestinationInfo.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/exceptions/ClassCastException.h>
#include <decaf/lang/Integer.h>

using namespace std;
using namespace activemqcpp;
using namespace activemqcpp::examples;
using namespace activemqcpp::examples::advisories;
using namespace activemq;
using namespace activemq::commands;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;

////////////////////////////////////////////////////////////////////////////////
TempDestinationAdvisoryConsumer::TempDestinationAdvisoryConsumer( cms::Session* session ) {

    if( session == NULL ) {
        throw NullPointerException(
            __FILE__, __LINE__, "Session Object passed was Null." );
    }

    std::auto_ptr<cms::Topic> advisories( session->createTopic(
        "ActiveMQ.Advisory.TempTopic,ActiveMQ.Advisory.TempQueue" ) );

    this->session = session;
    this->consumer.reset( session->createConsumer( advisories.get() ) );
    this->consumer->setMessageListener( this );
}

////////////////////////////////////////////////////////////////////////////////
TempDestinationAdvisoryConsumer::~TempDestinationAdvisoryConsumer() {
}

////////////////////////////////////////////////////////////////////////////////
void TempDestinationAdvisoryConsumer::onMessage( const cms::Message* message ) {

    if( message->getCMSType() == "Advisory" ) {

        std::cout << "Received an Advisory Message!" << std::endl;

        const ActiveMQMessage* amqMessage =
            dynamic_cast<const ActiveMQMessage*>( message );

        if( amqMessage != NULL && amqMessage->getDataStructure() != NULL ) {
            std::cout << "Advisory Message contains a Command Object!" << std::endl;

            try {

                Pointer<DestinationInfo> info =
                    amqMessage->getDataStructure().dynamicCast<DestinationInfo>();

                unsigned char operationType = info->getOperationType();

                if( operationType == ActiveMQConstants::DESTINATION_REMOVE_OPERATION ) {
                    std::cout << "Temporary Destination {"
                              << info->getDestination()->getPhysicalName()
                              << "} Removed."
                              << std::endl;
                } else if( operationType == ActiveMQConstants::DESTINATION_ADD_OPERATION ) {
                    std::cout << "Temporary Destination {"
                              << info->getDestination()->getPhysicalName()
                              << "} Added."
                              << std::endl;
                } else {
                    std::cout << "ERROR: I have no Idea what just happened!"
                              << std::endl;
                }

            } catch( ClassCastException& ex ) {
                std::cout << "ERROR: Expected the Command to be a DestinationInfo, "
                          << "it wasn't so PANIC!!"
                          << std::endl;
            }
        }

    } else {
        std::cout << "Received a Non-Advisory Message!" << std::endl;
    }
}

Apache、ActiveMQ、Apache ActiveMQ、Apache 羽毛標誌和 Apache ActiveMQ 專案標誌是 The Apache Software Foundation 的商標。版權所有 © 2024,The Apache Software Foundation。根據 Apache License 2.0 授權。