I am encountering a problem in my unit test that I was hoping I could get
help with. Basically, I am building a unit test of a JTA-transacted route, and
the transactions are not rolling back as expected. I would be grateful for
any assistance in the matter. Please be aware that I am unable to use
Spring to configure this for a number of technical as well as political
reasons, so that is off the table.
First I set up the transaction manager:
final Properties txnSvcProps = new Properties();
txnSvcProps.setProperty("com.atomikos.icatch.service",
    "com.atomikos.icatch.standalone.UserTransactionServiceFactory");
txnSvcProps.setProperty("com.atomikos.icatch.output_dir", "./target/atomikos/");
txnSvcProps.setProperty("com.atomikos.icatch.log_base_dir", "./target/atomikos/");
txnSvcProps.setProperty("com.atomikos.icatch.console_log_level", "DEBUG");
txnSvcProps.setProperty("com.atomikos.icatch.tm_unique_name", this.txnManagerServiceName);
/* The Atomikos user transaction service. */
final UserTransactionServiceImp atomikosUserTxnService =
    new UserTransactionServiceImp(txnSvcProps);
atomikosUserTxnService.init();
this.atomikosTxMgr = new UserTransactionManager();
this.atomikosTxMgr.setStartupTransactionService(false); // already started above.
try {
    this.atomikosTxMgr.init();
} catch (final SystemException ex) {
    throw new AssertionError("Error initializing JTA transaction manager", ex);
}
this.userTxn = new UserTransactionImp();
Then the embedded ActiveMQ broker:
this.broker.setBrokerName(brokerName.trim());
final String connector = "vm://" + brokerName;
this.broker.addConnector(connector + "?create=false");
this.broker.start();
// -- Configure AMQ connection factory and JTA.
if (log.isDebugEnabled()) {
    log.debug("Initializing connection factory and transaction management.");
}
this.amqcf = new ActiveMQXAConnectionFactory(
    this.broker.getTransportConnectors().get(0).getUri());
Then the ActiveMQ component:
registry().bind(KEY_ACTIVEMQ_CF, brokerSupport.amqcf);
final ActiveMQComponent amq =
    (ActiveMQComponent) camelContext.getComponent("activemq");
amq.setConnectionFactory(brokerSupport.amqcf);
amq.setTransacted(false); // must be disabled for JTA
amq.setTransactionManager(txnMgrSupport.fetchJTATxnMgr());
Here is the route under test:
protected void configureBasicRoute() {
    final String fromURI =
        String.format(endpointAMQ("%s?concurrentConsumers=%d"),
            config.basicRouteInboxQueue(),
            config.concurrentConsumers());
    from(fromURI).routeId(ROUTE_ID_BASIC_ROUTE)
        .setHeader(HDR_REQUESTOR, constant(getContext().getName()))
        .to(endpointAMQ(config.basicRouteOutboxQueue())).id("end");
}
Then I run the test:
@Test
public void testTransactionFailedFlow() throws Exception {
    this.context.getRouteDefinition(ProjectRouteBuilder.ROUTE_ID_BASIC_ROUTE)
        .adviceWith(this.context, new AdviceWithRouteBuilder() {
            @Override
            public void configure() throws Exception {
                // This allows us to see every exchange that goes through an
                // endpoint. Normally endpoints do not store exchanges,
                // mocks do. Note that this only adds mock interceptors to
                // PRODUCERS. If you want to change the "from(...)" line,
                // use replaceFromWith() method.
                weaveById("end").before().process(new Processor() {
                    @Override
                    public void process(final Exchange exchange) throws Exception {
                        throw new IllegalArgumentException("Forcing transaction to fail!");
                    }
                });
                mockEndpoints("activemq:queue:*");
            }
        });
    startCamelContext();
    final String testBody = "Testing"; // A test body to send through the queues.
    final MockEndpoint mockOutbox =
        assertAndGetMockEndpoint(mockEndpointAMQ(projectConfig.basicRouteOutboxQueue()));
    mockOutbox.expectedMessageCount(0);
    template.sendBody(DIRECT_FEED_INBOX, ExchangePattern.InOnly, testBody);
    mockOutbox.assertIsSatisfied();
    brokerSupport.assertQueueSize(this.projectConfig.basicRouteInboxQueue(), 1);
    brokerSupport.assertQueueSize(this.projectConfig.basicRouteOutboxQueue(), 0);
}
The problem is that the test fails: after the forced exception, the message
does not end up back in the inbox queue. assertQueueSize() is a utility I
wrote that uses the ActiveMQ JMX MBean to check the size of a queue, and I
know that part works correctly. I have been racking my brain over this for
hours and am hoping I am missing something simple.
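
For readers unfamiliar with that kind of check, a JMX-based queue-size
assertion could look roughly like the sketch below. It is not the actual
assertQueueSize() helper from the test, only an illustration under stated
assumptions: the names QueueSizeProbe, objectNameFor and FakeQueueMBean are
hypothetical, the object-name layout assumes the ActiveMQ 5.8+ JMX naming
scheme, and FakeQueueMBean merely stands in for a broker's queue MBean so the
sketch runs with the JDK alone.

```java
import java.lang.management.ManagementFactory;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

/** Hypothetical JMX queue-size check; not the helper used in the test above. */
final class QueueSizeProbe {

    private QueueSizeProbe() { }

    /** Builds the queue MBean name (assumption: ActiveMQ 5.8+ naming scheme). */
    static ObjectName objectNameFor(String brokerName, String queueName) throws Exception {
        return new ObjectName("org.apache.activemq:type=Broker,brokerName=" + brokerName
            + ",destinationType=Queue,destinationName=" + queueName);
    }

    /** Reads the QueueSize attribute exposed by the queue MBean. */
    static long queueSize(MBeanServerConnection server, String brokerName, String queueName)
            throws Exception {
        Object value = server.getAttribute(objectNameFor(brokerName, queueName), "QueueSize");
        return ((Number) value).longValue();
    }

    /** Fails with an AssertionError when the queue depth differs from the expectation. */
    static void assertQueueSize(MBeanServerConnection server, String brokerName,
            String queueName, long expected) throws Exception {
        long actual = queueSize(server, brokerName, queueName);
        if (actual != expected) {
            throw new AssertionError("Queue " + queueName + " holds " + actual
                + " message(s), expected " + expected);
        }
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Register the stand-in under the name a real broker is assumed to use.
        server.registerMBean(new FakeQueueMBean(1), objectNameFor("testBroker", "inbox"));
        assertQueueSize(server, "testBroker", "inbox", 1);
        System.out.println("inbox size verified");
    }
}

/** Stand-in for a broker's queue MBean so the sketch runs without ActiveMQ. */
final class FakeQueueMBean implements DynamicMBean {
    private final long queueSize;

    FakeQueueMBean(long queueSize) { this.queueSize = queueSize; }

    @Override
    public Object getAttribute(String name) throws AttributeNotFoundException {
        if ("QueueSize".equals(name)) {
            return queueSize;
        }
        throw new AttributeNotFoundException(name);
    }

    @Override
    public void setAttribute(Attribute attribute) throws AttributeNotFoundException {
        throw new AttributeNotFoundException("read-only: " + attribute.getName());
    }

    @Override
    public AttributeList getAttributes(String[] names) {
        AttributeList list = new AttributeList();
        for (String name : names) {
            if ("QueueSize".equals(name)) {
                list.add(new Attribute(name, queueSize));
            }
        }
        return list;
    }

    @Override
    public AttributeList setAttributes(AttributeList attributes) {
        return new AttributeList(); // nothing is writable
    }

    @Override
    public Object invoke(String action, Object[] params, String[] signature) {
        throw new UnsupportedOperationException(action);
    }

    @Override
    public MBeanInfo getMBeanInfo() {
        MBeanAttributeInfo size = new MBeanAttributeInfo(
            "QueueSize", "long", "Messages pending", true, false, false);
        return new MBeanInfo(getClass().getName(), "Fake queue view",
            new MBeanAttributeInfo[] { size }, null, null, null);
    }
}
```

Against a real embedded broker, the same two calls would presumably be
pointed at whichever MBeanServer the broker registered its MBeans with,
rather than at a stand-in.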
Thanks in advance.
Robert Simmons Jr. MSc. - Lead Java Architect @ EA
Author of: Hardcore Java (2003) and Maintainable Java (2012)
LinkedIn: http://www.linkedin.com/pub/robert-simmons/40/852/a39