Publish-subscribe communication

The following diagram provides an overview of the scenario that we will implement:

For publish-subscribe communication we can use a fanout exchange and bind any number of queues to it; a fanout exchange routes every message to all bound queues regardless of the binding key. The PublishSubscribeReceiver class can be used to bind a specified queue to a fanout exchange and receive messages from it:

import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class PublishSubscribeReceiver {

    private final static String EXCHANGE_NAME = "pubsub_exchange";
    private final static Logger LOGGER = LoggerFactory.getLogger(PublishSubscribeReceiver.class);
    private Channel channel = null;
    private Connection connection = null;

    public void initialize() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
    . . .
}

The receive() method can be used to retrieve a message from a queue that is bound to the pubsub_exchange fanout exchange and does the following:

  • Creates the pubsub_exchange, if not already created
  • Creates the specified queue if not already created
  • Binds the queue to the pubsub_exchange using the queueBind() method of the Channel instance that represents the AMQP channel for the receiver; a fanout exchange ignores the binding key, so we pass the empty string
  • Creates a new QueueingConsumer instance, registers it on the AMQP channel, and calls its nextDelivery() method to wait for a message from the channel:
        public String receive(String queue) {
    
            if (channel == null) {
                initialize();
            }
    
            String message = null;
            try {
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                channel.queueDeclare(queue, false, false, false, null);
                channel.queueBind(queue, EXCHANGE_NAME, "");
                QueueingConsumer consumer = new QueueingConsumer(channel);
                channel.basicConsume(queue, true, consumer);
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                message = new String(delivery.getBody());
                LOGGER.info("Message received: " + message);
    
            } catch (IOException e) {
                LOGGER.error(e.getMessage(), e);
            } catch (ShutdownSignalException e) {
                LOGGER.error(e.getMessage(), e);
            } catch (ConsumerCancelledException e) {
                LOGGER.error(e.getMessage(), e);
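            } catch (InterruptedException e) {
                LOGGER.error(e.getMessage(), e);
                // Restore the interrupt flag so that callers can still observe it.
                Thread.currentThread().interrupt();
            }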
            return message;
        }
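The blocking behaviour of nextDelivery() can be illustrated without a broker. The following is only a minimal sketch, assuming that a queueing consumer behaves like a buffer between the thread that delivers messages and the thread that asks for them; BlockingConsumerModel and its methods are hypothetical names used for illustration and are not part of the RabbitMQ client library:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a queueing consumer; NOT the RabbitMQ class itself.
class BlockingConsumerModel {

    // Buffer that holds deliveries until the receiving thread asks for them.
    private final BlockingQueue<String> deliveries = new LinkedBlockingQueue<String>();

    // Called by the delivering side whenever a message arrives.
    void handleDelivery(String body) {
        deliveries.add(body);
    }

    // Blocks the calling thread until a message is available.
    String nextDelivery() throws InterruptedException {
        return deliveries.take();
    }

    // Waits at most timeoutMillis and returns null if nothing arrived in time.
    String nextDelivery(long timeoutMillis) throws InterruptedException {
        return deliveries.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        final BlockingConsumerModel consumer = new BlockingConsumerModel();
        // Nothing has been delivered yet, so the timed call gives up.
        System.out.println(consumer.nextDelivery(100));
        Thread publisher = new Thread(new Runnable() {
            public void run() {
                consumer.handleDelivery("Test message.");
            }
        });
        publisher.start();
        // Waits here until the publisher thread has delivered the message.
        System.out.println(consumer.nextDelivery());
        publisher.join();
    }
}
```

Because the untimed call waits indefinitely, a receive() method built this way occupies its thread until a message arrives, which is why each receiver is typically run in a thread of its own.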

The destroy() method closes the connection once the receiver is no longer needed:

    public void destroy() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            LOGGER.warn(e.getMessage(), e);
        }
    }
}

To demonstrate publish-subscribe communication, we will use the FanoutExchangeSenderDemo class to send a message to the pubsub_exchange fanout exchange:

public class FanoutExchangeSenderDemo {

    private static final String FANOUT_EXCHANGE_TYPE = "fanout";

    public static void sendToFanoutExchange(String exchange) {
        Sender sender = new Sender();
        sender.initialize();
        sender.send(exchange, FANOUT_EXCHANGE_TYPE, "Test message.");
        sender.destroy();
    }

    public static void main(String[] args) {
        sendToFanoutExchange("pubsub_exchange");
    }
}

After you invoke the main() method of the FanoutExchangeSenderDemo class, the management console shows that the pubsub_exchange exchange has been created in the RabbitMQ server instance, alongside the predefined exchanges.

If you restart the RabbitMQ instance, you will no longer see pubsub_exchange in the management console, because the exchange is not marked as durable. To mark a queue or exchange as durable, pass true as the durable parameter of the queueDeclare() or exchangeDeclare() method of the Channel class, for example exchangeDeclare(EXCHANGE_NAME, "fanout", true). For stronger delivery guarantees from the broker, you can use the publisher confirms extension, which is enabled on a channel by calling confirmSelect().

The PublishSubscribeReceiverDemo class demonstrates how to use the PublishSubscribeReceiver class to establish a publish-subscribe channel:

public class PublishSubscribeReceiverDemo {

    public static void main(String[] args) throws InterruptedException {
        final PublishSubscribeReceiver receiver1 = new PublishSubscribeReceiver();
        receiver1.initialize();
        final PublishSubscribeReceiver receiver2 = new PublishSubscribeReceiver();
        receiver2.initialize();
        Thread t1 = new Thread(new Runnable() {
            public void run() {
                receiver1.receive("pubsub_queue1");
            }
        });
        Thread t2 = new Thread(new Runnable() {
            public void run() {
                receiver2.receive("pubsub_queue2");
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        receiver1.destroy();
        receiver2.destroy();
    }
}

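The main() method creates two receivers that bind two different queues, pubsub_queue1 and pubsub_queue2, to the exchange. A fanout exchange copies each message only to the queues that are bound at the time of publishing, so start the receivers before running FanoutExchangeSenderDemo; the message is then delivered to both queues and thus to both consumers. A message sent before any queue is bound is discarded by the broker.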
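The routing behaviour of a fanout exchange can be modelled in a few lines of plain Java. This is a simplified in-memory sketch, not broker code: FanoutExchangeModel and its methods are hypothetical names introduced for illustration, and the model assumes only the two fanout rules that matter here, namely that keys are ignored and that only queues bound at publish time receive a copy:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange; NOT part of the RabbitMQ client.
class FanoutExchangeModel {

    // Queue name -> messages currently held by that queue.
    private final Map<String, List<String>> boundQueues =
            new LinkedHashMap<String, List<String>>();

    // Binds a queue to the exchange; the binding key is accepted but ignored.
    void bind(String queueName, String bindingKey) {
        if (!boundQueues.containsKey(queueName)) {
            boundQueues.put(queueName, new ArrayList<String>());
        }
    }

    // Copies the message to every bound queue, ignoring the routing key,
    // and returns the number of copies made.
    int publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    // Returns a snapshot of the messages held by the given queue.
    List<String> messagesIn(String queueName) {
        List<String> held = boundQueues.get(queueName);
        if (held == null) {
            return new ArrayList<String>();
        }
        return new ArrayList<String>(held);
    }

    public static void main(String[] args) {
        FanoutExchangeModel exchange = new FanoutExchangeModel();
        // No queue is bound yet, so nothing receives this message.
        int early = exchange.publish("", "Too early.");
        exchange.bind("pubsub_queue1", "");
        exchange.bind("pubsub_queue2", "ignored.key");
        // Both queues receive a copy, whatever keys were used.
        int copies = exchange.publish("any.key", "Test message.");
        System.out.println("Copies before binding: " + early);
        System.out.println("Copies after binding: " + copies);
        System.out.println("pubsub_queue1: " + exchange.messagesIn("pubsub_queue1"));
        System.out.println("pubsub_queue2: " + exchange.messagesIn("pubsub_queue2"));
    }
}
```

In this model the first publish() call reaches no queue, while the second one places a copy in each of the two bound queues, mirroring what the two receivers observe when the sender runs after they have started.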