Hi,
The following camel route has been configured to be transactional and
a idempotentConsumer defined to avoid to insert several times the same
record into a DB.
// We simply read a file, set the header messageId with the
value returned by invoking the method increaseCounter (0, 1, ....)
from("file://datainsert")
.setHeader("messageId").simple("${bean:util?method=increaseCounter}}")
.log("File created")
.to(corepointJms);
// We read the file, declare the route as transactional, put
the value from the messageId header into the IdempotentRepository
// call the direct endpoint to insert the record into the DB
and generates next an error to rollback into the JMS DLQ queue the
message
from(corepointJms)
.transacted("PROPAGATION_REQUIRES_NEW")
.log(">>>> Counter : ${header.messageId}")
.idempotentConsumer(header("messageId"),
MemoryIdempotentRepository.memoryIdempotentRepository(200))
.setBody().constant(report)
.to("direct:A")
.process(new org.apache.camel.Processor() {
public void process(Exchange exch) throws Exception {
throw new Exception("Cannot connect ....");
}
})
.log("Rollback should happen ...");
from("direct:A")
.transacted("PROPAGATION_REQUIRED")
.to(endpointDatasource)
.setHeader("Processed").constant("true")
.log("Record should be inserted one time .....");
REMARK : The redelivery policy for ActiveMQ is equal to 1. That means
that our exchange will be send twice.
Surprisingly, the record is inserted 2 times while we are using an
idempotentConsumer. Normally, the idempotentConsumer should filter the
message and not send the second times it receive the exchange
containing the messageID key = "0".
Is there something wrong in my config ?
Regards,
Charles Moulliard
Sr. Principal Solution Architect - FuseSource
Apache Committer
Blog : http://cmoulliard.blogspot.com
Twitter : http://twitter.com/cmoulliard
Linkedin : http://www.linkedin.com/in/charlesmoulliard
Skype: cmoulliard