So I have discovered some issues with my code.  The following code seems to
be a working example with Atomikos (resource-context.xml) with Test Case
(ServiceMixXaRollbackTest) using a ServiceMix 4.3 ActiveMq instance
(activemq-broker.xml).


Unfortunately, on transaction rollback (and retry exhaustion) I am expecting
my ActiveMq message to move to the Dead Letter Queue
"DLQ.my_test_thirdparty" as specified in the activemq-broker.xml, since I
don't override any message redelivery and dlq handling behaviour in Camel. 
Instead I am finding that the ActiveMq message lands in the queue
"ActiveMQ.DLQ".

What am I missing in my understanding on the behaviour of ActiveMq and Camel
?


Updated code below.


+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Start code snippet: 
"src/main/resources/META-INF/spring/resource-context.xml"

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans";
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
    xmlns:osgi="http://www.springframework.org/schema/osgi";
    xmlns:cxfse="http://servicemix.apache.org/cxfse/1.0";
    xmlns:broker="http://activemq.apache.org/schema/core";
    xmlns:tx="http://www.springframework.org/schema/tx";
    xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                            http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
                               http://www.springframework.org/schema/osgi
http://www.springframework.org/schema/osgi/spring-osgi.xsd
                               http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
                               http://servicemix.apache.org/file/1.0
http://servicemix.apache.org/file/1.0/servicemix-file.xsd
                               http://servicemix.apache.org/cxfse/1.0
http://servicemix.apache.org/schema/servicemix-cxf-se-2011.01.xsd
                               http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-3.0.xsd
                               http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd";>




    


    <bean id="activemq"
class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="transacted" value="true"/>
        <property name="transactionManager" ref="jtaTransactionManager"/>
    </bean>

    <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
        <property name="transacted" value="true"/>
        <property name="transactionManager" ref="jtaTransactionManager"/>
    </bean>

    <bean id="sql" class="org.apache.camel.component.sql.SqlComponent">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <bean id="jdbc" class="org.apache.camel.component.jdbc.JdbcComponent">
        <property name="dataSource" ref="dataSource"/>
    </bean>


        

    <bean id="atomikosTransactionManager"
class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init"
destroy-method="close" >
        <property name="forceShutdown" value="false" />
    </bean>


    <bean id="atomikosUserTransaction"
class="com.atomikos.icatch.jta.UserTransactionImp" >
        <property name="transactionTimeout" value="150" />
    </bean>


    <bean id="jtaTransactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager" ref="atomikosTransactionManager"
/>
        <property name="userTransaction" ref="atomikosUserTransaction" />
        <property name="allowCustomIsolationLevels" value="true" />
    </bean>


    <bean id="PROPAGATION_REQUIRED"
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <property name="transactionManager" ref="jtaTransactionManager" />
        <property name="propagationBehaviorName"
value="PROPAGATION_REQUIRED" />
        <property name="transactionTemplate" ref="transactionTemplate" />
    </bean>

    <bean id="transactionTemplate" 
class="org.springframework.transaction.support.TransactionTemplate">
        <property name="transactionManager" ref="jtaTransactionManager" />
        <property name="isolationLevelName" value="ISOLATION_SERIALIZABLE"/>
        
    </bean>


    <bean id="jdbcTemplate"
class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="atomikosConnectionFactory"
/>
    </bean>


    

    <bean id="jmsXaConnectionFactory"
class="org.apache.activemq.ActiveMQXAConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="atomikosConnectionFactory"
class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init"
destroy-method="close">
        <property name="uniqueResourceName" value="amq1" />
        <property name="xaConnectionFactory" ref="jmsXaConnectionFactory" />
        <property name="localTransactionMode" value ="false" />
        
    </bean>


    


    <bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean">
        <property name="uniqueResourceName" value="ds1" />
        <property name="testQuery" value="select 1" />
        <property name="xaDataSource">
            <bean
class="com.microsoft.sqlserver.jdbc.SQLServerXADataSource">
                <property name="serverName" value="localhost" />
                <property name="portNumber" value="1433" />
                <property name="selectMethod" value="cursor" />
                <property name="databaseName" value="messaging" />
                <property name="user" value="__user__" />
                <property name="password" value="_password__" />
            </bean>
        </property>
    </bean>

</beans>

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

End code snippet:  "src/main/resources/META-INF/spring/resource-context.xml"

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++




+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Note 1 (Broke my transaction wrapping):

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++


    <bean id="activemq"
class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="transacted" value="true"/>
        <property name="transactionManager" ref="jtaTransactionManager"/>
    </bean>

    <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
        <property name="transacted" value="true"/>
        <property name="transactionManager" ref="jtaTransactionManager"/>
    </bean>


+++++++++++++++++++  IS NOT EQUIVALENT TO  +++++++++++++++++++


        <bean id="jmsConfig"
class="org.apache.camel.component.jms.JmsConfiguration">
       <property name="transactionManager" ref="jtaTransactionManager"/>
       <property name="connectionFactory" ref="atomikosConnectionFactory"/>
       <property name="transacted" value="true"/>
    </bean>


    <bean id="activemq"
class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>

    <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>



+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Note 2 (Broke my transaction wrapping):

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++



    <bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean">
        <property name="uniqueResourceName" value="ds1" />
        <property name="testQuery" value="select 1" />
        <property name="xaDataSource">
            <bean
class="com.microsoft.sqlserver.jdbc.SQLServerXADataSource">
                <property name="serverName" value="localhost" />
                <property name="portNumber" value="1433" />
                <property name="selectMethod" value="cursor" />
                <property name="databaseName" value="messaging" />
                <property name="user" value="__user__" />
                <property name="password" value="_password__" />
            </bean>
        </property>
    </bean>

+++++++++++++++++++  IS NOT EQUIVALENT TO  +++++++++++++++++++

    <bean id="atomikosDataSource"
class="com.atomikos.jdbc.AtomikosDataSourceBean">
        <property name="uniqueResourceName" value="ds1" />
        <property name="xaDataSource" ref="dataSource" />
        <property name="testQuery" value="select 1" />
    </bean>


    <bean id="dataSource"
class="com.microsoft.sqlserver.jdbc.SQLServerXADataSource">
        <property name="serverName" value="localhost" />
        <property name="portNumber" value="1433" />
        <property name="selectMethod" value="cursor" />
                <property name="databaseName" value="messaging" />
                <property name="user" value="__user__" />
                <property name="password" value="_password__" />
    </bean>


        
        
        
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Start code snippet:  "src/test/java/xa/ServiceMixXaRollbackTest.java"

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
        
        
        
        package test.xa;

import junit.framework.Assert;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.language.XPath;
import org.apache.camel.spring.SpringRouteBuilder;
import org.apache.camel.test.CamelSpringTestSupport;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;



public class ServiceMixXaRollbackTest extends CamelSpringTestSupport
{
    protected JdbcTemplate jdbc;

    @Override
    @BeforeClass
    public void setUp() throws Exception {
        super.setUp();

        jdbc = context.getRegistry().lookup("jdbcTemplate",
JdbcTemplate.class);

        try
        {
            jdbc.execute( "drop table messaging.my_test_thirdparty" );
        }
        catch ( Exception e )
        {
            // ignore
        }
        jdbc.execute("create table messaging.my_test_thirdparty (
thirdparty_id varchar(10), name varchar(128), created varchar(20),
status_code varchar(3) )");
    }

    @Override
    @AfterClass
    public void tearDown() throws Exception {
        jdbc.execute("drop table messaging.my_test_thirdparty");
    }

    @Override
    protected CamelContext createCamelContext() throws Exception {
        CamelContext camelContext = super.createCamelContext();
        camelContext.addRoutes( createRouteBuilder() );
        return camelContext;
    }



    @Override
    protected AbstractXmlApplicationContext createApplicationContext() {
        return new ClassPathXmlApplicationContext(new
String[]{"META-INF/spring/resource-context.xml"});
    }

    @Test
    public void testXaRollbackAfterDb() throws Exception {

        Assert.assertEquals( 0, jdbc.queryForInt("select count(*) from
my_test_thirdparty"));

        String xml = "<?xml version=\"1.0\"?><thirdparty
id=\"123\"><name>THE TEST
THIRDPARTY</name><date>201110140815</date><code>200</code></thirdparty>";
        template.sendBody("activemq:queue:my_test_thirdparty", xml);

        Thread.sleep(15000);

        Assert.assertEquals( 0, jdbc.queryForInt("select count(*) from
my_test_thirdparty"));

        String dlq =
consumer.receiveBodyNoWait("activemq:queue:DLQ.my_test_thirdparty",
String.class);
        Assert.assertNotNull("Should not lose message", dlq);
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new SpringRouteBuilder() {

            @Override
            public void configure() throws Exception {

                from("activemq:queue:my_test_thirdparty")
                    .transacted()
                    .log("+++ Before Database Call +++")
                    .bean(ServiceMixXaRollbackTest.class, "toSql")
                    .to("jdbc:dataSource")
                    .log("+++ After Database Call +++")
                    .throwException(new IllegalArgumentException("Unexpected
Exception"))
                    ;


            }

        };
    }


    public static String toSql(@XPath("thirdparty/@id") int thirdpartyId,
                        @XPath("thirdparty/name/text()") String name,
                        @XPath("thirdparty/date/text()") long created,
                        @XPath("thirdparty/code/text()") int status_code) {

        if (thirdpartyId <= 0) {
            throw new IllegalArgumentException("ThirdPartyId is invalid, was
" + thirdpartyId);
        }

        StringBuilder sb = new StringBuilder();
        sb.append("INSERT INTO messaging.my_test_thirdparty (thirdparty_id,
name, created, status_code) VALUES (");
        sb.append("'").append(thirdpartyId).append("', ");
        sb.append("'").append(name).append("', ");
        sb.append("'").append(created).append("', ");
        sb.append("'").append(status_code).append("') ");

        return sb.toString();
    }


}

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

End code snippet:  "src/test/java/xa/ServiceMixXaRollbackTest.java"

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++


+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Start code snippet:  "apache-servicemix-4.3.0/etc/activemq-broker.xml"
++  (Just removed comments from activemq-broker.xml, otherwise the same
source xml as the default ServiceMix installation)

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0";
          
xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0";
          
xmlns:ext="http://aries.apache.org/blueprint/xmlns/blueprint-ext/v1.0.0";
           xmlns:amq="http://activemq.apache.org/schema/core";>

    <ext:property-placeholder />

    <broker xmlns="http://activemq.apache.org/schema/core";
brokerName="default" dataDirectory="${karaf.data}/activemq/default"
useShutdownHook="false">
              
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true"
memoryLimit="10mb">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true"
memoryLimit="10mb">
                  
                                  <deadLetterStrategy>
                                                
                                                <individualDeadLetterStrategy 
queuePrefix="DLQ."
useQueueForQueueMessages="true" />
                                        </deadLetterStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy> 

        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <persistenceAdapter>
            <kahaDB directory="${karaf.data}/activemq/default/kahadb"/>
        </persistenceAdapter>

        <transportConnectors>
            <transportConnector name="openwire"
uri="tcp://localhost:61616"/>
            <transportConnector name="stomp" uri="stomp://localhost:61613"/>
        </transportConnectors>

    </broker>

    <bean id="activemqConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">

        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="pooledConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="activemqConnectionFactory"
/>
    </bean>

    <bean id="resourceManager"
class="org.apache.activemq.pool.ActiveMQResourceManager"
init-method="recoverResource">
          <property name="transactionManager" ref="transactionManager" />
          <property name="connectionFactory" ref="activemqConnectionFactory"
/>
          <property name="resourceName" value="activemq.default" />
    </bean>

    <reference id="transactionManager"
interface="javax.transaction.TransactionManager" />

    <service ref="pooledConnectionFactory"
interface="javax.jms.ConnectionFactory">
        <service-properties>
            <entry key="name" value="localhost"/>
        </service-properties>
    </service>

</blueprint>

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

End code snippet:  "apache-servicemix-4.3.0/etc/activemq-broker.xml"

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++


--
View this message in context: 
http://servicemix.396122.n5.nabble.com/JMS-and-Database-interactions-under-the-same-transactional-context-tp4762819p4940331.html
Sent from the ServiceMix - User mailing list archive at Nabble.com.

Reply via email to