Hi,

I'm using Camel 2.23.0 and activemq-camel. I'm consuming messages from a 
virtual topic. I want to detect when there are problems in the connection with 
the broker and when (automatic) recovery is successful. For the initial 
detection of a connection error I use an exception listener. This one is 
triggered as expected. However, I don't succeed in detecting when the 
auto-recovery has succeeded (I need to be able to detect the recovery without a 
new message being consumed). I tried to use an instance of 
DefaultMessageListenerContainer and call its isRecovering() method to see if 
recovery has succeeded.

from("imq:queue:Consumer.app.VirtualTopic.Update?messageListenerContainerFactoryRef=#myContainerFactory")
.log("Message received from virtual topic 'VirtualTopic.Update': ${body}")
...

An exception listener "JmsExceptionListener" listens for exceptions and 
triggers a route:

public class JmsExceptionListener implements ExceptionListener {

                @Autowired
                private ProducerTemplate producerTemplate;

@Override
public void onException(JMSException exception) {
                producerTemplate.sendBody ("direct:error", "");
}
}

Error route:

from("direct:error")
                .onException(Exception.class)
                               ... (infinite retries with fixed delay)
                .end()
.filter(method("myMessageListenerContainer", "isRecovering")
                .throwException(new Exception("still recovering"))
.end()
.log("Connection restored")
...

Bean configuration:

<bean id="imq" class="org.apache.activemq.camel.component.ActiveMQComponent"
          p:acknowledgementModeName="CLIENT_ACKNOWLEDGE"
          p:receiveTimeout="30000"
          p:requestTimeout="30000"
          p:concurrentConsumers="1"
          p:cacheLevelName="CACHE_CONSUMER"
          p:testConnectionOnStartup="true"
          p:connectionFactory-ref="imqConnectionFactory"
          p:recoveryInterval="2000"/>

<amq:connectionFactory id="imqConnectionFactory"  
consumerExpiryCheckEnabled="false"  exceptionListener="#amqExceptionListener"  
brokerURL="${imq.broker.url}"/>
<amq:prefetchPolicy all="1"/>

<bean id="amqExceptionListener" class="be.myapp.listener.JmsExceptionListener"/>

<bean id="myMessageListenerContainer" 
class="org.springframework.jms.listener.DefaultMessageListenerContainer"
                p:connectionFactory-ref="imqConnectionFactory"
                p:destination-name="queue:Consumer.app.VirtualTopic.Update"/>

Classes:

public class MyMessageListenerContainerFactory implements 
MessageListenerContainerFactory {

                @Autowired
                private DefaultMessageListenerContainer 
myMessageListenerContainer;

                @Override
                public AbstractMessageListenerContainer 
createMessageListenerContainer(JmsEndpoint endpoint) {
                               return myMessageListenerContainer;
                }
}

When manually bringing down my locally-running activeMQ broker, the exception 
handler is correctly triggered, but the isRecovering() method of my 
MyMessageListenerContainer returns false, while I had expected it to return 
true. What can be the cause of this? Is there a problem in my configuration? Is 
there any other way to easily detect the recovery state?

Best regards,
Kristof Geldhof

Reply via email to