Hello, I try to implement an ActiveMQ transactional client
(http://camel.apache.org/transactional-client.html). If during a process an
exception is occurred the message have to store in ActiveMQ queue:
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spring.SpringTestSupport;
import org.apache.camel.spring.spi.SpringTransactionPolicy;
import org.apache.camel.spring.spi.TransactionErrorHandlerBuilder;
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class JMXTransactionTest extends SpringTestSupport {
public void testUnprocessedMessage() throws Exception {
MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
String message = "A brocken message";
resultEndpoint.expectedMessageCount(0);
sendBody("activemq:queue:aaa", message);
resultEndpoint.assertIsSatisfied(1000);
PollingConsumer consumer =
context.getEndpoint("activemq:queue:aaa").createPollingConsumer();
assertEquals(message,
consumer.receive(1000).getIn().getBody(String.class));
}
protected AbstractXmlApplicationContext createApplicationContext() {
return new ClassPathXmlApplicationContext("camel-context-test.xml");
}
protected int getExpectedRouteCount() {
return 0;
}
@Override protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override public void configure() throws Exception {
SpringTransactionPolicy policy =
(SpringTransactionPolicy)
applicationContext.getBean("PROPAGATION_REQUIRED");
TransactionErrorHandlerBuilder answer = new
TransactionErrorHandlerBuilder();
answer.setTransactionTemplate(policy.getTemplate());
errorHandler(answer);
from("activemq:queue:aaa")
.policy(policy)
.process(new Processor() {
@Override public void process(Exchange
exchange) throws Exception {
throw new
IllegalArgumentException();
}
})
.to("mock:result");
}
};
}
}
and camel-context-test.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:camel="http://camel.apache.org/schema/spring"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://activemq.apache.org/camel/schema/spring
http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">
<bean id="jmsConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL"
value="vm://localhost:61616?broker.persistent=false&broker.useJmx=false"/>
</bean>
<bean id="jmsTransactionManager"
class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConfig"
class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="transactionManager" ref="jmsTransactionManager" />
<property name="transacted" value="true" />
</bean>
<bean id="activemq"
class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="jmsConfig" />
</bean>
<bean id="PROPAGATION_REQUIRED"
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="jmsTransactionManager"/>
</bean>
</beans>
but test failed, because message isn't rollback, the "activemq:queue:aaa" is
empty
(context.getEndpoint("activemq:queue:aaa").createPollingConsumer().receive(1000)
is null):
java.lang.NullPointerException
at JMXTransactionTest.testUnprocessedMessage(JMXTransactionTest.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at junit.framework.TestCase.runTest(TestCase.java:168)
at junit.framework.TestCase.runBare(TestCase.java:134)
at junit.framework.TestResult$1.protect(TestResult.java:110)
at junit.framework.TestResult.runProtected(TestResult.java:128)
at junit.framework.TestResult.run(TestResult.java:113)
at junit.framework.TestCase.run(TestCase.java:124)
at junit.framework.TestSuite.runTest(TestSuite.java:232)
at junit.framework.TestSuite.run(TestSuite.java:227)
at
org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:83)
at
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:46)
at
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
What do I do wrong?
--
View this message in context:
http://www.nabble.com/JMS-message-isn%27t-rollback-in-transaction-tp25666171p25666171.html
Sent from the Camel - Users mailing list archive at Nabble.com.