Hello

Your onException clauses are not complete without "end()" and actually you
have no real exception handlers (tried this on Camel-2.13.0).

regards
Grzegorz Grzybek


2014-04-10 18:42 GMT+02:00 kraythe . <[email protected]>:

> Greetings, I have an interesting use case I was wondering if anyone had
> ideas on.
>
> Essentially there is a notifications table in a database that has
> essentially the following structure:
>
> CREATE TABLE "etl_case_notification" (
>   "process_id" int(11) NOT NULL DEFAULT '0',
>   "table_name" varchar(100) NOT NULL,
>   "last_updated" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ,
>   "last_processed" timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
>   PRIMARY KEY ("process_id")
> );
>
> The idea is that we check the notifications table periodically (quartz) and
> then when there is a record that has a last updated date greater than last
> processed date, we read the data in table_name and process the records,
> routing them to the appropriate queue and when we have all the records
> routed, we update the last_processed date.
>
> The use case has the following goals:
> 1) If a single record in table_name fails to process, it gets sent to dead
> letter queue but no rollback.
> 2) If there is an error in the update of the last processed time, all
> processed records are removed from the queues they are routed to.
> 3) If any record in table_name fails to get to a queue to be routed, all
> records in that event should be aborted and last_processed not updated.
>
> To try to implement this I have the following route (note I have
> abbreviated items not relevant to the question.
>
> from("quartz:" + quartzConfig).routeId("timedEvent") // timer trigger
>     .onException(Exception.class).handled(true).to("direct:dead") //
> failures to DLQ
>     .to("activemq:queue:event"); // send to event queue
>
> from("activemq:queue:event").routeId("processEvent") // from event queue
>     .onException(Exception.class).handled(false).to("direct:dead") //
> Failures to DLQ + rollback
>     .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) // make the route
> transacted
>     .setHeader(SqlConstants.SQL_QUERY, simple(sqlFetchCases)) // set the
> sql to run.
>     .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()) //
> find the cases from the store
>     .split(body()).to("direct:routing").end() // split each case to routing
>     .setHeader(SqlConstants.SQL_QUERY, simple(sqlUpdateNotifications)) //
> cases routed, update last processed.
>     .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()); //
> execute update
>
> from("direct:routing").routeId("routeRecord") // routing of records
>     .onException(Exception.class).handled(true).to("direct:dead") //
> failures to DLQ, No rollback
>     .convertBodyTo(Record.class) // could potentially except
>     .to("activemq;queue:routing") // to routing tasks
>
>
> The problem is that the rollbacks are not working. A rollback in a single
> record process doesn't cause the whole event to fail but a rollback invoked
> for failing to update the database or prior to sending the event for
> routing doesn't rollback either. Does anyone have any idea of how this
> could be solved? Note that I am sure my transaction management is working
> because it works for other routes.
>
> Cross opted to stack overflow:
>
> http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs
>
> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>

Reply via email to