Title: [990] trunk/core/src/main/java/org/servicemix/components/util: Fix the JMS Flow deadlock when using sendSync.
- Revision
- 990
- Author
- gnt
- Date
- 2005-12-02 11:39:51 -0500 (Fri, 02 Dec 2005)
Log Message
Fix the JMS Flow deadlock when using sendSync.
The ChainedComponent now report back errors.
Modified Paths
Diff
Modified: trunk/core/src/main/java/org/servicemix/components/util/ChainedComponent.java (989 => 990)
--- trunk/core/src/main/java/org/servicemix/components/util/ChainedComponent.java 2005-12-02 14:19:43 UTC (rev 989)
+++ trunk/core/src/main/java/org/servicemix/components/util/ChainedComponent.java 2005-12-02 16:39:51 UTC (rev 990)
@@ -20,12 +20,15 @@
import java.util.Iterator;
import java.util.Set;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.xml.namespace.QName;
import org.servicemix.components.util.TransformComponentSupport;
+import org.servicemix.jbi.FaultException;
/**
* This class allows a series of componeents to be chained together. It will
@@ -47,7 +50,7 @@
NormalizedMessage curIn = in;
MessageExchange curExchange = exchange;
for (int i = 0; i < services.length; i++) {
- MessageExchange mexchange = this.getDeliveryChannel()
+ InOut mexchange = this.getDeliveryChannel()
.createExchangeFactoryForService(services[i])
.createInOutExchange();
copyProperties(curExchange, mexchange);
@@ -68,7 +71,7 @@
* @return the out message of the invoked service
* @throws MessagingException
*/
- private NormalizedMessage invokeService(MessageExchange exchange,
+ private NormalizedMessage invokeService(InOut exchange,
NormalizedMessage in,
QName service) throws MessagingException {
NormalizedMessage msg = exchange.createMessage();
@@ -76,7 +79,22 @@
exchange.setMessage(msg, "in");
boolean result = this.getDeliveryChannel().sendSync(exchange);
if (result) {
- return exchange.getMessage("out");
+ if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ exchange.setStatus(ExchangeStatus.DONE);
+ getDeliveryChannel().send(exchange);
+ if (exchange.getError() != null) {
+ throw new MessagingException("Received error", exchange.getError());
+ } else if (exchange.getFault() != null) {
+ throw new FaultException("Received fault", exchange, exchange.getFault());
+ } else {
+ throw new MessagingException("Received unknown error");
+ }
+ } else {
+ NormalizedMessage out = exchange.getOutMessage();
+ exchange.setStatus(ExchangeStatus.DONE);
+ getDeliveryChannel().send(exchange);
+ return out;
+ }
}
throw new MessagingException("Could not invoke service: " + service);
}
Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java (989 => 990)
--- trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java 2005-12-02 14:19:43 UTC (rev 989)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java 2005-12-02 16:39:51 UTC (rev 990)
@@ -52,6 +52,8 @@
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkException;
import java.util.Iterator;
import java.util.Map;
@@ -415,8 +417,22 @@
processInBoundPacket(containerName, event);
}
else if (obj instanceof MessageExchangeImpl) {
- MessageExchangeImpl me = (MessageExchangeImpl) obj;
- super.doRouting(me);
+ final MessageExchangeImpl me = (MessageExchangeImpl) obj;
+ // Dispatch the message in another thread so as to free the jms session
+ // else if a component do a sendSync into the jms flow, the whole
+ // flow is deadlocked
+ broker.getWorkManager().scheduleWork(new Work() {
+ public void release() {
+ }
+ public void run() {
+ try {
+ JMSFlow.super.doRouting(me);
+ }
+ catch (MessagingException e) {
+ log.error("Caught an exception routing ExchangePacket: ", e);
+ }
+ }
+ });
}
}
}
@@ -424,7 +440,7 @@
catch (JMSException jmsEx) {
log.error("Caught an exception unpacking JMS Message: ", jmsEx);
}
- catch (MessagingException e) {
+ catch (WorkException e) {
log.error("Caught an exception routing ExchangePacket: ", e);
}
}