I am implementing the quorum queue feature of RabbitMQ.
The route configuration is as follows:
// Requires org.apache.camel.builder.RouteBuilder
context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        from("timer:hello?period=1000")
            .transform(simple("Random number ${random(0,100)}"))
            .end()
            // The endpoint URI is one string; it is concatenated here only for readability.
            .to("rabbitmq:test?addresses=172.17.0.2:5672, 172.17.0.3:5672, "
                + "172.17.0.4:5672, 172.17.0.5:5672, 172.17.0.6:5672"
                + "&connectionFactory=factory&exchangeType=direct"
                + "&arg.queue.x-queue-type=quorum&autoDelete=false&queue=hello");
    }
});
In the setup above, I have a RabbitMQ cluster of 5 nodes running in Docker.
The addresses option contains the IP address and port of each RabbitMQ node.
With the quorum queue, a leader and followers are formed as expected, but
when I stop the leader, the route is unable to fail over to another node in
the address pool.
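For reference, the addresses list in the URI above can also be assembled programmatically. The following is only a minimal sketch: the class QuorumEndpointUri and its two helper methods are hypothetical names of mine, not part of Camel or the RabbitMQ client. It concatenates the same options used in the route and joins the node addresses with bare commas, so no whitespace ends up inside the URI.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not a Camel API): builds the endpoint URI used in the route.
class QuorumEndpointUri {

    // Joins host:port pairs with bare commas, trimming any surrounding whitespace.
    static String joinAddresses(List<String> hostPorts) {
        StringBuilder joined = new StringBuilder();
        for (String hostPort : hostPorts) {
            if (joined.length() > 0) {
                joined.append(',');
            }
            joined.append(hostPort.trim());
        }
        return joined.toString();
    }

    // Concatenates the same options as the route above; only the exchange name,
    // queue name and address list are parameters.
    static String buildUri(String exchange, String queue, List<String> hostPorts) {
        return "rabbitmq:" + exchange
            + "?addresses=" + joinAddresses(hostPorts)
            + "&connectionFactory=factory"
            + "&exchangeType=direct"
            + "&arg.queue.x-queue-type=quorum"
            + "&autoDelete=false"
            + "&queue=" + queue;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList(
            "172.17.0.2:5672", "172.17.0.3:5672", "172.17.0.4:5672",
            "172.17.0.5:5672", "172.17.0.6:5672");
        System.out.println(buildUri("test", "hello", nodes));
    }
}
```

The resulting string could then be passed to .to(...) in place of the literal URI; the options themselves are unchanged, so this does not alter how the endpoint behaves.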
According to the documentation of the Camel RabbitMQ component, it should
switch to another address in the pool, but that is not happening in this
scenario. The error log is included below.
The same setup works perfectly with HA mirroring on a classic queue type:
there it is able to switch to another node in case of failure.
Any insight into this issue would be appreciated.
Thank you
[main] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 3.0.1
(CamelContext: camel-1) is starting
[main] INFO org.apache.camel.impl.engine.DefaultManagementStrategy - JMX is
disabled
[main] INFO org.apache.camel.impl.DefaultCamelContext - StreamCaching is not in
use. If using streams then its recommended to enable stream caching. See more
details at http://camel.apache.org/stream-caching.html
[main] INFO org.apache.camel.impl.DefaultCamelContext - Route: route1 started
and consuming from: timer://hello?period=1000
[main] INFO org.apache.camel.impl.DefaultCamelContext - Total 1 routes, of
which 1 are started
[main] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 3.0.1
(CamelContext: camel-1) started in 4.427 seconds
[RabbitMQ Error On Write Thread] WARN
com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection
driver error occured (Exception message: Connection reset)
[AMQP Connection 172.17.0.2:5672] WARN
com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection
driver error occured (Exception message: Connection reset)
[Camel (camel-1) thread #1 - timer://hello] WARN
org.apache.camel.component.rabbitmq.RabbitMQProducer - Got a closed channel
from the pool. Invalidating and borrowing a new one from the pool.
[Camel (camel-1) thread #1 - timer://hello] INFO
org.apache.camel.component.rabbitmq.RabbitMQProducer - Reconnecting to RabbitMQ
[Camel (camel-1) thread #1 - timer://hello] ERROR
org.apache.camel.processor.errorhandler.DefaultErrorHandler - Failed delivery
for (MessageId: ID-renish-1583472848118-0-50 on ExchangeId:
ID-renish-1583472848118-0-49). Exhausted after delivery attempt: 1 caught:
java.io.IOException
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [from[timer://hello?period=1000]                                                 ] [        14]
[route1            ] [transform1        ] [transform[Simple: Random number ${random(0,100)}]                               ] [         0]
[route1            ] [to1               ] [rabbitmq:test?addresses=172.17.0.2:5672, 172.17.0.3:5672, 172.17.0.4:5672, 172  ] [         0]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
at
com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:968)
at
com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
at
org.apache.camel.component.rabbitmq.RabbitMQDeclareSupport.declareAndBindQueue(RabbitMQDeclareSupport.java:132)
at
org.apache.camel.component.rabbitmq.RabbitMQDeclareSupport.declareAndBindExchangeWithQueue(RabbitMQDeclareSupport.java:55)
at
org.apache.camel.component.rabbitmq.RabbitMQDeclareSupport.declareAndBindExchangesAndQueuesUsing(RabbitMQDeclareSupport.java:36)
at
org.apache.camel.component.rabbitmq.RabbitMQEndpoint.declareExchangeAndQueue(RabbitMQEndpoint.java:231)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer$1.doWithChannel(RabbitMQProducer.java:116)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer$1.doWithChannel(RabbitMQProducer.java:113)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.execute(RabbitMQProducer.java:90)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.attemptDeclaration(RabbitMQProducer.java:113)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.openConnectionAndChannelPool(RabbitMQProducer.java:108)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.checkConnectionAndChannelPool(RabbitMQProducer.java:135)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.execute(RabbitMQProducer.java:85)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.basicPublish(RabbitMQProducer.java:285)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.processInOnly(RabbitMQProducer.java:272)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.process(RabbitMQProducer.java:197)
at
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:134)
at
org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryState.run(RedeliveryErrorHandler.java:476)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:185)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:87)
at
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:228)
at
org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:193)
at
org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:75)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol
method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED -
inequivalent arg 'x-queue-type' for queue 'volante' in vhost '/': received none
but current is the value 'quorum' of type 'longstr', class-id=50, method-id=10)
at
com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at
com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at
com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
at
com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
... 26 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol
method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED -
inequivalent arg 'x-queue-type' for queue 'volante' in vhost '/': received none
but current is the value 'quorum' of type 'longstr', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
at
com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at
com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
at
com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
at
com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)
at java.lang.Thread.run(Thread.java:748)
[Camel (camel-1) thread #1 - timer://hello] WARN
org.apache.camel.component.timer.TimerConsumer - Error processing exchange.
Exchange[ID-renish-1583472848118-0-49]. Caused by: [java.io.IOException - null]
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
at
com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:968)
at
com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
at
org.apache.camel.component.rabbitmq.RabbitMQDeclareSupport.declareAndBindQueue(RabbitMQDeclareSupport.java:132)
at
org.apache.camel.component.rabbitmq.RabbitMQDeclareSupport.declareAndBindExchangeWithQueue(RabbitMQDeclareSupport.java:55)
at
org.apache.camel.component.rabbitmq.RabbitMQDeclareSupport.declareAndBindExchangesAndQueuesUsing(RabbitMQDeclareSupport.java:36)
at
org.apache.camel.component.rabbitmq.RabbitMQEndpoint.declareExchangeAndQueue(RabbitMQEndpoint.java:231)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer$1.doWithChannel(RabbitMQProducer.java:116)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer$1.doWithChannel(RabbitMQProducer.java:113)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.execute(RabbitMQProducer.java:90)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.attemptDeclaration(RabbitMQProducer.java:113)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.openConnectionAndChannelPool(RabbitMQProducer.java:108)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.checkConnectionAndChannelPool(RabbitMQProducer.java:135)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.execute(RabbitMQProducer.java:85)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.basicPublish(RabbitMQProducer.java:285)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.processInOnly(RabbitMQProducer.java:272)
at
org.apache.camel.component.rabbitmq.RabbitMQProducer.process(RabbitMQProducer.java:197)
at
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:134)
at
org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryState.run(RedeliveryErrorHandler.java:476)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:185)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:87)
at
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:228)
at
org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:193)
at
org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:75)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol
method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED -
inequivalent arg 'x-queue-type' for queue 'volante' in vhost '/': received none
but current is the value 'quorum' of type 'longstr', class-id=50, method-id=10)
at
com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at
com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at
com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
at
com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
... 26 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol
method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED -
inequivalent arg 'x-queue-type' for queue 'volante' in vhost '/': received none
but current is the value 'quorum' of type 'longstr', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
at
com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at
com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
at
com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
at
com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)
at java.lang.Thread.run(Thread.java:748)
[Camel (camel-1) thread #1 - timer://hello] WARN
org.apache.camel.component.rabbitmq.RabbitMQProducer - Got a closed channel
from the pool. Invalidating and borrowing a new one from the pool.
[Camel (camel-1) thread #1 - timer://hello] ERROR
org.apache.camel.processor.errorhandler.DefaultErrorHandler - Failed delivery
for (MessageId: ID-renish-1583472848118-0-52 on ExchangeId:
ID-renish-1583472848118-0-51). Exhausted after delivery attempt: 1 caught:
java.io.IOException