- Learning RabbitMQ
- Martin Toshev
- 2025-02-24 02:09:16
Request-reply communication
The following diagram provides an overview of the scenario that we will implement:
[Figure: overview of the request-reply scenario]
The sender sends a message to the default exchange with a routing key that matches the name of the designated request queue. The request receiver subscribes to the request queue. When a request message arrives, the receiver reads the replyTo property from the message properties, creates a response message, and sends it to the default exchange with a routing key equal to the replyTo value. The replyTo property therefore names the queue that carries response messages, and the sender subscribes to that queue in order to receive the response.
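The flow described above can be tried without a broker. The following is a minimal in-memory sketch, not RabbitMQ client code: the RequestReplySketch class, its Message type, and the map of named queues are all hypothetical stand-ins, with the map playing the role of the default exchange (the routing key is simply the queue name).

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for the broker. This is NOT the RabbitMQ client API.
final class RequestReplySketch {

    // Hypothetical message type: a body plus the two properties the pattern relies on.
    static final class Message {
        final String body;
        final String replyTo;
        final String correlationId;

        Message(String body, String replyTo, String correlationId) {
            this.body = body;
            this.replyTo = replyTo;
            this.correlationId = correlationId;
        }
    }

    // Queue name -> queue. Mimics the default exchange, which routes by queue name.
    private static final Map<String, BlockingQueue<Message>> QUEUES = new ConcurrentHashMap<>();

    private static BlockingQueue<Message> queue(String name) {
        return QUEUES.computeIfAbsent(name, n -> new LinkedBlockingQueue<>());
    }

    static void publish(String routingKey, Message message) {
        queue(routingKey).add(message);
    }

    // Waits up to timeoutMillis for a message; returns null on timeout or interruption.
    private static Message poll(String queueName, long timeoutMillis) {
        try {
            return queue(queueName).poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Sender side: the request names the queue on which the reply is expected.
    static void sendRequest(String requestQueue, String responseQueue,
                            String body, String correlationId) {
        publish(requestQueue, new Message(body, responseQueue, correlationId));
    }

    // Receiver side: take one request and reply to the queue named in replyTo.
    static boolean serveOneRequest(String requestQueue) {
        Message request = poll(requestQueue, 3000);
        if (request == null || request.replyTo == null) {
            return false; // nothing to serve, or no response destination
        }
        publish(request.replyTo,
                new Message("Response to: " + request.body, null, request.correlationId));
        return true;
    }

    // Sender side: accept the reply only if it carries the expected correlation ID.
    static String waitForResponse(String responseQueue, String correlationId) {
        Message reply = poll(responseQueue, 3000);
        if (reply == null || !correlationId.equals(reply.correlationId)) {
            return null;
        }
        return reply.body;
    }

    public static void main(String[] args) {
        sendRequest("request_queue", "response_queue", "Test message.", "MSG1");
        serveOneRequest("request_queue");
        System.out.println(waitForResponse("response_queue", "MSG1"));
        // prints "Response to: Test message."
    }
}
```

The sketch runs both roles in one thread for brevity. With a real broker the sender and the receiver are separate processes, and the queues live on the broker rather than in a map.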
Let's extend our Sender class with a sendRequest() method, which sends a message to the default exchange using the name of the request queue as the routing key, and a waitForResponse() method, which receives a message from the response_queue queue:
private static final String RESPONSE_QUEUE = "response_queue";

public void sendRequest(String requestQueue, String message, String correlationId) {
    try {
        // Make sure both queues exist before publishing.
        channel.queueDeclare(requestQueue, false, false, false, null);
        channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);
        AMQP.BasicProperties amqpProps = new AMQP.BasicProperties.Builder()
                .correlationId(correlationId)
                .replyTo(RESPONSE_QUEUE)
                .build();
        // On the default exchange the routing key is the name of the target queue.
        channel.basicPublish(DEFAULT_EXCHANGE, requestQueue, amqpProps, message.getBytes());
    } catch (IOException e) {
        LOGGER.error(e.getMessage(), e);
    }
}

public String waitForResponse(final String correlationId) {
    QueueingConsumer consumer = new QueueingConsumer(channel);
    String result = null;
    try {
        channel.basicConsume(RESPONSE_QUEUE, true, consumer);
        // nextDelivery(timeout) returns null if nothing arrives within 3 seconds.
        QueueingConsumer.Delivery delivery = consumer.nextDelivery(3000);
        if (delivery == null) {
            LOGGER.warn("No response received within the timeout.");
            return null;
        }
        String message = new String(delivery.getBody());
        if (delivery.getProperties() != null) {
            String msgCorrelationId = delivery.getProperties().getCorrelationId();
            if (!correlationId.equals(msgCorrelationId)) {
                LOGGER.warn("Received response of another request.");
            } else {
                result = message;
            }
        }
        LOGGER.info("Message received: " + message);
    } catch (IOException | ShutdownSignalException | ConsumerCancelledException e) {
        LOGGER.error(e.getMessage(), e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so that callers can react to it.
        Thread.currentThread().interrupt();
        LOGGER.error(e.getMessage(), e);
    }
    return result;
}
The sendRequest() method builds an AMQP.BasicProperties instance and sets the replyTo and correlationId properties. The correlationId must be a unique identifier. The receiver passes it back in the response message so that the sender can determine which request a response belongs to.
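One way to satisfy the uniqueness requirement is to generate a UUID per request and keep a table of requests that are still waiting for an answer. The sketch below is an illustration only, built on the JDK alone: the PendingRequests class and its method names are hypothetical and are not part of the Sender class or of the RabbitMQ client.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper that matches responses to outstanding requests by correlation ID.
final class PendingRequests {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // A random UUID is a simple way to obtain an identifier that is unique in practice.
    static String newCorrelationId() {
        return UUID.randomUUID().toString();
    }

    // Call before sending a request; the returned future completes when its response arrives.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Call for every incoming response; returns false if no request is waiting for it.
    boolean onResponse(String correlationId, String body) {
        if (correlationId == null) {
            return false;
        }
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // response to an unknown or already answered request
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        PendingRequests requests = new PendingRequests();
        String first = newCorrelationId();
        String second = newCorrelationId();
        CompletableFuture<String> firstReply = requests.register(first);
        CompletableFuture<String> secondReply = requests.register(second);

        // Responses may arrive in any order; the ID decides which request they answer.
        requests.onResponse(second, "reply to the second request");
        System.out.println(firstReply.isDone());      // prints "false"
        System.out.println(secondReply.getNow(null)); // prints "reply to the second request"
    }
}
```

A fixed identifier such as "MSG1" is fine for a demo with a single request in flight. Once several requests share one response queue, generated identifiers avoid attributing a response to the wrong request.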
The RequestReceiver class provides a sample implementation of a request receiver:
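import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class RequestReceiver {

    // The empty string is the name of the default exchange; responses are published to it.
    private static final String DEFAULT_QUEUE = "";
    private static final String REQUEST_QUEUE = "request_queue";
    private final static Logger LOGGER = LoggerFactory.getLogger(RequestReceiver.class);

    private Connection connection = null;
    private Channel channel = null;

    public void initialize() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
    . . .
}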
The receive() method is used to read a request message from a queue:
public void receive() {
    if (channel == null) {
        initialize();
    }
    String message = null;
    try {
        channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(REQUEST_QUEUE, true, consumer);
        // Blocks until a request arrives.
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        message = new String(delivery.getBody());
        LOGGER.info("Request received: " + message);
        // do something with the request message ...
        AMQP.BasicProperties properties = delivery.getProperties();
        if (properties != null && properties.getReplyTo() != null) {
            // Echo the correlation ID so that the sender can match the response.
            AMQP.BasicProperties amqpProps = new AMQP.BasicProperties.Builder()
                    .correlationId(properties.getCorrelationId())
                    .build();
            // DEFAULT_QUEUE is the empty string, which names the default exchange.
            channel.basicPublish(DEFAULT_QUEUE, properties.getReplyTo(), amqpProps,
                    "Response message.".getBytes());
        } else {
            LOGGER.warn("Cannot determine response destination for message.");
        }
    } catch (IOException | ShutdownSignalException | ConsumerCancelledException e) {
        LOGGER.error(e.getMessage(), e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so that callers can react to it.
        Thread.currentThread().interrupt();
        LOGGER.error(e.getMessage(), e);
    }
}
As before, we have a destroy() method. It is important to close your connections to the broker when you no longer use them:
    public void destroy() {
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                LOGGER.warn(e.getMessage(), e);
            }
        }
    }
}
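An explicit destroy() call is easy to forget on an error path. As a hedged illustration of an alternative, the sketch below uses Java's try-with-resources with a hypothetical FakeConnection class standing in for a broker connection. Whether the client's own Connection type can be used in a try-with-resources statement depends on the library version, so check that before relying on it.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseSketch {

    // Records what happened, in order, so that the cleanup behaviour is visible.
    static final List<String> EVENTS = new ArrayList<>();

    // Hypothetical stand-in for a broker connection; NOT the RabbitMQ client type.
    static final class FakeConnection implements AutoCloseable {
        private boolean open = true;

        void send(String message) {
            if (message.isEmpty()) {
                throw new IllegalArgumentException("empty message");
            }
            EVENTS.add("sent:" + message);
        }

        boolean isOpen() {
            return open;
        }

        @Override
        public void close() {
            open = false;
            EVENTS.add("closed");
        }
    }

    // Sends one message; the connection is closed whether or not send() throws.
    static FakeConnection sendAndClose(String message) {
        FakeConnection connection = new FakeConnection();
        try (FakeConnection c = connection) {
            c.send(message);
        } catch (IllegalArgumentException e) {
            EVENTS.add("failed");
        }
        return connection;
    }

    public static void main(String[] args) {
        sendAndClose("Test message.");
        sendAndClose("");
        System.out.println(EVENTS);
        // prints "[sent:Test message., closed, closed, failed]"
    }
}
```

Note the order in the failing case: the resource is closed before the catch block runs, so the error handler never sees an open connection.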
To send a request message, we can use the RequestSenderDemo class:
public class RequestSenderDemo {

    private static final String REQUEST_QUEUE = "request_queue";

    public static String sendToRequestReplyQueue() {
        Sender sender = new Sender();
        sender.initialize();
        sender.sendRequest(REQUEST_QUEUE, "Test message.", "MSG1");
        String result = sender.waitForResponse("MSG1");
        sender.destroy();
        return result;
    }

    public static void main(String[] args) {
        sendToRequestReplyQueue();
    }
}
To receive the request message and send a response message, you can use the RequestReceiverDemo class:
public class RequestReceiverDemo {

    public static void main(String[] args) throws InterruptedException {
        final RequestReceiver receiver = new RequestReceiver();
        receiver.initialize();
        receiver.receive();
        receiver.destroy();
    }
}