Hi,
I'm using Camel 2.23.0 and activemq-camel. I'm consuming messages from a
virtual topic. I want to detect when there are problems in the connection with
the broker and when (automatic) recovery is successful. For the initial
detection of a connection error I use an exception listener. This one is
triggered as expected. However, I don't succeed in detecting when the
auto-recovery has succeeded (I need to be able to detect the recovery without a
new message being consumed). I tried to use an instance of
DefaultMessageListenerContainer and call its isRecovering() method to see if
recovery has succeeded.
from("imq:queue:Consumer.app.VirtualTopic.Update?messageListenerContainerFactoryRef=#myContainerFactory")
.log("Message received from virtual topic 'VirtualTopic.Update': ${body}")
...
An exception listener "JmsExceptionListener" listens for exceptions and
triggers a route:
public class JmsExceptionListener implements ExceptionListener {
@Autowired
private ProducerTemplate producerTemplate;
@Override
public void onException(JMSException exception) {
producerTemplate.sendBody ("direct:error", "");
}
}
Error route:
from("direct:error")
.onException(Exception.class)
... (infinite retries with fixed delay)
.end()
.filter(method("myMessageListenerContainer", "isRecovering")
.throwException(new Exception("still recovering"))
.end()
.log("Connection restored")
...
Bean configuration:
<bean id="imq" class="org.apache.activemq.camel.component.ActiveMQComponent"
p:acknowledgementModeName="CLIENT_ACKNOWLEDGE"
p:receiveTimeout="30000"
p:requestTimeout="30000"
p:concurrentConsumers="1"
p:cacheLevelName="CACHE_CONSUMER"
p:testConnectionOnStartup="true"
p:connectionFactory-ref="imqConnectionFactory"
p:recoveryInterval="2000"/>
<amq:connectionFactory id="imqConnectionFactory"
consumerExpiryCheckEnabled="false" exceptionListener="#amqExceptionListener"
brokerURL="${imq.broker.url}"/>
<amq:prefetchPolicy all="1"/>
<bean id="amqExceptionListener" class="be.myapp.listener.JmsExceptionListener"/>
<bean id="myMessageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer"
p:connectionFactory-ref="imqConnectionFactory"
p:destination-name="queue:Consumer.app.VirtualTopic.Update"/>
Classes:
public class MyMessageListenerContainerFactory implements
MessageListenerContainerFactory {
@Autowired
private DefaultMessageListenerContainer
myMessageListenerContainer;
@Override
public AbstractMessageListenerContainer
createMessageListenerContainer(JmsEndpoint endpoint) {
return myMessageListenerContainer;
}
}
When manually bringing down my locally-running activeMQ broker, the exception
handler is correctly triggered, but the isRecovering() method of my
MyMessageListenerContainer returns false, while I had expected it to return
true. What can be the cause of this? Is there a problem in my configuration? Is
there any other way to easily detect the recovery state?
Best regards,
Kristof Geldhof