Hello,

Your onException clauses are not complete without "end()", and as written you actually have no real exception handlers (I tried this on Camel-2.13.0).
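
As a minimal sketch of the point above, assuming the Java DSL on Camel 2.x, the first quoted route with a terminated onException block could look like the following. The class name EventRoutes and the constructor argument are hypothetical and only there to make the fragment compile; the endpoint URIs are taken from the quoted message.

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch only: EventRoutes is a hypothetical class name, not from the original post.
public class EventRoutes extends RouteBuilder {

    // Quartz endpoint options, standing in for the poster's quartzConfig variable.
    private final String quartzConfig;

    public EventRoutes(String quartzConfig) {
        this.quartzConfig = quartzConfig;
    }

    @Override
    public void configure() {
        from("quartz:" + quartzConfig).routeId("timedEvent")
            .onException(Exception.class)
                .handled(true)
                .to("direct:dead")       // failures go to the DLQ route
            .end()                       // closes the route-scoped onException block
            .to("activemq:queue:event"); // chained onto the route again, not onto the handler
    }
}
```

Without the end() call, the trailing to("activemq:queue:event") is chained onto the onException definition instead of the route itself. The same pattern would apply to the "processEvent" and "routeRecord" routes.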
regards
Grzegorz Grzybek

2014-04-10 18:42 GMT+02:00 kraythe . <[email protected]>:

> Greetings, I have an interesting use case I was wondering if anyone had
> ideas on.
>
> Essentially there is a notifications table in a database that has
> essentially the following structure:
>
> CREATE TABLE "etl_case_notification" (
>   "process_id" int(11) NOT NULL DEFAULT '0',
>   "table_name" varchar(100) NOT NULL,
>   "last_updated" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ,
>   "last_processed" timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
>   PRIMARY KEY ("process_id")
> );
>
> The idea is that we check the notifications table periodically (quartz) and
> then when there is a record that has a last updated date greater than last
> processed date, we read the data in table_name and process the records,
> routing them to the appropriate queue and when we have all the records
> routed, we update the last_processed date.
>
> The use case has the following goals:
> 1) If a single record in table_name fails to process, it gets sent to dead
> letter queue but no rollback.
> 2) If there is an error in the update of the last processed time, all
> processed records are removed from the queues they are routed to.
> 3) If any record in table_name fails to get to a queue to be routed, all
> records in that event should be aborted and last_processed not updated.
>
> To try to implement this I have the following route (note I have
> abbreviated items not relevant to the question.
>
> from("quartz:" + quartzConfig).routeId("timedEvent") // timer trigger
>   .onException(Exception.class).handled(true).to("direct:dead") // failures to DLQ
>   .to("activemq:queue:event"); // send to event queue
>
> from("activemq:queue:event").routeId("processEvent") // from event queue
>   .onException(Exception.class).handled(false).to("direct:dead") // Failures to DLQ + rollback
>   .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) // make the route transacted
>   .setHeader(SqlConstants.SQL_QUERY, simple(sqlFetchCases)) // set the sql to run.
>   .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()) // find the cases from the store
>   .split(body()).to("direct:routing").end() // split each case to routing
>   .setHeader(SqlConstants.SQL_QUERY, simple(sqlUpdateNotifications)) // cases routed, update last processed.
>   .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()); // execute update
>
> from("direct:routing").routeId("routeRecord") // routing of records
>   .onException(Exception.class).handled(true).to("direct:dead") // failures to DLQ, No rollback
>   .convertBodyTo(Record.class) // could potentially except
>   .to("activemq;queue:routing") // to routing tasks
>
>
> The problem is that the rollbacks are not working. A rollback in a single
> record process doesn't cause the whole event to fail but a rollback invoked
> for failing to update the database or prior to sending the event for
> routing doesn't rollback either. Does anyone have any idea of how this
> could be solved? Note that I am sure my transaction management is working
> because it works for other routes.
>
> Cross opted to stack overflow:
>
> http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs
>
> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> *LinkedIn: http://www.linkedin.com/pub/robert-simmons/40/852/a39*
