Yes, I know that the end() is missing. That was a copy-and-paste error: the
code wasn't the actual route. It had to be trimmed to remove proprietary
information, and I trimmed a bit too much. Sorry. However, I have found that
the real problem is that the AMQ connection factory is not being enlisted
as a participant in the JTA transaction. I now have some simple test cases:
one with just the database, one with just AMQ, and a composite one. In the
test with only AMQ, the transaction still doesn't roll back properly.
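
To make the symptom concrete, here is a minimal toy model in plain Java of
what a missing enlistment looks like. It is only a sketch: it does not use the
JTA, Camel, or ActiveMQ APIs, and Resource, Coordinator, and runFailingRoute
are hypothetical names invented for the illustration. The assumption is that a
resource which never joins the transaction applies its writes immediately, so
a later rollback cannot undo them.

```java
import java.util.ArrayList;
import java.util.List;

public class EnlistmentSketch {

    /** Hypothetical stand-in for a transactional resource (a queue or a table). */
    static final class Resource {
        final List<String> committed = new ArrayList<>();
        private final List<String> pending = new ArrayList<>();

        /** Write made inside the transaction: buffered until commit. */
        void writeEnlisted(String item) { pending.add(item); }

        /** Write made outside any transaction: visible immediately. */
        void writeDirect(String item) { committed.add(item); }

        void commit() { committed.addAll(pending); pending.clear(); }

        void rollback() { pending.clear(); }
    }

    /** Toy coordinator: only enlisted resources are told to commit or roll back. */
    static final class Coordinator {
        private final List<Resource> participants = new ArrayList<>();

        void enlist(Resource r) { participants.add(r); }

        void commit() { participants.forEach(Resource::commit); }

        void rollback() { participants.forEach(Resource::rollback); }
    }

    /**
     * Simulates one failing route: send a record to the queue, attempt the
     * database update, fail, and roll back. Returns what is left on the queue.
     */
    public static List<String> runFailingRoute(boolean enlistQueue) {
        Resource queue = new Resource();
        Resource database = new Resource();
        Coordinator tx = new Coordinator();
        tx.enlist(database);
        if (enlistQueue) {
            tx.enlist(queue);
        }

        try {
            if (enlistQueue) {
                queue.writeEnlisted("record-1");
            } else {
                queue.writeDirect("record-1");
            }
            database.writeEnlisted("last_processed=now");
            throw new IllegalStateException("simulated failure");
        } catch (IllegalStateException e) {
            tx.rollback(); // only reaches resources that were enlisted
        }
        return queue.committed;
    }

    public static void main(String[] args) {
        System.out.println("queue enlisted:     " + runFailingRoute(true));  // []
        System.out.println("queue not enlisted: " + runFailingRoute(false)); // [record-1]
    }
}
```

In the second case the message survives the rollback, which matches what the
AMQ-only test shows.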

I have updated the Stack Overflow thread with a better sample code snippet
and my transaction manager setup. It would probably be more efficient to
refer you to that:

http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs

Thanks a bunch.

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: http://www.linkedin.com/pub/robert-simmons/40/852/a39*


On Fri, Apr 11, 2014 at 4:58 AM, Grzegorz Grzybek <[email protected]> wrote:

> Hello
>
> Your onException clauses are not complete without "end()" and actually you
> have no real exception handlers (tried this on Camel-2.13.0).
>
> regards
> Grzegorz Grzybek
>
>
> 2014-04-10 18:42 GMT+02:00 kraythe . <[email protected]>:
>
> > Greetings, I have an interesting use case I was wondering if anyone had
> > ideas on.
> >
> > There is a notifications table in a database that has essentially the
> > following structure:
> >
> > CREATE TABLE "etl_case_notification" (
> >   "process_id" int(11) NOT NULL DEFAULT '0',
> >   "table_name" varchar(100) NOT NULL,
> >   "last_updated" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ,
> >   "last_processed" timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
> >   PRIMARY KEY ("process_id")
> > );
> >
> > The idea is that we check the notifications table periodically (quartz).
> > When there is a record whose last updated date is later than its last
> > processed date, we read the data in table_name and process the records,
> > routing them to the appropriate queue. When all the records have been
> > routed, we update the last_processed date.
> >
> > The use case has the following goals:
> > 1) If a single record in table_name fails to process, it gets sent to
> > the dead letter queue but no rollback.
> > 2) If there is an error in the update of the last processed time, all
> > processed records are removed from the queues they are routed to.
> > 3) If any record in table_name fails to get to a queue to be routed, all
> > records in that event should be aborted and last_processed not updated.
> >
> > To try to implement this I have the following route (note that I have
> > abbreviated items not relevant to the question):
> >
> > from("quartz:" + quartzConfig).routeId("timedEvent") // timer trigger
> >     // failures to DLQ
> >     .onException(Exception.class).handled(true).to("direct:dead")
> >     // send to event queue
> >     .to("activemq:queue:event");
> >
> > from("activemq:queue:event").routeId("processEvent") // from event queue
> >     // failures to DLQ + rollback
> >     .onException(Exception.class).handled(false).to("direct:dead")
> >     // make the route transacted
> >     .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >     // set the SQL to run
> >     .setHeader(SqlConstants.SQL_QUERY, simple(sqlFetchCases))
> >     // find the cases from the store
> >     .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName())
> >     // split each case to routing
> >     .split(body()).to("direct:routing").end()
> >     // cases routed, update last processed
> >     .setHeader(SqlConstants.SQL_QUERY, simple(sqlUpdateNotifications))
> >     // execute update
> >     .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName());
> >
> > from("direct:routing").routeId("routeRecord") // routing of records
> >     // failures to DLQ, no rollback
> >     .onException(Exception.class).handled(true).to("direct:dead")
> >     // could potentially throw
> >     .convertBodyTo(Record.class)
> >     // to routing tasks
> >     .to("activemq:queue:routing");
> >
> >
> > The problem is that the rollbacks are not working. A rollback in the
> > processing of a single record doesn't cause the whole event to fail, but
> > a rollback invoked for failing to update the database or prior to sending
> > the event for routing doesn't roll back either. Does anyone have any idea
> > how this could be solved? Note that I am sure my transaction management
> > is working because it works for other routes.
> >
> > Cross-posted to Stack Overflow:
> >
> > http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs
> >
> > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> > *LinkedIn: http://www.linkedin.com/pub/robert-simmons/40/852/a39*
> >
>
