Greetings, I have an interesting use case that I was hoping someone might
have ideas on.

Essentially, there is a notifications table in a database with the
following structure:

CREATE TABLE "etl_case_notification" (
  "process_id" int(11) NOT NULL DEFAULT '0',
  "table_name" varchar(100) NOT NULL,
  "last_updated" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ,
  "last_processed" timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  PRIMARY KEY ("process_id")
);

The idea is that we check the notifications table periodically (via Quartz).
When a record has a last_updated date later than its last_processed date, we
read the data in table_name and process the records, routing each one to the
appropriate queue. Once all the records are routed, we update the
last_processed date.
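
To make the trigger condition concrete, here is a minimal plain-Java sketch of the "needs processing" check. It is only an illustration: the Notification and NotificationPoller classes are hypothetical names that mirror the table columns, and the real route does this through SQL rather than in memory.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory mirror of one etl_case_notification row.
class Notification {
    final int processId;
    final String tableName;
    final Instant lastUpdated;
    final Instant lastProcessed;

    Notification(int processId, String tableName, Instant lastUpdated, Instant lastProcessed) {
        this.processId = processId;
        this.tableName = tableName;
        this.lastUpdated = lastUpdated;
        this.lastProcessed = lastProcessed;
    }

    // Pending means the row changed after the last successful processing run.
    boolean isPending() {
        return lastUpdated.isAfter(lastProcessed);
    }
}

// Hypothetical helper; stands in for the periodic check of the table.
class NotificationPoller {
    // Returns the rows whose table_name still has to be read and routed.
    static List<Notification> pending(List<Notification> rows) {
        List<Notification> result = new ArrayList<>();
        for (Notification n : rows) {
            if (n.isPending()) {
                result.add(n);
            }
        }
        return result;
    }
}
```

A row whose last_updated equals its last_processed is treated as already handled in this sketch, which matches the "greater than" wording above.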

The use case has the following goals:
1) If a single record in table_name fails to process, it gets sent to dead
letter queue but no rollback.
2) If there is an error in the update of the last processed time, all
processed records are removed from the queues they are routed to.
3) If any record in table_name fails to get to a queue to be routed, all
records in that event should be aborted and last_processed not updated.
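
In case the three goals are ambiguous, here is a small plain-Java model of the behaviour I am after. It is not Camel or JMS code, just a sketch: EventProcessor and its parameters are hypothetical, the queues are plain lists, and I am assuming that a dead-lettered record stays in the DLQ even if the rest of the event is later aborted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical model of one notification event; the lists stand in for queues.
class EventProcessor {
    final List<String> routingQueue = new ArrayList<>();
    final List<String> deadLetterQueue = new ArrayList<>();
    boolean lastProcessedUpdated = false;

    // Returns true only if the whole event committed.
    boolean process(List<String> rows,
                    UnaryOperator<String> convert,          // may throw for a bad record
                    Predicate<String> queueAccepts,         // false = send to queue failed
                    BooleanSupplier updateLastProcessed) {  // false = DB update failed
        List<String> staged = new ArrayList<>(); // not visible until commit
        for (String row : rows) {
            String converted;
            try {
                converted = convert.apply(row);
            } catch (RuntimeException e) {
                // Goal 1: a bad record goes to the DLQ, the event carries on.
                deadLetterQueue.add(row);
                continue;
            }
            if (!queueAccepts.test(converted)) {
                // Goal 3: a send failure aborts the event; staged records are dropped.
                return false;
            }
            staged.add(converted);
        }
        if (!updateLastProcessed.getAsBoolean()) {
            // Goal 2: the update failed, so nothing staged reaches the queue.
            return false;
        }
        // Commit: publish the staged records and mark the event as processed.
        routingQueue.addAll(staged);
        lastProcessedUpdated = true;
        return true;
    }
}
```

The point of the staged list is that nothing becomes visible on the routing queue until both the sends and the last_processed update have succeeded, which is what I expect the transaction to give me.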

To try to implement this I have the following routes (note that I have
abbreviated items not relevant to the question):

from("quartz:" + quartzConfig).routeId("timedEvent") // timer trigger
    .onException(Exception.class).handled(true).to("direct:dead") // failures to DLQ
    .to("activemq:queue:event"); // send to event queue

from("activemq:queue:event").routeId("processEvent") // from event queue
    .onException(Exception.class).handled(false).to("direct:dead") // failures to DLQ + rollback
    .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) // make the route transacted
    .setHeader(SqlConstants.SQL_QUERY, simple(sqlFetchCases)) // set the sql to run
    .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()) // find the cases from the store
    .split(body()).to("direct:routing").end() // split each case to routing
    .setHeader(SqlConstants.SQL_QUERY, simple(sqlUpdateNotifications)) // cases routed, update last processed
    .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()); // execute update

from("direct:routing").routeId("routeRecord") // routing of records
    .onException(Exception.class).handled(true).to("direct:dead") // failures to DLQ, no rollback
    .convertBodyTo(Record.class) // could potentially throw
    .to("activemq:queue:routing"); // to routing tasks


The problem is that the rollbacks are not working. A failure while
processing a single record doesn't cause the whole event to fail, which is
what I want, but a rollback triggered by a failure to update the database,
or by a failure prior to sending a record for routing, doesn't roll
anything back either. Does anyone have any idea how this could be solved?
Note that I am sure my transaction management is working, because it works
for other routes.

Cross-posted to Stack Overflow:
http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: http://www.linkedin.com/pub/robert-simmons/40/852/a39*
