I am encountering a problem in a unit test that I was hoping I could get
help with. I am building a unit test of a JTA-transacted route, and the
transactions are not rolling back as expected. I would be grateful for
any assistance in the matter. Please be aware that I am unable to use
Spring to configure this, for a number of technical as well as political
reasons, so that is off the table.

First I set up the transaction manager:

    final Properties txnSvcProps = new Properties();
    txnSvcProps.setProperty("com.atomikos.icatch.service",
        "com.atomikos.icatch.standalone.UserTransactionServiceFactory");
    txnSvcProps.setProperty("com.atomikos.icatch.output_dir", "./target/atomikos/");
    txnSvcProps.setProperty("com.atomikos.icatch.log_base_dir", "./target/atomikos/");
    txnSvcProps.setProperty("com.atomikos.icatch.console_log_level", "DEBUG");
    txnSvcProps.setProperty("com.atomikos.icatch.tm_unique_name", this.txnManagerServiceName);
    /* The Atomikos user transaction service. */
    final UserTransactionServiceImp atomikosUserTxnService =
        new UserTransactionServiceImp(txnSvcProps);
    atomikosUserTxnService.init();
    this.atomikosTxMgr = new UserTransactionManager();
    this.atomikosTxMgr.setStartupTransactionService(false); // already started above.
    try {
      this.atomikosTxMgr.init();
    } catch (final SystemException ex) {
      throw new AssertionError("Error initializing JTA transaction manager", ex);
    }
    this.userTxn = new UserTransactionImp();

Then the embedded ActiveMQ broker:

    this.broker.setBrokerName(brokerName.trim());
    final String connector = "vm://" + brokerName;
    this.broker.addConnector(connector + "?create=false");
    this.broker.start();

    // -- Configure AMQ connection factory and JTA.
    if (log.isDebugEnabled()) {
      log.debug("Initializing connection factory and transaction management.");
    }
    this.amqcf = new ActiveMQXAConnectionFactory(
        this.broker.getTransportConnectors().get(0).getUri());

Then the ActiveMQ component:

    registry().bind(KEY_ACTIVEMQ_CF, brokerSupport.amqcf);
    final ActiveMQComponent amq =
        (ActiveMQComponent) camelContext.getComponent("activemq");
    amq.setConnectionFactory(brokerSupport.amqcf);
    amq.setTransacted(false); // must be disabled for JTA
    amq.setTransactionManager(txnMgrSupport.fetchJTATxnMgr());


Here is the route under test:

  protected void configureBasicRoute() {
    final String fromURI = String.format(endpointAMQ("%s?concurrentConsumers=%d"),
      config.basicRouteInboxQueue(),
      config.concurrentConsumers());
    from(fromURI).routeId(ROUTE_ID_BASIC_ROUTE)
      .setHeader(HDR_REQUESTOR, constant(getContext().getName()))
      .to(endpointAMQ(config.basicRouteOutboxQueue())).id("end");
  }

Then I run the test.

  @Test
  public void testTransactionFailedFlow() throws Exception {

    this.context.getRouteDefinition(ProjectRouteBuilder.ROUTE_ID_BASIC_ROUTE)
      .adviceWith(this.context, new AdviceWithRouteBuilder() {
        @Override
        public void configure() throws Exception {
          // This allows us to see every exchange that goes through an endpoint.
          // Normally endpoints do not store exchanges, mocks do. Note that this
          // only adds mock interceptors to PRODUCERS. If you want to change the
          // "from(...)" line, use the replaceFromWith() method.
          weaveById("end").before().process(new Processor() {
            @Override
            public void process(final Exchange exchange) throws Exception {
              throw new IllegalArgumentException("Forcing transaction to fail!");
            }
          });
          mockEndpoints("activemq:queue:*");
        }
      });
    startCamelContext();

    final String testBody = "Testing"; // A test body to send through the queues.

    final MockEndpoint mockOutbox =
      assertAndGetMockEndpoint(mockEndpointAMQ(projectConfig.basicRouteOutboxQueue()));
    mockOutbox.expectedMessageCount(0);

    template.sendBody(DIRECT_FEED_INBOX, ExchangePattern.InOnly, testBody);

    mockOutbox.assertIsSatisfied();


    brokerSupport.assertQueueSize(this.projectConfig.basicRouteInboxQueue(), 1);
    brokerSupport.assertQueueSize(this.projectConfig.basicRouteOutboxQueue(), 0);
  }

The problem is that the test fails: the message doesn't end up back in the
inbox. assertQueueSize() is a utility I wrote that uses the ActiveMQ JMX
MBean to check the size of a queue, and I know that part works correctly.
I have been busting my brains on this for hours and I am hoping I am
missing something simple.
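
A JMX queue-size check of this kind can be sketched roughly as follows. This
is not the actual assertQueueSize() from the test suite: the class name
QueueSizeProbe and its method signatures are hypothetical, and the sketch
assumes the embedded broker has JMX enabled, registers its MBeans with the
platform MBean server, and uses the ActiveMQ 5.8+ object naming with a
QueueSize attribute. Older brokers use a different naming scheme.

```java
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

/** Hypothetical helper for illustration; not the utility used in the test above. */
final class QueueSizeProbe {

  private QueueSizeProbe() {}

  /**
   * Builds the MBean name of a queue. ASSUMPTION: ActiveMQ 5.8+ naming scheme.
   * Names containing characters that are special in an ObjectName are not escaped here.
   */
  static ObjectName queueObjectName(final String brokerName, final String queueName) {
    try {
      return new ObjectName("org.apache.activemq:type=Broker,brokerName=" + brokerName
          + ",destinationType=Queue,destinationName=" + queueName);
    } catch (final JMException ex) {
      throw new IllegalArgumentException("Invalid broker or queue name", ex);
    }
  }

  /** Reads a numeric MBean attribute and widens it to long. */
  static long readLongAttribute(final MBeanServer server, final ObjectName name,
      final String attribute) {
    try {
      return ((Number) server.getAttribute(name, attribute)).longValue();
    } catch (final JMException ex) {
      throw new IllegalStateException("Cannot read " + attribute + " from " + name, ex);
    }
  }

  /** Fails if the queue depth reported over JMX differs from the expected value. */
  static void assertQueueSize(final String brokerName, final String queueName,
      final long expected) {
    // ASSUMPTION: the broker registered its MBeans with the platform MBean server.
    final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    final long actual =
        readLongAttribute(server, queueObjectName(brokerName, queueName), "QueueSize");
    if (actual != expected) {
      throw new AssertionError(
          "Queue " + queueName + ": expected size " + expected + " but was " + actual);
    }
  }
}
```

Note that a check like this reads the broker's counter at one instant, so a
redelivery still in flight when the assertion runs may not be reflected yet.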

Thanks in advance.

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: http://www.linkedin.com/pub/robert-simmons/40/852/a39*
