Title: [990] trunk/core/src/main/java/org/servicemix/components/util: Fix the JMS Flow deadlock when using sendSync.
Revision
990
Author
gnt
Date
2005-12-02 11:39:51 -0500 (Fri, 02 Dec 2005)

Log Message

Fix the JMS Flow deadlock when using sendSync.
The ChainedComponent now report back errors.

Modified Paths

Diff

Modified: trunk/core/src/main/java/org/servicemix/components/util/ChainedComponent.java (989 => 990)

--- trunk/core/src/main/java/org/servicemix/components/util/ChainedComponent.java	2005-12-02 14:19:43 UTC (rev 989)
+++ trunk/core/src/main/java/org/servicemix/components/util/ChainedComponent.java	2005-12-02 16:39:51 UTC (rev 990)
@@ -20,12 +20,15 @@
 import java.util.Iterator;
 import java.util.Set;
 
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOut;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.xml.namespace.QName;
 
 import org.servicemix.components.util.TransformComponentSupport;
+import org.servicemix.jbi.FaultException;
 
 /**
  * This class allows a series of componeents to be chained together. It will
@@ -47,7 +50,7 @@
         NormalizedMessage curIn = in;
         MessageExchange curExchange = exchange;
         for (int i = 0; i < services.length; i++) {
-            MessageExchange mexchange = this.getDeliveryChannel()
+            InOut mexchange = this.getDeliveryChannel()
                     .createExchangeFactoryForService(services[i])
                     .createInOutExchange();
             copyProperties(curExchange, mexchange);
@@ -68,7 +71,7 @@
      * @return the out message of the invoked service
      * @throws MessagingException
      */
-    private NormalizedMessage invokeService(MessageExchange exchange,
+    private NormalizedMessage invokeService(InOut exchange,
                                             NormalizedMessage in, 
                                             QName service) throws MessagingException {
         NormalizedMessage msg = exchange.createMessage();
@@ -76,7 +79,22 @@
         exchange.setMessage(msg, "in");
         boolean result = this.getDeliveryChannel().sendSync(exchange);
         if (result) {
-            return exchange.getMessage("out");
+            if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                exchange.setStatus(ExchangeStatus.DONE);
+                getDeliveryChannel().send(exchange);
+                if (exchange.getError() != null) {
+                    throw new MessagingException("Received error", exchange.getError());
+                } else if (exchange.getFault() != null) {
+                    throw new FaultException("Received fault", exchange, exchange.getFault());
+                } else {
+                    throw new MessagingException("Received unknown error");
+                }
+            } else {
+                NormalizedMessage out = exchange.getOutMessage();
+                exchange.setStatus(ExchangeStatus.DONE);
+                getDeliveryChannel().send(exchange);
+                return out; 
+            }
         }
         throw new MessagingException("Could not invoke service: " + service);
     }

Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java (989 => 990)

--- trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java	2005-12-02 14:19:43 UTC (rev 989)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java	2005-12-02 16:39:51 UTC (rev 990)
@@ -52,6 +52,8 @@
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.Topic;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkException;
 
 import java.util.Iterator;
 import java.util.Map;
@@ -415,8 +417,22 @@
                         processInBoundPacket(containerName, event);
                     }
                     else if (obj instanceof MessageExchangeImpl) {
-                        MessageExchangeImpl me = (MessageExchangeImpl) obj;
-                        super.doRouting(me);
+                        final MessageExchangeImpl me = (MessageExchangeImpl) obj;
+                        // Dispatch the message in another thread so as to free the jms session
+                        // else if a component do a sendSync into the jms flow, the whole
+                        // flow is deadlocked 
+                        broker.getWorkManager().scheduleWork(new Work() {
+                            public void release() {
+                            }
+                            public void run() {
+                                try {
+                                    JMSFlow.super.doRouting(me);
+                                }
+                                catch (MessagingException e) {
+                                    log.error("Caught an exception routing ExchangePacket: ", e);
+                                }
+                            }
+                        });
                     }
                 }
             }
@@ -424,7 +440,7 @@
         catch (JMSException jmsEx) {
             log.error("Caught an exception unpacking JMS Message: ", jmsEx);
         }
-        catch (MessagingException e) {
+        catch (WorkException e) {
             log.error("Caught an exception routing ExchangePacket: ", e);
         }
     }

Reply via email to