Hi,
we have implemented an IdempotentRepository that uses MongoDB to persist
the idempotent keys. It worked fine when we used a String as the idempotent
key, but now we need to use a compound key (CompoundId) that consists of
three fields of type String.
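
For illustration, here is a minimal sketch of what such a compound key might look like. The field names (partnerId, orderId, messageType) and the "|" separator are assumptions made up for this example, not our actual class. The sketch only shows a value-type key with equals(), hashCode() and toString(), so that two keys built from the same three Strings compare as equal.

```java
import java.util.Objects;

// Hypothetical compound key made of three String fields.
// The field names are placeholders chosen for this sketch.
final class CompoundId {
    private final String partnerId;
    private final String orderId;
    private final String messageType;

    CompoundId(String partnerId, String orderId, String messageType) {
        this.partnerId = Objects.requireNonNull(partnerId, "partnerId");
        this.orderId = Objects.requireNonNull(orderId, "orderId");
        this.messageType = Objects.requireNonNull(messageType, "messageType");
    }

    // Two keys are equal when all three fields are equal.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof CompoundId)) {
            return false;
        }
        CompoundId that = (CompoundId) other;
        return partnerId.equals(that.partnerId)
                && orderId.equals(that.orderId)
                && messageType.equals(that.messageType);
    }

    // Must be consistent with equals() so the key works in hash-based collections.
    @Override
    public int hashCode() {
        return Objects.hash(partnerId, orderId, messageType);
    }

    // Flat String form of the key. The "|" separator is an assumption and is
    // only unambiguous if none of the fields can contain that character.
    @Override
    public String toString() {
        return partnerId + "|" + orderId + "|" + messageType;
    }
}
```

With value equality in place, new CompoundId("a", "b", "c") equals another instance built from the same three Strings, which is what a duplicate check on the key relies on.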
Since that change, the route fails with the exception shown further down.
Any ideas what's going wrong here? The IdempotentRepository is defined as
follows:
public class MongoIdempotentRepository implements IdempotentRepository<CompoundId> {

    @Override
    public boolean add(CompoundId key) { ... }

    @Override
    public boolean confirm(CompoundId key) { ... }

    @Override
    public boolean contains(CompoundId key) { ... }

    @Override
    public boolean remove(CompoundId key) { ... }
}
Here's the route where the CompoundId is passed to the
MongoIdempotentRepository via a header:
from(inputQueue).
    idempotentConsumer(header(COMPOUND_ID)).
        messageIdRepository(idempotentRepository).
        skipDuplicate(false).
    filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true)).
        to(duplicateMessageRoute).
        stop().
    end().
The following exception occurs when using this approach:
2013-10-03 08:31:42,820 | WARN | rom-partner-req] | UnitOfWorkProcessor | 100 - org.apache.camel.camel-core - 2.10.6 | Caught unhandled exception while processing ExchangeId: ID-vagrant-lubuntu-quantic-49191-1380778068989-2-1
org.apache.camel.processor.idempotent.NoMessageIdException: No message ID could be found using expression: header{header(CompoundId)} on message exchange: Exchange[JmsMessage[JmsMessageID: ID:vagrant-lubuntu-quantic-44502-1380778069084-7:2:1:1:1]]
    at org.apache.camel.processor.idempotent.IdempotentConsumer.process(IdempotentConsumer.java:73)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:335)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:220)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:308)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)[100:org.apache.camel.camel-core:2.10.6]
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:104)[106:org.apache.camel.camel-jms:2.10.6]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:560)[104:org.springframework.jms:3.0.7.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:498)[104:org.springframework.jms:3.0.7.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)[104:org.springframework.jms:3.0.7.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)[104:org.springframework.jms:3.0.7.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:243)[104:org.springframework.jms:3.0.7.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)[104:org.springframework.jms:3.0.7.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)[104:org.springframework.jms:3.0.7.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)[104:org.springframework.jms:3.0.7.RELEASE]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)[:1.7.0_11]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)[:1.7.0_11]
    at java.lang.Thread.run(Thread.java:722)[:1.7.0_11]
--
View this message in context:
http://camel.465427.n5.nabble.com/IdempotentConsumer-custom-key-tp5740789.html
Sent from the Camel - Users mailing list archive at Nabble.com.