Yeah, I know the end() is missing. That was a copy-paste error: the code wasn't the actual route. It had to be trimmed because of proprietary information, and I trimmed a bit too much. Sorry. However, I have found that the real problem is that the AMQ connection factory is not being enlisted as a participant in the JTA transaction. I now have some simple test cases: one with just the database, one with just AMQ, and a composite of the two. Even in the test with only AMQ, the transaction doesn't roll back properly.
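To make the symptom concrete, here is a toy sketch in plain Java of why a resource that never joins the transaction cannot be rolled back. It is not the JTA, JMS, or Camel API: `Participant`, `ToyQueue`, `ToyTransaction`, and `EnlistmentDemo` are all hypothetical names invented for this illustration, and the real transaction manager setup is the one in the Stack Overflow thread.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: every type here is hypothetical, NOT the JTA or JMS API.
class EnlistmentDemo {
    // Sends one message, rolls back, and returns how many messages survived.
    static int survivorsAfterRollback(boolean enlistQueue) {
        ToyQueue queue = new ToyQueue();
        ToyTransaction tx = new ToyTransaction();
        if (enlistQueue) {
            tx.enlist(queue);                 // queue takes part in the transaction
            queue.sendInTransaction("case-1");
        } else {
            queue.sendImmediately("case-1");  // queue never joined the transaction
        }
        tx.rollback();
        return queue.delivered.size();
    }

    public static void main(String[] args) {
        System.out.println("enlisted:     " + survivorsAfterRollback(true));
        System.out.println("not enlisted: " + survivorsAfterRollback(false));
    }
}

// Anything the toy transaction manager is able to commit or roll back.
interface Participant {
    void commit();
    void rollback();
}

// Stand-in for a message queue.
class ToyQueue implements Participant {
    final List<String> delivered = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();

    // Transactional send: held back until commit, discarded on rollback.
    void sendInTransaction(String msg) { pending.add(msg); }

    // Non-transactional send: visible at once, nothing left to undo.
    void sendImmediately(String msg) { delivered.add(msg); }

    @Override public void commit() { delivered.addAll(pending); pending.clear(); }
    @Override public void rollback() { pending.clear(); }
}

// Stand-in for a transaction manager: it only knows about enlisted participants.
class ToyTransaction {
    private final List<Participant> enlisted = new ArrayList<>();

    void enlist(Participant p) { enlisted.add(p); }
    void commit() { for (Participant p : enlisted) p.commit(); }
    void rollback() { for (Participant p : enlisted) p.rollback(); }
}
```

In this model the rollback succeeds in both cases, but the message only disappears when the queue was enlisted first. That matches what the AMQ-only test shows: the rollback is invoked, yet the messages stay on the queue.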
I have updated the Stack Overflow thread with a better sample code snippet and my transaction manager setup. Would probably be more efficient if I referred you to that:
http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs

Thanks a bunch.

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: http://www.linkedin.com/pub/robert-simmons/40/852/a39*

On Fri, Apr 11, 2014 at 4:58 AM, Grzegorz Grzybek <[email protected]> wrote:

> Hello
>
> Your onException clauses are not complete without "end()" and actually you
> have no real exception handlers (tried this on Camel-2.13.0).
>
> regards
> Grzegorz Grzybek
>
>
> 2014-04-10 18:42 GMT+02:00 kraythe . <[email protected]>:
>
> > Greetings, I have an interesting use case I was wondering if anyone had
> > ideas on.
> >
> > Essentially there is a notifications table in a database that has
> > essentially the following structure:
> >
> > CREATE TABLE "etl_case_notification" (
> >   "process_id" int(11) NOT NULL DEFAULT '0',
> >   "table_name" varchar(100) NOT NULL,
> >   "last_updated" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ,
> >   "last_processed" timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
> >   PRIMARY KEY ("process_id")
> > );
> >
> > The idea is that we check the notifications table periodically (quartz) and
> > then when there is a record that has a last updated date greater than last
> > processed date, we read the data in table_name and process the records,
> > routing them to the appropriate queue and when we have all the records
> > routed, we update the last_processed date.
> >
> > The use case has the following goals:
> > 1) If a single record in table_name fails to process, it gets sent to dead
> > letter queue but no rollback.
> > 2) If there is an error in the update of the last processed time, all
> > processed records are removed from the queues they are routed to.
> > 3) If any record in table_name fails to get to a queue to be routed, all
> > records in that event should be aborted and last_processed not updated.
> >
> > To try to implement this I have the following route (note I have
> > abbreviated items not relevant to the question.
> >
> > from("quartz:" + quartzConfig).routeId("timedEvent") // timer trigger
> >   .onException(Exception.class).handled(true).to("direct:dead") // failures to DLQ
> >   .to("activemq:queue:event"); // send to event queue
> >
> > from("activemq:queue:event").routeId("processEvent") // from event queue
> >   .onException(Exception.class).handled(false).to("direct:dead") // Failures to DLQ + rollback
> >   .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) // make the route transacted
> >   .setHeader(SqlConstants.SQL_QUERY, simple(sqlFetchCases)) // set the sql to run.
> >   .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()) // find the cases from the store
> >   .split(body()).to("direct:routing").end() // split each case to routing
> >   .setHeader(SqlConstants.SQL_QUERY, simple(sqlUpdateNotifications)) // cases routed, update last processed.
> >   .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()); // execute update
> >
> > from("direct:routing").routeId("routeRecord") // routing of records
> >   .onException(Exception.class).handled(true).to("direct:dead") // failures to DLQ, No rollback
> >   .convertBodyTo(Record.class) // could potentially except
> >   .to("activemq;queue:routing") // to routing tasks
> >
> >
> > The problem is that the rollbacks are not working. A rollback in a single
> > record process doesn't cause the whole event to fail but a rollback invoked
> > for failing to update the database or prior to sending the event for
> > routing doesn't rollback either. Does anyone have any idea of how this
> > could be solved? Note that I am sure my transaction management is working
> > because it works for other routes.
> >
> > Cross opted to stack overflow:
> >
> > http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs
