Hi,

we have implemented a IdempotentRepository that uses MongoDB for persisting
the idempotent keys. It worked fine when we used a String as the idempotent
key. But now we need to use a compound key (CompoundId) that consists of
three fields of type String.

The IdempotentRepository definition is as follows. Any ideas what's going
wrong here?

public class MongoIdempotentRepository implements
IdempotentRepository<CompoundId> {
    @Override
    public boolean add(CompoundId key) { ... }

    @Override
    public boolean confirm(CompoundId key) { ... }

    @Override
    public boolean contains(CompoundId key) { ... }

    @Override
    public boolean remove(CompoundId key) { ... }
}

Here's the route where the CompoundId is passed to MongoIdempotentRepository
using header

from(inputQueue).                               
idempotentConsumer(header(COMPOUND_ID)).messageIdRepository(idempotentRepository).skipDuplicate(false).
               
filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true)).
                to(duplicateMessageRoute).
                stop().
                end().

The following exception occurs when using this approach

2013-10-03 08:31:42,820 | WARN  | rom-partner-req] | UnitOfWorkProcessor        
     
| 100 - org.apache.camel.camel-core - 2.10.6 | Caught unhandled exception
while processing ExchangeId:
ID-vagrant-lubuntu-quantic-49191-1380778068989-2-1
org.apache.camel.processor.idempotent.NoMessageIdException: No message ID
could be found using expression: header{header(CompoundId)} on message
exchange: Exchange[JmsMessage[JmsMessageID:
ID:vagrant-lubuntu-quantic-44502-1380778069084-7:2:1:1:1]]
        at
org.apache.camel.processor.idempotent.IdempotentConsumer.process(IdempotentConsumer.java:73)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:335)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:220)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:308)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)[100:org.apache.camel.camel-core:2.10.6]
        at
org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:104)[106:org.apache.camel.camel-jms:2.10.6]
        at
org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:560)[104:org.springframework.jms:3.0.7.RELEASE]
        at
org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:498)[104:org.springframework.jms:3.0.7.RELEASE]
        at
org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)[104:org.springframework.jms:3.0.7.RELEASE]
        at
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)[104:org.springframework.jms:3.0.7.RELEASE]
        at
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:243)[104:org.springframework.jms:3.0.7.RELEASE]
        at
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)[104:org.springframework.jms:3.0.7.RELEASE]
        at
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)[104:org.springframework.jms:3.0.7.RELEASE]
        at
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)[104:org.springframework.jms:3.0.7.RELEASE]
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)[:1.7.0_11]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)[:1.7.0_11]
        at java.lang.Thread.run(Thread.java:722)[:1.7.0_11]




--
View this message in context: 
http://camel.465427.n5.nabble.com/IdempotentConsumer-custom-key-tp5740789.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to