So I think there is a problem with the way rollback is implemented in relation to Camel. As far as I can tell there is no way to get the following working.
package com.ea.wwce.camel.test.utilities; import com.ea.wwce.camel.utilities.data.RecordList; import com.ea.wwce.camel.utilities.transactions.TxnHelper; import org.apache.camel.ExchangePattern; import org.apache.camel.builder.AdviceWithRouteBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.testng.annotations.Test; import static com.ea.wwce.camel.test.utilities.TransactionTestTools.*; import static com.ea.wwce.camel.utilities.activemq.ActiveMQHelper.endpointAMQ; import static com.ea.wwce.camel.utilities.jackson.RecordSerialization.toListOfJsonStrings; import static org.apache.camel.ExchangePattern.InOnly; /** This test suite validates the transaction configuration in the test suite. */ public class JMSOnlyTransactionTest extends AMQRouteTestSupport { private static final String QUEUE_DEAD = "dead"; private static final String QUEUE_INBOX = "inbox"; private static final String QUEUE_OUTBOX = "outbox"; private static final String ROUTE_ID_FEED = "Feed"; private static final String ROUTE_ID_TEST_ROUTE = "TestRoute"; private static final String ROUTE_ID_RESULTS = "ResultsRoute"; private static final String ROUTE_ID_DEAD = "DeadRoute"; private static final String DIRECT_FEED_INBOX = "direct:feed_inbox"; private static final String MOCK_END = "mock:end"; private static final String MOCK_BEFORE_TO_QUEUE = "mock:before_to_queue"; private static final String MOCK_AFTER_TO_QUEUE = "mock:after_to_queue"; /** Mock endpoints. */ private MockEndpoint mockEnd, mockDead, mockOutbox, mockBeforeToQueue, mockAfterToQueue; /** Helper to initialize mocks in the test. */ private void initMocks() { mockEnd = assertAndGetMockEndpoint(MOCK_END); mockDead = assertAndGetMockEndpoint(MOCK_DEAD); mockBeforeToQueue = assertAndGetMockEndpoint(MOCK_BEFORE_TO_QUEUE); mockAfterToQueue = assertAndGetMockEndpoint(MOCK_AFTER_TO_QUEUE); mockOutbox = assertAndGetMockEndpoint(mockEndpointAMQ(QUEUE_OUTBOX)); } @Override protected RouteBuilder createRouteBuilder() { System.out.println("createRouteBuilder"); return new RouteBuilder(this.context) { @Override public void configure() { getContext().setTracing(true); from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED) .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX)); from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS) .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) .unmarshal(dfCaseRecord).to(MOCK_END); from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD) .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) .unmarshal(dfCaseRecord).to(MOCK_DEAD); from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE) .onException(RuntimeException.class).handled(true).useOriginalMessage() .to(InOnly, endpointAMQ(QUEUE_DEAD)).end() .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRES_NEW) .unmarshal(dfCaseRecord) .to(MOCK_BEFORE_TO_QUEUE) .marshal(dfCaseRecord) .to(endpointAMQ(QUEUE_OUTBOX)) .unmarshal(dfCaseRecord) .to(MOCK_AFTER_TO_QUEUE); } }; } /** Advice the route, mocking ActiveMQ endpoints. */ protected void adviceRoute() throws Exception { this.context.getRouteDefinition(ROUTE_ID_TEST_ROUTE).adviceWith( this.context, new AdviceWithRouteBuilder() { @Override public void configure() throws Exception { mockEndpoints("activemq:queue:*"); } } ); } @Test public void testNormalRouting() throws Exception { adviceRoute(); startCamelContext(); initMocks(); final RecordList cases = casesA(); mockEnd.expectedBodiesReceived(cases); mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases); mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, cases)); mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases); template.sendBody(DIRECT_FEED_INBOX, cases); mockBeforeToQueue.assertIsSatisfied(); mockAfterToQueue.assertIsSatisfied(); mockEnd.assertIsSatisfied(); mockDead.assertIsSatisfied(); mockOutbox.assertIsSatisfied(); } @Test public void testRollbackBeforeEnqueue() throws Exception { adviceRoute(); startCamelContext(); initMocks(); final RecordList cases = casesA(); mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2)); mockDead.expectedBodiesReceived(cases.get(0)); mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases); mockBeforeToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR); mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, cases.get(1), cases.get(2))); mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2)); template.sendBody(DIRECT_FEED_INBOX, cases); mockBeforeToQueue.assertIsSatisfied(); mockAfterToQueue.assertIsSatisfied(); mockEnd.assertIsSatisfied(); mockDead.assertIsSatisfied(); mockOutbox.assertIsSatisfied(); } @Test public void testRollbackAfterEnqueue() throws Exception { adviceRoute(); startCamelContext(); initMocks(); final RecordList cases = casesA(); mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2)); mockDead.expectedBodiesReceivedInAnyOrder(cases.get(0)); mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases); mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, cases)); mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases); mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR); template.sendBody(DIRECT_FEED_INBOX, cases); mockBeforeToQueue.assertIsSatisfied(); mockAfterToQueue.assertIsSatisfied(); mockDead.assertIsSatisfied(); mockOutbox.assertIsSatisfied(); mockEnd.assertIsSatisfied(); } } I have tried dozens of combinations in the onException clause and nothing works. Adding markRollbackOnly(), or rollback() only succeeds in rolling back the dead letter channel as well. Making the handled(false) cause AMQ to resubmit the transaction and rolls back the dead letter channel. I have tried a dozen combinations so if anyone has one that works I would be grateful. *Robert Simmons Jr. MSc. - Lead Java Architect @ EA* *Author of: Hardcore Java (2003) and Maintainable Java (2012)* *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39 <http://www.linkedin.com/pub/robert-simmons/40/852/a39>* On Mon, Apr 14, 2014 at 1:10 PM, kraythe . <kray...@gmail.com> wrote: > So, in the ongoing perfect transaction configuration we have an > interesting use case: Consider the following route in a test: > > @Override > protected RouteBuilder createRouteBuilder() { > System.out.println("createRouteBuilder"); > return new RouteBuilder(this.context) { > @Override > public void configure() { > getContext().setTracing(true); > from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED) > .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) > > .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX)); > from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS) > .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) > .unmarshal(dfCaseRecord).to(MOCK_END); > from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD) > .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) > .unmarshal(dfCaseRecord).to(MOCK_DEAD); > from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE) > > .onException(RuntimeException.class).handled(true).useOriginalMessage().to(endpointAMQ(QUEUE_DEAD)).end() > .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) > .unmarshal(dfCaseRecord) > .to(MOCK_BEFORE_TO_QUEUE) > .marshal(dfCaseRecord) > .to(endpointAMQ(QUEUE_OUTBOX)) > .unmarshal(dfCaseRecord) > .to(MOCK_AFTER_TO_QUEUE); > } > }; > } > > Note that the transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) simply looks > up the transaction policy by name as it is just a string key for our JNDI > registry. Our test case looks like the following. > > @Test > public void testRollbackAfterEnqueue() throws Exception { > adviceRoute(); > startCamelContext(); > initMocks(); > final RecordList cases = casesA(); > > mockEnd.expectedMessageCount(2); > mockEnd.expectedBodiesReceived(cases.get(1), cases.get(2)); > mockDead.expectedMessageCount(1); > mockDead.expectedBodiesReceived(cases.get(0)); > mockBeforeToQueue.expectedMessageCount(3); > mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases); > mockOutbox.expectedMessageCount(3); > > mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, > cases)); > mockAfterToQueue.expectedMessageCount(3); > mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases); > mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR); > > template.sendBody(DIRECT_FEED_INBOX, cases); > > mockBeforeToQueue.assertIsSatisfied(); > mockAfterToQueue.assertIsSatisfied(); > mockEnd.assertIsSatisfied(); > mockDead.assertIsSatisfied(); > mockOutbox.assertIsSatisfied(); > } > > In this route the goal is that if any exceptions are thrown even after the > message is enqueued, it should be rolled back, the message that excepted > should go to the DLQ and the messages in the outbox should be rolled back > but the message from the inbox should not be put back on the queue. This is > proving to be a bit of a juggling act. > > I tried putting markRollBackOnly() in the exception handler after the > to(dead) but that rolled back the dead letter queue and outbox and then > redelivered the inbox message. Removing the markRollbackOnly() means that > the message arrives at dead and is off the inbox but it doesn't get removed > from the outbox. > > So I am looking for the right combination of calls to accomplish what I > want to do. Any suggestions? > > Thanks in advance. > > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA* > *Author of: Hardcore Java (2003) and Maintainable Java (2012)* > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39 > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>* >