Hi all,
My flow is JMS consumer -> bean class -> HTTP provider (calling an
external service).
I have defined my JMS consumer as shown below, following the suggestion
provided. It worked great with JMS -> bean POJO class (no sendSync() call
to the HTTP provider). When I added the HTTP provider component, the bean
just hangs in the call to sendSync(). My guess is that this call is not
part of the transaction created by the JMS endpoint.
I am sure I am doing something fundamentally wrong. Please help.
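One way to narrow down the hang is to bound the wait. The JBI DeliveryChannel interface has a sendSync(exchange, timeoutMS) overload that returns false when no reply arrives in time, so a stuck call shows up as a timeout instead of blocking forever. The sketch below is plain JDK code, not ServiceMix code: ReplyWaiter, replyArrived and awaitReply are hypothetical names that only imitate that contract (the caller blocks until another thread signals the reply, or until the timeout expires).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a synchronous send that waits for a reply.
// It only imitates the "block until reply or timeout" contract.
final class ReplyWaiter {

    // Released once by whichever thread delivers the reply.
    private final CountDownLatch replied = new CountDownLatch(1);

    /** Called by the delivering thread when the reply is available. */
    void replyArrived() {
        replied.countDown();
    }

    /**
     * Blocks the calling thread until the reply arrives or timeoutMs elapses.
     * Returns true if the reply arrived, false on timeout or interruption.
     */
    boolean awaitReply(long timeoutMs) {
        try {
            return replied.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        // Case 1: another thread delivers the reply, so the wait ends early.
        ReplyWaiter answered = new ReplyWaiter();
        new Thread(answered::replyArrived).start();
        System.out.println("reply delivered: " + answered.awaitReply(2000));

        // Case 2: nobody ever delivers the reply; the bounded wait gives up.
        ReplyWaiter stuck = new ReplyWaiter();
        System.out.println("reply delivered: " + stuck.awaitReply(200));
    }
}
```

If the bounded call in the real bean times out consistently, that would suggest the reply never reaches the waiting thread. Whether that is related to synchronous="true" on the JMS endpoint is only a guess, not a confirmed diagnosis.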
<!-- JMS Consumer -->
<jms:endpoint id="jmsConsumer" service="test:MyConsumerService"
endpoint="jmsEndpoint"
targetService="test:MyBeanConsumerService"
role="consumer" processorName="jca"
defaultMep="http://www.w3.org/2004/08/wsdl/in-only"
connectionFactory="#connectionFactory"
synchronous="true" resourceAdapter="#amqResourceAdapter"
bootstrapContext="#bootstrapContext" rollbackOnError="true">
<jms:activationSpec>
<amqra:activationSpec destination="spq2"
destinationType="javax.jms.Queue"
maximumRedeliveries="2"
initialRedeliveryDelay="10000"
useExponentialBackOff="false" />
</jms:activationSpec>
</jms:endpoint>
<bean id="bootstrapContext"
class="org.jencks.factory.BootstrapContextFactoryBean">
<property name="threadPoolSize" value="15" />
<property name="transactionManager" ref="transactionManager"/>
</bean>
<amqra:resourceAdapter id="amqResourceAdapter"
serverUrl="tcp://localhost:61616" />
<bean id="connectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="transactionManager"
class="org.apache.geronimo.transaction.manager.GeronimoTransactionManager">
</bean>
</beans>
Here is the code snippet I use to call the HTTP provider from the SMX
bean (I am using the sendSync() method):
client = new ServiceMixClientFacade(context);
QName service = new QName("http://test","TemperatureConverterService1");
NormalizedMessage message = exchange.getMessage("in");
Source content = message.getContent();
String body = (new SourceTransformer()).toString(content);
log.info("SimpleConsumer:input message body is:"+body);
EndpointResolver resolver = client.createResolverForService(service);
boolean transactional =
exchange.getProperty("javax.jbi.transaction.jta") != null;
log.info("given xchange is transactional:"+transactional);
/* The transaction management below does not work;
it throws the following exception:
Caused by: javax.transaction.InvalidTransactionException:
Specified transaction is already associated with another thread
*/
// TransactionManager tm = (TransactionManager) context.getTransactionManager();
// tm.begin();
InOut outexchange = client.createInOutExchange(resolver);
Iterator<String> iterator = exchange.getPropertyNames().iterator();
while (iterator.hasNext()) {
String propName = iterator.next();
log.info("RECV:EXCHNG:inside iterator,propName:"+propName+":"+exchange.getProperty(propName));
log.info("SEND:EXCHNG:inside iterator,propName:"+propName+":"+outexchange.getProperty(propName));
outexchange.setProperty(propName, exchange.getProperty(propName));
}
NormalizedMessage outmessage = outexchange.getInMessage();
outmessage.setContent(new StringSource(body));
String uri = getURIFromMessage(body);
log.info("uri is:"+uri);
boolean useDynamicUri = true;
if (useDynamicUri) {
outexchange.getInMessage().setProperty(
JbiConstants.HTTP_DESTINATION_URI,
uri);
/* outexchange.getInMessage().setProperty(
JbiConstants.HTTP_DESTINATION_URI,
"http://localhost/axis2/services/EchoService"); */
}
boolean success = false;
boolean good = client.sendSync(outexchange);
log.info("SimpleConsumer:invocation is done");
//Object responseContent = client.request(resolver, null, null, body);
//log.info("SimpleConsumer:response is"+responseContent);
log.info("SimpleConsumer:response is"+good);
log.info("SimpleConsumer:response is"+outexchange.getOutMessage());
log.info("SimpleConsumer:error is"+outexchange.getError());
Fault fault = outexchange.getFault();
String faultbody=null;
if(fault != null){
Source faultContent = fault.getContent();
faultbody = (new SourceTransformer()).toString(faultContent);
log.info("SimpleConsumer:fault is"+faultbody);
}
if(outexchange.getOutMessage() != null){
NormalizedMessage responsemessage = outexchange.getOutMessage();
String responsecontent = (new SourceTransformer()).toString(responsemessage.getContent());
log.info("SimpleConsumer:response body is:"+responsecontent);
//log.info("response object is:"+responseContent);
success=true;
}
if(success){
exchange.setStatus(ExchangeStatus.DONE);
channel.send(exchange);
//tm.commit();
log.info("transaction committed");
//outexchange.setStatus(ExchangeStatus.DONE);
}
else{
//Exception ex = new Exception ("ERROR:Fault occurred"+faultbody);
Exception ex = new Exception ("ERROR:Fault occurred");
exchange.setError(ex);
exchange.setStatus(ExchangeStatus.ERROR);
channel.send(exchange);
//tm.rollback();
log.info("transaction rolled back");
//exchange.setStatus(ExchangeStatus.ERROR);
}
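The commit/rollback branch at the end of the snippet above can be isolated into a small decision function, which makes it easier to test without a running container. This is a minimal sketch under assumptions: OutcomeDecision, Reply and Action are hypothetical stand-ins, not ServiceMix or JBI types, and the rule "a timeout, error, fault or missing out message means rollback" is one reading of the snippet, not documented behavior.

```java
// Hypothetical, JDK-only model of the success/failure branch in the bean.
final class OutcomeDecision {

    enum Action { COMMIT, ROLLBACK }

    /** Stand-in for the parts of the returned exchange the bean inspects. */
    static final class Reply {
        final String outBody;   // null when no out message came back
        final String faultBody; // null when the provider returned no fault
        final Exception error;  // null when the exchange carries no error

        Reply(String outBody, String faultBody, Exception error) {
            this.outBody = outBody;
            this.faultBody = faultBody;
            this.error = error;
        }
    }

    /**
     * returned is the boolean result of the synchronous send
     * (false would mean the call timed out).
     */
    static Action decide(boolean returned, Reply reply) {
        if (!returned || reply == null) {
            return Action.ROLLBACK; // no answer at all
        }
        if (reply.error != null || reply.faultBody != null) {
            return Action.ROLLBACK; // provider reported a problem
        }
        // Only a real out message counts as success, as in the snippet.
        return reply.outBody != null ? Action.COMMIT : Action.ROLLBACK;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, new Reply("<ok/>", null, null)));    // COMMIT
        System.out.println(decide(true, new Reply(null, "<fault/>", null))); // ROLLBACK
        System.out.println(decide(false, null));                             // ROLLBACK
    }
}
```

In the bean, COMMIT would correspond to sending the exchange back with ExchangeStatus.DONE, and ROLLBACK to calling setError() and sending it back with ExchangeStatus.ERROR, as in the two original branches.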
Freeman Fang wrote:
>
> Hi,
>
> Could you try the latest Apache ServiceMix 4.2 snapshot (4.2 will be
> released soon)?
>
> Or use FUSE ESB 4.2 [1].
>
> [1]http://repo.open.iona.com/maven2/org/apache/servicemix/apache-servicemix/4.2.0-fuse-01-00/
>
> Freeman
> On 2010-4-13, at 10:10 AM, gnanda wrote:
>
>>
>>
>> Hi,
>>
>> Thank you for your quick response.
>>
>> As per your suggestion, I added the lines below to my JMS consumer
>> xbean.xml file, and I am now getting an exception. Here is my
>> xbean.xml file:
>>
>> <beans xmlns:jms="http://servicemix.apache.org/jms/1.0"
>> xmlns:test="http://test" xmlns:amq="http://activemq.org/config/1.0"
>> xmlns="http://www.springframework.org/schema/beans"
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>
>> xsi:schemaLocation="http://servicemix.apache.org/jms/1.0
>> http://servicemix.apache.org/schema/servicemix-jms-3.2.3.xsd
>> http://activemq.org/config/1.0
>> http://activemq.apache.org/schema/core/activemq-core-4.1.1.xsd
>> http://www.springframework.org/schema/beans
>> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
>>
>>
>>
>> <jms:consumer service="test:MyConsumerService" endpoint="jms"
>> targetService="test:MyBeanConsumerService"
>> targetEndpoint="beanendpoint"
>> destinationName="spq2"
>> connectionFactory="#connectionFactory"
>> concurrentConsumers="4" cacheLevel="3"
>> marshaler="#marshaler"
>> transacted="jms" />
>>
>> <!--
>> END SNIPPET: consumer <jms:endpoint
>> service="test:MyConsumerService"
>> endpoint="jms+soap"
>> targetInterfaceName="test:MyConsumerInterface"
>> role="consumer" destinationStyle="queue"
>> jmsProviderDestinationName="queue/A/Soap" soap="true"
>> defaultMep="http://www.w3.org/2004/08/wsdl/in-out"
>> connectionFactory="#connectionFactory" />
>> -->
>> <bean id="connectionFactory"
>> class="org.apache.activemq.ActiveMQConnectionFactory">
>> <property name="brokerURL" value="tcp://localhost:61616" />
>> <!--
>> not working: <property name="redeliveryPolicy" ref="redeliveryPolicy" />
>> -->
>> </bean>
>> <bean id="redeliveryPolicy"
>> class="org.apache.activemq.RedeliveryPolicy">
>> <property name="maximumRedeliveries" value="8" />
>> </bean>
>>
>> <bean id="marshaler"
>> class="org.apache.servicemix.jms.endpoints.DefaultConsumerMarshaler">
>> <property name="mep"
>> value="http://www.w3.org/2004/08/wsdl/in-out" />
>> <property name="rollbackOnError" value="true" />
>> </bean>
>>
>>
>>
>> </beans>
>>
>>
>> Do I need to start a transaction explicitly somewhere? If so, how
>> would I start one? The exception is below:
>>
>> 2010-04-12 21:58:49,342| 21:58:49,342 | WARN | tenerContainer-7 | DefaultMessageListenerContainer | AbstractMessageListenerContainer 646 | Execution of JMS message listener failed
>> javax.jms.JMSException: Error sending JBI exchange
>> at org.apache.servicemix.jms.endpoints.AbstractConsumerEndpoint.onMessage(AbstractConsumerEndpoint.java:575)
>> at org.apache.servicemix.jms.endpoints.JmsConsumerEndpoint$1.onMessage(JmsConsumerEndpoint.java:505)
>> at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:518)
>> at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:479)
>> at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:451)
>> at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
>> at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:241)
>> at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
>> at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:974)
>> at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:876)
>> at java.lang.Thread.run(Thread.java:619)
>> Caused by: java.lang.Exception: ERROR:Fault occurred<?xml version="1.0" encoding="UTF-8"?><soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"><soapenv:Body><soapenv:Fault xmlns:axis2ns31="http://schemas.xmlsoap.org/soap/envelope/"><faultcode>axis2ns31:Client</faultcode><faultstring>The endpoint reference (EPR) for the Operation not found is http://localhost/axis2/services/EchoService and the WSA Action = null</faultstring><detail/></soapenv:Fault></soapenv:Body></soapenv:Envelope>
>> at org.simpleBeanconsumer.MyBean.onMessageExchange(MyBean.java:147)
>> at org.apache.servicemix.bean.BeanEndpoint.onProviderExchange(BeanEndpoint.java:230)
>> at org.apache.servicemix.bean.BeanEndpoint.process(BeanEndpoint.java:217)
>> at org.apache.servicemix.common.AsyncBaseLifeCycle.doProcess(AsyncBaseLifeCycle.java:627)
>> at org.apache.servicemix.common.AsyncBaseLifeCycle.processExchange(AsyncBaseLifeCycle.java:581)
>> at org.apache.servicemix.common.AsyncBaseLifeCycle.processExchangeInTx(AsyncBaseLifeCycle.java:478)
>> at org.apache.servicemix.common.AsyncBaseLifeCycle$2.run(AsyncBaseLifeCycle.java:347)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>> ... 1 more
>> 2010-04-12 21:58:49,342| 21:58:49,342 | ERROR | //127.0.0.1:4933 | Service | ivemq.broker.TransportConnection 290 | Async error occurred: javax.jms.JMSException: Transaction 'TX:ID:LPF004689-4862-1271123912874-0:65:3' has not been started.
>> javax.jms.JMSException: Transaction 'TX:ID:LPF004689-4862-1271123912874-0:65:3' has not been started.
>> at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:270)
>> at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:190)
>> at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
>> at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
>> at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
>> at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
>> at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
>> at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
>> at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
>> at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>> at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
>> at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
>> at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>> at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
>> at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
>> at java.lang.Thread.run(Thread.java:619)
>>
>> Thanks & Regards
>> Gita
>>
>>
>>
>>
>>
>>
>> Freeman Fang wrote:
>>>
>>> Hi,
>>>
>>> You can simply add the attributes
>>> marshaler="#marshaler"
>>> transacted="jms"
>>> to your JMS consumer to enable transactions.
>>> The marshaler is defined as:
>>> <bean id="marshaler"
>>> class="org.apache.servicemix.jms.endpoints.DefaultConsumerMarshaler">
>>> <property name="mep" value="http://www.w3.org/2004/08/wsdl/in-out" />
>>> <property name="rollbackOnError" value="true"/>
>>> </bean>
>>> Note that you must set <property name="rollbackOnError" value="true"/>.
>>>
>>> Then, when an error message comes back to your JMS consumer, the JMS
>>> consumer will roll back.
>>>
>>>
>>> Freeman
>>>
>>>
>>> On 2010-4-10, at 12:26 PM, gnanda wrote:
>>>
>>>>
>>>> Hi,
>>>> I am new to ServiceMix.
>>>> I am using ServiceMix 4 (apache-servicemix-4.0.0). I have created a
>>>> service assembly with 3 SUs. The flow is as follows:
>>>> JMS consumer -> ServiceMix bean class -> HTTP provider (accessing an
>>>> external web service). I would like to understand how to add
>>>> transactional behavior to this message flow.
>>>>
>>>> For example: if I receive an error (or fault) while calling the
>>>> external service from the HTTP provider component, I should be able
>>>> to roll back or commit the message on the queue based on the error
>>>> code.
>>>>
>>>> Any insight will be very helpful. Thank you in advance.
>>>>
>>>>
>>>> Here is my xbean definition of jms consumer
>>>>
>>>> <jms:consumer service="test:MyConsumerService" endpoint="jms"
>>>> targetService="test:MyBeanConsumerService"
>>>> targetEndpoint="beanendpoint"
>>>> destinationName="spq2" connectionFactory="#connectionFactory"
>>>> concurrentConsumers="4" cacheLevel="3" />
>>>>
>>>> <bean id="connectionFactory"
>>>> class="org.apache.activemq.ActiveMQConnectionFactory">
>>>> <property name="brokerURL" value="tcp://localhost:61616" />
>>>> <!--
>>>> not working: <property name="redeliveryPolicy" ref="redeliveryPolicy" />
>>>> -->
>>>> </bean>
>>>> <bean id="redeliveryPolicy"
>>>> class="org.apache.activemq.RedeliveryPolicy">
>>>> <property name="maximumRedeliveries" value="8" />
>>>> </bean>
>>>> Here is my xbean definition of the bean class
>>>> <bean:endpoint service="test:MyBeanConsumerService"
>>>> endpoint="beanendpoint"
>>>> bean="#myBean"/>
>>>>
>>>> <bean id="myBean" class="org.simpleBeanconsumer.MyBean"/>
>>>>
>>>> </beans>
>>>>
>>>> Here is my xbean definition of the http provider
>>>>
>>>> <http:endpoint service="test:TemperatureConverterService1"
>>>> endpoint="TemperatureConverterServiceSoap12Binding"
>>>> role="provider"
>>>> locationURI="http://localhost/axis2/services/TemperatureConverterService"
>>>> defaultMep="http://www.w3.org/2004/08/wsdl/in-out"
>>>> />
>>>> <!--
>>>> wsdlResource="http://localhost/axis2/services/TemperatureConverterService?wsdl"
>>>> -->
>>>> </beans>
>>>> --
>>>> View this message in context:
>>>> http://old.nabble.com/SMX4-handling-transaction-tp28199896p28199896.html
>>>> Sent from the ServiceMix - User mailing list archive at Nabble.com.
>>>>
>>>
>>>
>>> --
>>> Freeman Fang
>>> ------------------------
>>> Open Source SOA: http://fusesource.com
>>>
>>>
>>>
>>
>>
>
>
>
>
>
--
View this message in context:
http://old.nabble.com/SMX4-handling-transaction-tp28199896p28237095.html
Sent from the ServiceMix - User mailing list archive at Nabble.com.