Title: [836] branches/servicemix-2.0/trunk/core/src/main/java/org/servicemix/jbi/messaging: SM-174 : SendSync does not work on clustered flows

Diff

Modified: branches/servicemix-2.0/trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java (835 => 836)

--- branches/servicemix-2.0/trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java	2005-11-14 20:19:41 UTC (rev 835)
+++ branches/servicemix-2.0/trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java	2005-11-14 21:48:17 UTC (rev 836)
@@ -74,6 +74,7 @@
     private long lastReceiveTime = System.currentTimeMillis();
     private AtomicBoolean closed = new AtomicBoolean(false);
     private Map waiters = new ConcurrentHashMap();
+    private Map exchangesById = new ConcurrentHashMap();
 
     /**
      * Constructor
@@ -406,12 +407,14 @@
      * @throws MessagingException
      */
     public boolean sendSync(MessageExchange messageExchange, long timeoutMS) throws MessagingException {
+        boolean result = false;
         if (log.isDebugEnabled()) {
             log.debug("Sending " + messageExchange.getExchangeId() + " in " + this);
         }
         // JBI 5.5.2.1.3: set the sendSync property
         messageExchange.setProperty(JbiConstants.SEND_SYNC, Boolean.TRUE);
     	MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
+        exchangesById.put(messageExchange.getExchangeId(), messageExchange);
     	autoEnlistInTx(messageExchangeImpl);
         try {
             // Synchronously send a message and wait for the response
@@ -424,15 +427,20 @@
             if (messageExchangeImpl.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
                 messageExchangeImpl.handleAccept();
                 resumeTx(messageExchangeImpl);
-                return true;
+                result= true;
             } else {
                 // JBI 5.5.2.1.3: the exchange should be set to ERROR status
                 messageExchangeImpl.getPacket().setAborted(true);
-                return false;
+                result =  false;
             }
         } catch (InterruptedException e) {
+            exchangesById.remove(messageExchange.getExchangeId());
             throw new MessagingException(e);
         }
+        finally{
+            exchangesById.remove(messageExchange.getExchangeId());
+        }
+        return result;
     }
 
     /**
@@ -562,11 +570,12 @@
         // If the message has been sent synchronously
         // this is the answer, so update the syncState and notify the waiter
         // Here, we don't need to put the message in the queue
-        if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
-            suspendTx(me);
-            synchronized (me) {
-                me.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
-                me.notify();
+        MessageExchangeImpl theOriginal = (MessageExchangeImpl) exchangesById.get(me.getExchangeId());
+        if (theOriginal != null && theOriginal.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
+            suspendTx(theOriginal);
+            synchronized (theOriginal) {
+                theOriginal.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
+                theOriginal.notify();
             }
         } else {
             Component component = ((LocalComponentConnector) componentConnector).getComponent();

Copied: branches/servicemix-2.0/trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/PingService.java (from rev 835, trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/PingService.java) ( => )

Copied: branches/servicemix-2.0/trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/SimpleClusterSendSyncTest.java (from rev 835, trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/SimpleClusterSendSyncTest.java)

Added: branches/servicemix-2.0/trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/broker.xml (835 => 836)

--- branches/servicemix-2.0/trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/broker.xml	2005-11-14 20:19:41 UTC (rev 835)
+++ branches/servicemix-2.0/trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/broker.xml	2005-11-14 21:48:17 UTC (rev 836)
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?> 
+
+<beans xmlns="http://xbean.org/schemas/spring/1.0" 
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+xmlns:sm="http://servicemix.org/config/1.0" 
+xmlns:foo="http://www.habuma.com/foo" 
+xsi:schemaLocation="http://xbean.org/schemas/spring/1.0 
+conf/spring-beans.xsd 
+http://servicemix.org/config/1.0 
+conf/servicemix.xsd"> 
+
+  <sm:container id="jbi" name ="service" flowName="cluster"> 
+    <sm:activationSpecs> 
+      <sm:activationSpec componentName="pingService" 
+service="foo:pingService"> 
+        <sm:component> 
+          <bean class="org.servicemix.jbi.nmr.flow.jms.PingService"/> 
+        </sm:component> 
+      </sm:activationSpec> 
+    </sm:activationSpecs> 
+  </sm:container> 
+</beans> 
\ No newline at end of file

Added: branches/servicemix-2.0/trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/client.xml (835 => 836)

--- branches/servicemix-2.0/trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/client.xml	2005-11-14 20:19:41 UTC (rev 835)
+++ branches/servicemix-2.0/trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/client.xml	2005-11-14 21:48:17 UTC (rev 836)
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?> 
+
+<beans xmlns="http://xbean.org/schemas/spring/1.0" 
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+xmlns:sm="http://servicemix.org/config/1.0" 
+xsi:schemaLocation="http://xbean.org/schemas/spring/1.0 
+conf/spring-beans.xsd 
+http://servicemix.org/config/1.0 
+conf/servicemix.xsd"> 
+
+  <sm:container id="jbi" 
+      flowName="cluster" 
+      name="jbi"/> 
+
+  <bean id="client" 
+      class="org.servicemix.client.DefaultServiceMixClient"> 
+    <constructor-arg ref="jbi" /> 
+  </bean> 
+</beans> 
\ No newline at end of file

Reply via email to