Is there a reason you’re not using a persistent aggregation repository? If you could share a simplified version of your route, that would help as well.
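
In case it helps the discussion, the crash window described in the quoted message can be reproduced without Camel at all. The following is only a minimal sketch in plain Java, not Camel code: the class `CrashWindowSketch`, its methods, and the in-memory collections are all hypothetical stand-ins (a deque for the broker queue, a set for the FileIdempotentRepository store, a counter for the file producer), and a JVM crash is modelled as an exception that leaves the "durable" state untouched. It assumes the idempotent check records the message ID before processing when eager=true and after processing when eager=false.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the scenario; none of these names come from Camel.
class CrashWindowSketch {

    /** Stand-in for a JVM crash; the static state below is treated as durable. */
    static class SimulatedCrash extends RuntimeException {}

    static Deque<String> brokerQueue;    // uncommitted messages stay queued
    static Set<String> idempotentStore;  // plays the role of .filestore.dat
    static int fileWrites;               // how many times the output file was written

    /** One delivery attempt inside a local (non-XA) JMS transaction. */
    static void deliver(boolean eager, boolean crash) {
        String id = brokerQueue.peek();            // receive, not yet committed
        if (idempotentStore.contains(id)) {        // duplicate: filtered, but still consumed
            brokerQueue.poll();
            return;
        }
        if (eager) {
            idempotentStore.add(id);               // eager=true: ID recorded before processing
            if (crash) throw new SimulatedCrash(); // crash before file write and commit
        }
        fileWrites++;                              // file producer writes the file
        if (!eager) {
            if (crash) throw new SimulatedCrash(); // crash before the ID is recorded
            idempotentStore.add(id);               // eager=false: ID recorded after processing
        }
        brokerQueue.poll();                        // commit: broker forgets the message
    }

    /** Crash on the first attempt, then let the broker redeliver until the queue is empty. */
    static int fileWritesAfterCrash(boolean eager) {
        brokerQueue = new ArrayDeque<>();
        brokerQueue.add("msg-1");
        idempotentStore = new HashSet<>();
        fileWrites = 0;
        try {
            deliver(eager, true);
        } catch (SimulatedCrash e) {
            // "restart": durable state is kept, the message is still on the queue
        }
        while (!brokerQueue.isEmpty()) {
            deliver(eager, false);
        }
        return fileWrites;
    }

    public static void main(String[] args) {
        System.out.println("eager=true  -> file writes: " + fileWritesAfterCrash(true));
        System.out.println("eager=false -> file writes: " + fileWritesAfterCrash(false));
    }
}
```

Under these assumptions, eager=true ends with zero file writes (the message is consumed but the file is skipped) and eager=false ends with two (the file is written again on redelivery), which matches the remark that switching eager to false only shifts the problem.
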
> On Apr 11, 2016, at 4:21 AM, glenn <[email protected]> wrote:
>
> Hi
>
> I’ve been looking at how to use camel-jms to consume XML from a persistent
> queue, do some minor transformation/validation of the XML and either output
> a single file or route a message to an error queue.
>
> The ultimate goal is to ensure that ALL messages are consumed and either a
> file is created ONCE or an error is routed ONCE to an error queue.
>
> Not wishing to use XA transactions by default, I opted to try out an
> approach using local JMS transactions, the Camel file component producing
> the files and use of an idempotent repository to guard against the
> re-processing of redelivered, duplicate messages.
>
> I’d appreciate views on whether it is the case that the approach of using
> the Camel file producer alongside native JMS transactions and an
> idempotentRepository cannot achieve the desired goal and whether, to achieve
> the goal, I must go down the XA route.
>
> My preference was a) not to have to add the additional complexity/overhead
> of global transactions unless absolutely necessary and b) to use the Camel
> file component as the file producer.
>
> Whilst I can see the original, native JMS transaction based approach working
> in part, even when considering just the sunny day scenario of successful file
> production, the approach does not guarantee that files will only be
> produced ONCE. A JVM crash at an inopportune moment between the
> JmsTransactionManager transaction commit and file repository record
> insertion can lead to skipping file production (the risk of breaking this
> guarantee is niche but it does, nonetheless, seem to exist).
>
> My current thinking is the way to achieve the goal is to use XA
> transactions alongside an XA compliant file producer and JTA transaction
> manager. However, that does seem to preclude the use of the Camel file
> component (being not fully transactional – “best efforts” based).
>
> For the record, I’m using the Camel File component as the file producer with
> local JMS transactions, an externally defined Spring transaction manager,
> use of the Camel <transacted/> element within the route and use of the
> FileIdempotentRepository. The FileIdempotentRepository filters out message
> duplicates based on the JmsMessageId (it is configured with eager=true,
> using false just shifts the problem).
> The log output below highlights the point where a JVM crash will lead to
> failure to produce a file but still consume the message.
>
> <code>
> 05 Apr 2016 09:06:27 JmsTransactionManager DEBUG Created JMS
> transaction on Session [Cached JMS Session: ActiveMQSession
> {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1,started=false}
> java.lang.Object@44eb7aa] from Connection [Shared JMS Connection:
> ActiveMQConnection
> {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1,clientId=ID:ANAME-ABCDEFGH-63012-1459843265976-0:1,started=false}]
> 05 Apr 2016 09:06:27 TransactionContext DEBUG
> Begin:TX:ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:4
> 05 Apr 2016 09:06:27 ultJmsMessageListenerContainer DEBUG Received message
> of type [class org.apache.activemq.command.ActiveMQBytesMessage] from
> consumer [ActiveMQMessageConsumer {
> value=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1:244, started=true }] of
> transactional session [Cached JMS Session: ActiveMQSession
> {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1,started=true}
> java.lang.Object@44eb7aa]
> 05 Apr 2016 09:06:27 EndpointMessageListener DEBUG
> Endpoint[inputQueue://ANAME.INV.RESP.Q] consumer received JMS message:
> ActiveMQBytesMessage ...
> 05 Apr 2016 09:06:27 TransactionErrorHandler DEBUG Transaction
> begin (0x2e900519) redelivered(false) for (MessageId:
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30 on ExchangeId:
> ID-ANAME-ABCDEFGH-62996-1459843263591-0-22))
> 05 Apr 2016 09:06:27 JmsTransactionManager DEBUG Participating in
> existing transaction
> 05 Apr 2016 09:06:27 TransactionErrorHandler TRACE isRunAllowed()
> -> true (Run allowed if we are not stopped/stopping)
> ...
> 05 Apr 2016 09:06:27 routeUnderTest INFO
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30: Queue message: <?xml
> version="1.0" encoding="UTF-8" standalone="yes"?>
> 05 Apr 2016 09:06:27 SendProcessor DEBUG >>>>
> Endpoint[AcknowledgementProcessor]
> Exchange[ID-ANAME-ABCDEFGH-62996-1459843263591-0-22][JmsMessage[JmsMessageID:
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30]]
> 05 Apr 2016 09:06:27 AcknowledgementProcessor DEBUG Payload is ...
> 05 Apr 2016 09:06:27 AcknowledgementProcessor DEBUG Body is ...
> 05 Apr 2016 09:06:27 routeUnderTest INFO
> queue_ANAME.INV.RESP.Q_ID_ANAME-ABCDEFGH-64219-1459497606992-9_1_1_1_30:
> Processed message: <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
> 05 Apr 2016 09:06:27 DefaultStreamCachingStrategy DEBUG Should spool
> cache 1023 -> false
> ...
> 05 Apr 2016 09:06:27 routeUnderTest INFO
> queue_ANAME.INV.RESP.Q_ID_ANAME-ABCDEFGH-64219-1459497606992-9_1_1_1_30:
> Validated message: <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
> 05 Apr 2016 09:06:27 FileIdempotentRepository DEBUG Appending
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30 to idempotent filestore:
> C:\Files\filestore\filesOut\.filestore.dat
>
> *** If JVM CRASH occurs here BEFORE JmsTransactionManager commit then
> FileIdempotentRepository entry will remain BUT no file has been created and
> no FileIdempotentRepository after processing operations will occur. Upon
> restoration, broker will redeliver and message will be consumed but
> FileIdempotentRepository will filter out duplicate message and hence NO file
> will be produced. ***
>
> 05 Apr 2016 09:06:27 FilterProcessor DEBUG Filter matches:
> false for exchange:
> Exchange[ID-ANAME-ABCDEFGH-62996-1459843263591-0-22][JmsMessage[JmsMessageID:
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30]]
> 05 Apr 2016 09:06:27 SendProcessor DEBUG >>>>
> Endpoint[file://name/someResponses]
> Exchange[ID-ANAME-ABCDEFGH-62996-1459843263591-0-22][JmsMessage[JmsMessageID:
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30]]
> 05 Apr 2016 09:06:27 FileOperations DEBUG Using
> InputStream to write file: name\someResponses\BuyDeal.xml
> 05 Apr 2016 09:06:27 GenericFileProducer DEBUG Wrote
> [name\someResponses\BuyDeal.xml] to [Endpoint[file://name/someResponses]]
> 05 Apr 2016 09:06:27 routeUnderTest INFO
> queue_ANAME.INV.RESP.Q_ID_ANAME-ABCDEFGH-64219-1459497606992-9_1_1_1_30:BuyDeal.xml:
> Completed Processing
> 05 Apr 2016 09:06:27 TransactionErrorHandler TRACE Is exchangeId:
> ID-ANAME-ABCDEFGH-62996-1459843263591-0-22 interrupted? false
> ...
> 05 Apr 2016 09:06:27 TransactionErrorHandler DEBUG Transaction
> commit (0x2e900519) redelivered(false) for (MessageId:
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30 on ExchangeId:
> ID-ANAME-ABCDEFGH-62996-1459843263591-0-22))
> 05 Apr 2016 09:06:27 JmsTransactionManager DEBUG Initiating
> transaction commit
> 05 Apr 2016 09:06:27 JmsTransactionManager DEBUG Committing JMS
> transaction on Session [Cached JMS Session: ActiveMQSession
> {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1,started=true}
> java.lang.Object@44eb7aa]
> 05 Apr 2016 09:06:27 ActiveMQSession DEBUG
> ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1 Transaction Commit
> :TX:ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:4
> 05 Apr 2016 09:06:27 TransactionContext DEBUG Commit:
> TX:ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:4 syncCount: 2
>
> </code>
>
> Would appreciate views.
>
> Thanks
>
> Glen
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Guaranteed-file-processing-JMS-and-file-producer-tp5780899.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
