Lovely. I found the problem and posted the solution on Stack Overflow for the benefit of others:
http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs/23066951#23066951

Robert Simmons Jr. MSc. - Lead Java Architect @ EA
Author of: Hardcore Java (2003) and Maintainable Java (2012)
LinkedIn: http://www.linkedin.com/pub/robert-simmons/40/852/a39

On Mon, Apr 14, 2014 at 9:10 AM, kraythe . wrote:

> Yeah, I know that the end() is missing. That was a copy-paste error; the
> code wasn't the actual route. It had to be trimmed because of proprietary
> information, and I trimmed a bit too much. Sorry. However, I have found that
> the real problem is that the AMQ connection factory is not being enrolled
> as a participant in the JTA transaction. I now have some simple test cases:
> one with just the database, one with just AMQ, and finally a composite test
> case. In the test with only AMQ, the transaction still doesn't roll back
> properly.
>
> I have updated the Stack Overflow thread with a better sample code snippet
> and my transaction manager setup. It would probably be more efficient if I
> referred you to that:
>
> http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs
>
> Thanks a bunch.
>
> On Fri, Apr 11, 2014 at 4:58 AM, Grzegorz Grzybek wrote:
>
>> Hello
>>
>> Your onException clauses are not complete without "end()" and actually you
>> have no real exception handlers (tried this on Camel-2.13.0).
>>
>> regards
>> Grzegorz Grzybek
>>
>> 2014-04-10 18:42 GMT+02:00 kraythe .:
>>
>> > Greetings, I have an interesting use case I was wondering if anyone had
>> > ideas on.
>> >
>> > Essentially there is a notifications table in a database that has
>> > the following structure:
>> >
>> > CREATE TABLE "etl_case_notification" (
>> >   "process_id" int(11) NOT NULL DEFAULT '0',
>> >   "table_name" varchar(100) NOT NULL,
>> >   "last_updated" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
>> >   "last_processed" timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
>> >   PRIMARY KEY ("process_id")
>> > );
>> >
>> > The idea is that we check the notifications table periodically (quartz),
>> > and when there is a record whose last_updated date is greater than its
>> > last_processed date, we read the data in table_name and process the
>> > records, routing them to the appropriate queue. When we have all the
>> > records routed, we update the last_processed date.
>> >
>> > The use case has the following goals:
>> > 1) If a single record in table_name fails to process, it gets sent to the
>> >    dead letter queue, but there is no rollback.
>> > 2) If there is an error in the update of the last_processed time, all
>> >    processed records are removed from the queues they were routed to.
>> > 3) If any record in table_name fails to get to a queue to be routed, all
>> >    records in that event should be aborted and last_processed not updated.
>> >
>> > To try to implement this I have the following routes (note that I have
>> > abbreviated items not relevant to the question):
>> >
>> > from("quartz:" + quartzConfig).routeId("timedEvent") // timer trigger
>> >   .onException(Exception.class).handled(true).to("direct:dead") // failures to DLQ
>> >   .to("activemq:queue:event"); // send to event queue
>> >
>> > from("activemq:queue:event").routeId("processEvent") // from event queue
>> >   .onException(Exception.class).handled(false).to("direct:dead") // failures to DLQ + rollback
>> >   .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) // make the route transacted
>> >   .setHeader(SqlConstants.SQL_QUERY, simple(sqlFetchCases)) // set the SQL to run
>> >   .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()) // find the cases from the store
>> >   .split(body()).to("direct:routing").end() // split each case to routing
>> >   .setHeader(SqlConstants.SQL_QUERY, simple(sqlUpdateNotifications)) // cases routed, update last_processed
>> >   .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()); // execute update
>> >
>> > from("direct:routing").routeId("routeRecord") // routing of records
>> >   .onException(Exception.class).handled(true).to("direct:dead") // failures to DLQ, no rollback
>> >   .convertBodyTo(Record.class) // could potentially throw
>> >   .to("activemq:queue:routing"); // to routing tasks
>> >
>> > The problem is that the rollbacks are not working. A rollback in a single
>> > record process doesn't cause the whole event to fail, but a rollback
>> > invoked for failing to update the database, or prior to sending the event
>> > for routing, doesn't roll back either. Does anyone have any idea how this
>> > could be solved? Note that I am sure my transaction management is working
>> > because it works for other routes.
>> >
>> > Cross-posted to Stack Overflow:
>> >
>> > http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs

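For anyone reading this thread later, the three goals in the original message can be modelled without any middleware. The following is only a rough plain-Java sketch of the intended semantics, not the Camel routes and not the fix from the Stack Overflow answer. Every name in it (EventTransactionSketch, QueueSender, Outcome, processEvent) is hypothetical, records are simplified to strings, and it assumes that dead-lettered records stay on the DLQ even when the event as a whole is rolled back.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of the three goals; no Camel, JMS, or JTA is involved. */
final class EventTransactionSketch {

    /** Hypothetical stand-in for a send to the routing queue; may throw. */
    interface QueueSender {
        void send(String message);
    }

    /** What is visible after one notification event has been handled. */
    static final class Outcome {
        final boolean committed;        // true if last_processed was updated
        final List<String> routed;      // messages visible on the routing queue
        final List<String> deadLetters; // records that failed conversion

        Outcome(boolean committed, List<String> routed, List<String> deadLetters) {
            this.committed = committed;
            this.routed = routed;
            this.deadLetters = deadLetters;
        }
    }

    static Outcome processEvent(List<String> rows,
                                Function<String, String> converter,
                                QueueSender sender,
                                Runnable updateLastProcessed) {
        List<String> staged = new ArrayList<>(); // sent but not yet committed
        List<String> dead = new ArrayList<>();
        try {
            for (String row : rows) {
                String message;
                try {
                    message = converter.apply(row);
                } catch (RuntimeException e) {
                    dead.add(row); // goal 1: bad record goes to the DLQ, no rollback
                    continue;
                }
                sender.send(message); // goal 3: a failed send aborts the whole event
                staged.add(message);
            }
            updateLastProcessed.run(); // goal 2: a failed update aborts the whole event
            return new Outcome(true, staged, dead);
        } catch (RuntimeException e) {
            // Rollback: nothing staged becomes visible on the routing queue.
            // Assumption: dead letters are kept regardless.
            return new Outcome(false, Collections.emptyList(), dead);
        }
    }
}
```

Calling processEvent with a converter that rejects one row, a sender that never fails, and an update that succeeds should give a committed outcome with the bad row in deadLetters. Making either the sender or the update throw should give an uncommitted outcome with an empty routed list. In the real setup that all-or-nothing behaviour depends on both the JMS connection factory and the data source being enrolled in the same JTA transaction, which is the problem described above.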