我應該如何使用 JMS 實作請求/回應?

 常見問題 > JMS > 我應該如何使用 JMS 實作請求/回應?

我應該如何使用 JMS 實作請求/回應?

最簡單的解決方案是使用 Camel 作為 Spring Remoting 提供者,這可讓您將所有 JMS API 從您的商業邏輯中隱藏起來,並讓 Camel 為您提供請求/回應處理程式碼。

但是,如果您希望自行編寫 JMS 用戶端程式碼,請繼續閱讀其運作方式…

使用 JMS API 實作請求-回應

您一開始可能會認為,要在 JMS 中實作請求-回應類型的操作,您應該為每個請求建立一個帶有選擇器的新消費者;或者可能為每個請求建立一個新的暫時佇列。

建立暫時目的地、消費者、生產者和連線都是與代理程式進行的同步請求-回應操作,因此應避免為處理每個請求而執行這些操作,因為這會導致與 JMS 代理程式進行大量交談。

透過 JMS 實作請求-回應的最佳方式是在啟動時為每個用戶端建立一個暫時佇列和消費者,在每個訊息上將 JMSReplyTo 屬性設定為暫時佇列,然後在每個訊息上使用 關聯 ID,將請求訊息與回應訊息關聯起來。這避免了為每個請求建立和關閉消費者的開銷(這很耗費資源)。這也意味著,如果您想要(或可能將它們集區化),您可以在許多執行緒之間共用相同的生產者和消費者。

Lingo 程式庫是使用 JMS 的 Spring remoting 的實作。(Spring remoting 是一種基於 POJO 的 remoting,其中 remoting 程式碼對您的商業邏輯程式碼不可見)。

它完全使用這種模式;即使用關聯 ID 將請求與回應關聯起來。伺服器端只需記住將傳入訊息的關聯 ID 放在回應上即可。

執行此操作的實際類別是 MultiplexingRequestor。也許使用 Spring remoting 和 Lingo 是實作請求-回應的最簡單方法 - 或者您也可以只使用 Lingo 的 Requestor 介面來保留 JMS 語意。

更多詳細資訊請參閱此處

用戶端

因此,用戶端會在暫時佇列上建立一個消費者,如下所示…

// client side
Destination tempDest = session.createTemporaryQueue();
MessageConsumer responseConsumer = session.createConsumer(tempDest);
...

// send a request..
message.setJMSReplyTo(tempDest)
message.setJMSCorrelationID(myCorrelationID);

producer.send(message);

伺服器端

public void onMessage(Message request) {

  Message response = session.createMessage();
  response.setJMSCorrelationID(request.getJMSCorrelationID())

  producer.send(request.getJMSReplyTo(), response)
}

完整範例

伺服器端

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Server implements MessageListener {
    private static int ackMode;
    private static String messageQueueName;
    private static String messageBrokerUrl;

    private Session session;
    private boolean transacted = false;
    private MessageProducer replyProducer;
    private MessageProtocol messageProtocol;

    static {
        messageBrokerUrl = "tcp://127.0.0.1:61616";
        messageQueueName = "client.messages";
        ackMode = Session.AUTO_ACKNOWLEDGE;
    }

    public Server() {
        try {
            //This message broker is embedded
            BrokerService broker = new BrokerService();
            broker.setPersistent(false);
            broker.setUseJmx(false);
            broker.addConnector(messageBrokerUrl);
            broker.start();
        } catch (Exception e) {
            //Handle the exception appropriately
        }

        //Delegating the handling of messages to another class, instantiate it before setting up JMS so it
        //is ready to handle messages
        this.messageProtocol = new MessageProtocol();
        this.setupMessageQueueConsumer();
    }

    private void setupMessageQueueConsumer() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);
        Connection connection;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            this.session = connection.createSession(this.transacted, ackMode);
            Destination adminQueue = this.session.createQueue(messageQueueName);

            //Setup a message producer to respond to messages from clients, we will get the destination
            //to send to from the JMSReplyTo header field from a Message
            this.replyProducer = this.session.createProducer(null);
            this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            //Set up a consumer to consume messages off of the admin queue
            MessageConsumer consumer = this.session.createConsumer(adminQueue);
            consumer.setMessageListener(this);
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }

    public void onMessage(Message message) {
        try {
            TextMessage response = this.session.createTextMessage();
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String messageText = txtMsg.getText();
                response.setText(this.messageProtocol.handleProtocolMessage(messageText));
            }

            //Set the correlation ID from the received message to be the correlation id of the response message
            //this lets the client identify which message this is a response to if it has more than
            //one outstanding message to the server
            response.setJMSCorrelationID(message.getJMSCorrelationID());

            //Send the response to the Destination specified by the JMSReplyTo field of the received message,
            //this is presumably a temporary queue created by the client
            this.replyProducer.send(message.getJMSReplyTo(), response);
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }

    public static void main(String\[\] args) {
        new Server();
    }
}

用戶端

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.Random;

public class Client implements MessageListener {
    private static int ackMode;
    private static String clientQueueName;

    private boolean transacted = false;
    private MessageProducer producer;

    static {
        clientQueueName = "client.messages";
        ackMode = Session.AUTO_ACKNOWLEDGE;
    }

    public Client() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        Connection connection;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(transacted, ackMode);
            Destination adminQueue = session.createQueue(clientQueueName);

            //Setup a message producer to send message to the queue the server is consuming from
            this.producer = session.createProducer(adminQueue);
            this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            //Create a temporary queue that this client will listen for responses on then create a consumer
            //that consumes message from this temporary queue...for a real application a client should reuse
            //the same temp queue for each message to the server...one temp queue per client
            Destination tempDest = session.createTemporaryQueue();
            MessageConsumer responseConsumer = session.createConsumer(tempDest);

            //This class will handle the messages to the temp queue as well
            responseConsumer.setMessageListener(this);

            //Now create the actual message you want to send
            TextMessage txtMessage = session.createTextMessage();
            txtMessage.setText("MyProtocolMessage");

            //Set the reply to field to the temp queue you created above, this is the queue the server
            //will respond to
            txtMessage.setJMSReplyTo(tempDest);

            //Set a correlation ID so when you get a response you know which sent message the response is for
            //If there is never more than one outstanding message to the server then the
            //same correlation ID can be used for all the messages...if there is more than one outstanding
            //message to the server you would presumably want to associate the correlation ID with this
            //message somehow...a Map works good
            String correlationId = this.createRandomString();
            txtMessage.setJMSCorrelationID(correlationId);
            this.producer.send(txtMessage);
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }

    private String createRandomString() {
        Random random = new Random(System.currentTimeMillis());
        long randomLong = random.nextLong();
        return Long.toHexString(randomLong);
    }

    public void onMessage(Message message) {
        String messageText = null;
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                messageText = textMessage.getText();
                System.out.println("messageText = " + messageText);
            }
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }

    public static void main(String\[\] args) {
        new Client();
    }
}

協定類別

需要此類別來執行上述用戶端/伺服器範例。將訊息處理委派給單獨的類別完全是個人偏好。

public class MessageProtocol {
    public String handleProtocolMessage(String messageText) {
        String responseText;
        if ("MyProtocolMessage".equalsIgnoreCase(messageText)) {
            responseText = "I recognize your protocol message";
        } else {
            responseText = "Unknown protocol message: " + messageText;
        }
        
        return responseText;
    }
}

Apache、ActiveMQ、Apache ActiveMQ、Apache 羽毛標誌和 Apache ActiveMQ 專案標誌是 The Apache Software Foundation 的商標。 版權所有 © 2024,The Apache Software Foundation。 根據Apache 授權 2.0 授權。