- Learning RabbitMQ
- Martin Toshev
- 2025-02-24 02:09:16
Publish-subscribe communication
The following diagram provides an overview of the scenario that we will implement:
[Diagram: overview of the publish-subscribe scenario]
For publish-subscribe communication, we can use a fanout exchange and bind any number of queues to it; a fanout exchange ignores the binding key. The PublishSubscribeReceiver class can be used to bind a specified queue to a fanout exchange and receive messages from it:

    import java.io.IOException;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ConsumerCancelledException;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.ShutdownSignalException;

    public class PublishSubscribeReceiver {

        private final static String EXCHANGE_NAME = "pubsub_exchange";
        private final static Logger LOGGER =
                LoggerFactory.getLogger(PublishSubscribeReceiver.class);

        private Channel channel = null;
        private Connection connection = null;

        // Opens a connection to the local broker and creates a channel on it.
        public void initialize() {
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                connection = factory.newConnection();
                channel = connection.createChannel();
            } catch (IOException e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
        . . .
    }

The receive() method can be used to retrieve a message from a queue that is bound to the pubsub_exchange fanout exchange. It does the following:
- Creates the pubsub_exchange, if not already created
- Creates the specified queue, if not already created
- Binds the queue to the pubsub_exchange using the queueBind() method of the Channel instance that represents the AMQP channel for the receiver; notice that in this case we don't specify any particular binding key, which is why we pass the empty string
- Creates a new QueueingConsumer instance, registers it on the AMQP channel with basicConsume(), and calls its nextDelivery() method to receive a message from the channel:

    public String receive(String queue) {
        if (channel == null) {
            initialize();
        }
        String message = null;
        try {
            // Declare the exchange and queue (no-ops if they already exist) and bind them.
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            channel.queueDeclare(queue, false, false, false, null);
            channel.queueBind(queue, EXCHANGE_NAME, "");

            // Register the consumer with automatic acknowledgement.
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queue, true, consumer);

            // Block until a message is delivered.
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            message = new String(delivery.getBody());
            LOGGER.info("Message received: " + message);
            return message;
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        } catch (ShutdownSignalException e) {
            LOGGER.error(e.getMessage(), e);
        } catch (ConsumerCancelledException e) {
            LOGGER.error(e.getMessage(), e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            LOGGER.error(e.getMessage(), e);
        }
        return message;
    }

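The blocking behaviour of nextDelivery() in the last step can be pictured with a small model. This is only a sketch in plain Java, not the RabbitMQ client: BlockingHandoffSketch and demo() are hypothetical names, and it assumes that the consumer buffers incoming deliveries in a blocking queue until the caller asks for one.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of a blocking consumer; NOT the RabbitMQ client API.
class BlockingHandoffSketch {

    // Buffer between the thread that delivers messages and the caller.
    private final BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();

    // Called by the delivering side when a message arrives.
    void handleDelivery(String body) {
        deliveries.add(body);
    }

    // Blocks the calling thread until a message is available.
    String nextDelivery() throws InterruptedException {
        return deliveries.take();
    }

    // Starts a thread that delivers one message after a short delay
    // and returns what the blocked caller eventually receives.
    static String demo() {
        BlockingHandoffSketch consumer = new BlockingHandoffSketch();
        Thread broker = new Thread(() -> {
            try {
                Thread.sleep(100); // the message arrives a little later
                consumer.handleDelivery("Test message.");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        broker.start();
        try {
            String received = consumer.nextDelivery(); // waits for the delivery
            broker.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Message received: " + demo());
    }
}
```

Because a call of this kind does not return until a message arrives, a caller that needs several receivers at once has to run each one on its own thread.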
The PublishSubscribeReceiver class also provides a destroy() method that closes the connection:

        public void destroy() {
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                LOGGER.warn(e.getMessage(), e);
            }
        }
    }

To demonstrate the use of QueueingConsumer for establishing a publish-subscribe communication channel, we will use the FanoutExchangeSenderDemo class to send a message to the pubsub_exchange fanout exchange:

    public class FanoutExchangeSenderDemo {

        private static final String FANOUT_EXCHANGE_TYPE = "fanout";

        public static void sendToFanoutExchange(String exchange) {
            Sender sender = new Sender();
            sender.initialize();
            sender.send(exchange, FANOUT_EXCHANGE_TYPE, "Test message.");
            sender.destroy();
        }

        public static void main(String[] args) {
            sendToFanoutExchange("pubsub_exchange");
        }
    }

When you invoke the main() method of the FanoutExchangeSenderDemo class, you may notice from the management console that the pubsub_exchange exchange is created in the RabbitMQ server instance, separate from the predefined exchanges:
[Screenshot: the management console listing the pubsub_exchange exchange alongside the predefined exchanges]
If you restart the RabbitMQ instance, you will no longer see pubsub_exchange in the management console, because the exchange is not marked as durable. To mark a queue or exchange as durable, pass true as the durable argument of the queueDeclare() or exchangeDeclare() method of the Channel interface (the receive() method above passes false as the second argument of queueDeclare() and uses the exchangeDeclare() overload that has no durable flag). To obtain further message delivery guarantees from the broker, you can use the publisher confirms extension.
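The effect of the durable flag on a restart can be pictured with a small in-memory model. This is a sketch only, not the RabbitMQ client API: DurabilitySketch, restart(), and the exchange name durable_exchange are hypothetical, and the model covers exchange definitions only, not messages.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Toy model of broker restart semantics; NOT the RabbitMQ client API.
class DurabilitySketch {

    // Exchange name mapped to its durable flag.
    private final Map<String, Boolean> exchanges = new LinkedHashMap<>();

    // Declares an exchange; declaring an existing name again is a no-op in this model.
    void exchangeDeclare(String name, boolean durable) {
        exchanges.putIfAbsent(name, durable);
    }

    // A restart keeps only the exchanges that were declared durable.
    void restart() {
        exchanges.values().removeIf(durable -> !durable);
    }

    Set<String> exchangeNames() {
        return exchanges.keySet();
    }

    // Declares one non-durable and one durable exchange, restarts,
    // and returns the names that survive, joined by commas.
    static String demo() {
        DurabilitySketch broker = new DurabilitySketch();
        broker.exchangeDeclare("pubsub_exchange", false);
        broker.exchangeDeclare("durable_exchange", true); // hypothetical name
        broker.restart();
        return String.join(",", broker.exchangeNames());
    }

    public static void main(String[] args) {
        System.out.println("Exchanges after restart: " + demo());
    }
}
```

Note one difference from the model: a real broker rejects an attempt to redeclare an existing exchange with a different durable flag, so an exchange that already exists as non-durable has to be deleted before it can be declared as durable.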
The PublishSubscribeReceiverDemo class demonstrates the use of the PublishSubscribeReceiver class to establish a publish-subscribe channel:

    public class PublishSubscribeReceiverDemo {

        public static void main(String[] args) throws InterruptedException {
            final PublishSubscribeReceiver receiver1 = new PublishSubscribeReceiver();
            receiver1.initialize();
            final PublishSubscribeReceiver receiver2 = new PublishSubscribeReceiver();
            receiver2.initialize();

            // Each receive() call blocks, so every receiver runs on its own thread.
            Thread t1 = new Thread(new Runnable() {
                public void run() {
                    receiver1.receive("pubsub_queue1");
                }
            });
            Thread t2 = new Thread(new Runnable() {
                public void run() {
                    receiver2.receive("pubsub_queue2");
                }
            });

            t1.start();
            t2.start();
            t1.join();
            t2.join();

            receiver1.destroy();
            receiver2.destroy();
        }
    }

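The main() method creates two receivers that bind to two different queues: pubsub_queue1 and pubsub_queue2. Once both queues are bound, a message sent to the pubsub_exchange exchange is delivered to both queues and thus to both consumers. Note that a fanout exchange routes a message only to the queues that are bound when the message is published, so a message sent before the queues were bound does not reach them.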
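The delivery rule for a fanout exchange can be pictured with a small in-memory model. This is a sketch in plain Java, not the RabbitMQ client API: FanoutSketch, publish(), and messagesIn() are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of fanout routing; NOT the RabbitMQ client API.
class FanoutSketch {

    // Queue name mapped to the messages waiting in that queue.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // The binding key is accepted but ignored, as with a fanout exchange.
    void queueBind(String queue, String bindingKey) {
        boundQueues.putIfAbsent(queue, new ArrayList<>());
    }

    // Copies the message to every queue bound right now;
    // with no bindings the message is simply dropped.
    void publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queue) {
        return boundQueues.getOrDefault(queue, new ArrayList<>());
    }

    // Publishes once before and once after the queues are bound
    // and returns the contents of both queues.
    static String demo() {
        FanoutSketch exchange = new FanoutSketch();
        exchange.publish("early");                      // no queues bound yet: dropped
        exchange.queueBind("pubsub_queue1", "");
        exchange.queueBind("pubsub_queue2", "ignored"); // key has no effect
        exchange.publish("Test message.");
        return exchange.messagesIn("pubsub_queue1") + " " + exchange.messagesIn("pubsub_queue2");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model both queues end up holding only the message published after they were bound, which is why it helps to start the receivers before running the sender.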