Hi All,

I have a fanout exchange with several queues bound to it. When I send data
to this exchange, every bound queue receives the data as expected. But when
I try to connect multiple consumers to one of these bound queues, I get the
following message:
"Queue message_queue1 has an exclusive consumer. No more consumers allowed."

Here is the properties file that has the configuration:

fanout.properties:
--------------------
java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
connectionfactory.qpidConnectionfactory = amqp://guest:guest@xxxxxxx/?brokerlist='tcp://xxxxx:5672'

# for producer
destination.fanoutQueue = BURL:fanout://testcollector.fanout//message_queue?durable='false'&autodelete='true'&exclusive='false'

# for consumers
destination.fanoutQueue1 = BURL:fanout://testcollector.fanout//message_queue1?durable='false'&autodelete='true'&exclusive='false'

destination.fanoutQueue2 = BURL:fanout://testcollector.fanout//message_queue2?durable='false'&autodelete='true'&exclusive='false'

destination.fanoutQueue3 = BURL:fanout://testcollector.fanout//message_queue3?durable='false'&autodelete='true'&exclusive='false'
-------------------------
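
As a reading aid, the binding URLs above appear to follow the layout
<exchange class>://<exchange name>/<destination>/<queue>?<option>='<value>'&...
The sketch below splits one of them into those parts using plain Java only.
It is an illustration, not Qpid's own parser: the class BindingUrlSketch, the
Parts holder and the parse method are hypothetical names, and the quote
handling is simplified.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BindingUrlSketch {

    /** Hypothetical holder for the parts of a binding URL; not a Qpid class. */
    static final class Parts {
        final String exchangeClass;
        final String exchangeName;
        final String destination;
        final String queueName;
        final Map<String, String> options;

        Parts(String exchangeClass, String exchangeName, String destination,
              String queueName, Map<String, String> options) {
            this.exchangeClass = exchangeClass;
            this.exchangeName = exchangeName;
            this.destination = destination;
            this.queueName = queueName;
            this.options = options;
        }
    }

    /** Splits a binding URL, with or without the "BURL:" prefix, into its parts. */
    static Parts parse(String burl) {
        String s = burl.startsWith("BURL:") ? burl.substring(5) : burl;

        int schemeEnd = s.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("missing '://' in " + burl);
        }
        String exchangeClass = s.substring(0, schemeEnd);
        String rest = s.substring(schemeEnd + 3);

        // Separate the option string from the path part.
        String query = "";
        int q = rest.indexOf('?');
        if (q >= 0) {
            query = rest.substring(q + 1);
            rest = rest.substring(0, q);
        }

        // Path layout: <exchange name>/<destination>/<queue>; empty segments are kept.
        String[] path = rest.split("/", -1);
        String exchangeName = path[0];
        String destination = path.length > 1 ? path[1] : "";
        String queueName = path.length > 2 ? path[2] : "";

        // Options look like key='value', joined by '&'. All single quotes are stripped.
        Map<String, String> options = new LinkedHashMap<>();
        if (!query.isEmpty()) {
            for (String pair : query.split("&")) {
                int eq = pair.indexOf('=');
                if (eq < 0) {
                    continue; // skip anything that is not key=value
                }
                options.put(pair.substring(0, eq),
                        pair.substring(eq + 1).replace("'", ""));
            }
        }
        return new Parts(exchangeClass, exchangeName, destination, queueName, options);
    }

    public static void main(String[] args) {
        Parts p = parse("BURL:fanout://testcollector.fanout//message_queue1"
                + "?durable='false'&autodelete='true'&exclusive='false'");
        System.out.println("exchange class: " + p.exchangeClass);
        System.out.println("exchange name:  " + p.exchangeName);
        System.out.println("queue name:     " + p.queueName);
        System.out.println("options:        " + p.options);
    }
}
```

Read this way, the exclusive='false' option is set on the queue declaration
for message_queue1, which matches the empty excl column that qpid-stat
reports for that queue.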

When I connect two consumers to message_queue1, the second one fails with the
following error:
INFO org.apache.qpid.client.AMQConnection - Closing AMQConnection due to
:org.apache.qpid.AMQException: ch=0 id=0
ExecutionException(errorCode=RESOURCE_LOCKED, commandId=6, classCode=4,
commandCode=7, fieldIndex=0, description=resource-locked: Queue
message_queue1 has an exclusive consumer. No more consumers allowed.
(qpid/broker/Queue.cpp:385), errorInfo={}) [error code 405: Already exists]

I checked the broker with qpid-stat, and it reports message_queue1 as a
non-exclusive queue (the excl column is empty):
-bash-4.1$ qpid-stat -q
Queues
  queue           dur  autoDel  excl  msg  msgIn  msgOut  bytes  bytesIn  bytesOut  cons  bind
  message_queue1       Y              1    22     21      15     324      309       1     2


Here is the code I am using.

Producer.java: (queueName=fanoutQueue)
----------------
Properties properties = new Properties();
properties.load(this.getClass().getResourceAsStream("fanout.properties"));

// Create the initial context
Context ctx = new InitialContext(properties);

// Look up destination and connection factory
Destination destination = (Destination) ctx.lookup(queueName);
ConnectionFactory conFac = (ConnectionFactory) ctx.lookup("qpidConnectionfactory");

Connection connection = conFac.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

MessageProducer messageProducer = session.createProducer(destination);
TextMessage message;

// Send a series of messages in a loop
int i = 0;
while (true) {
    message = session.createTextMessage("Hello world! " + i);
    messageProducer.send(message, DeliveryMode.NON_PERSISTENT,
            Message.DEFAULT_PRIORITY, 60 * 1000);
    i++;
    Thread.sleep(1000);
}
-------------------

Consumer.java: (queueName=fanoutQueue1)
-----------------
Properties properties = new Properties();
properties.load(this.getClass().getResourceAsStream("fanout.properties"));

// Create the initial context
Context ctx = new InitialContext(properties);

// Look up destination and connection factory
Destination destination = (Destination) ctx.lookup(queueName);
ConnectionFactory conFac = (ConnectionFactory) ctx.lookup("qpidConnectionfactory");

Connection connection = conFac.createConnection();
connection.setExceptionListener(new ExceptionListener()
{
    public void onException(JMSException jmse)
    {
        System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
        System.exit(0);
    }
});

System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

MessageConsumer messageConsumer = session.createConsumer(destination);
connection.start();

while (true) {
    TextMessage message = (TextMessage) messageConsumer.receive(1000);
    if (message != null)
        System.out.println(message.getText());
}
-----------------

Am I missing something here?

I really appreciate your help.

Thanks,
Uday


--
View this message in context: 
http://apache-qpid-users.2158936.n2.nabble.com/Unable-to-connect-multiple-consumers-to-a-queue-on-fan-out-exchange-tp6585677p6585677.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.
