Title: [835] trunk/core/src/main/java/org/servicemix/jbi/messaging: sendSync test for the cluster

Diff

Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java (834 => 835)

--- trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java	2005-11-14 20:18:25 UTC (rev 834)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java	2005-11-14 20:19:41 UTC (rev 835)
@@ -74,6 +74,7 @@
     private long lastReceiveTime = System.currentTimeMillis();
     private AtomicBoolean closed = new AtomicBoolean(false);
     private Map waiters = new ConcurrentHashMap();
+    private Map exchangesById = new ConcurrentHashMap();
 
     /**
      * Constructor
@@ -408,12 +409,14 @@
      * @throws MessagingException
      */
     public boolean sendSync(MessageExchange messageExchange, long timeoutMS) throws MessagingException {
+        boolean result = false;
         if (log.isDebugEnabled()) {
             log.debug("Sending " + messageExchange.getExchangeId() + " in " + this);
         }
         // JBI 5.5.2.1.3: set the sendSync property
         messageExchange.setProperty(JbiConstants.SEND_SYNC, Boolean.TRUE);
     	MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
+        exchangesById.put(messageExchange.getExchangeId(), messageExchange);
     	autoEnlistInTx(messageExchangeImpl);
         try {
             // Synchronously send a message and wait for the response
@@ -426,15 +429,20 @@
             if (messageExchangeImpl.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
                 messageExchangeImpl.handleAccept();
                 resumeTx(messageExchangeImpl);
-                return true;
+                result= true;
             } else {
                 // JBI 5.5.2.1.3: the exchange should be set to ERROR status
                 messageExchangeImpl.getPacket().setAborted(true);
-                return false;
+                result =  false;
             }
         } catch (InterruptedException e) {
+            exchangesById.remove(messageExchange.getExchangeId());
             throw new MessagingException(e);
         }
+        finally{
+            exchangesById.remove(messageExchange.getExchangeId());
+        }
+        return result;
     }
 
     /**
@@ -564,11 +572,12 @@
         // If the message has been sent synchronously
         // this is the answer, so update the syncState and notify the waiter
         // Here, we don't need to put the message in the queue
-        if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
-            suspendTx(me);
-            synchronized (me) {
-                me.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
-                me.notify();
+        MessageExchangeImpl theOriginal = (MessageExchangeImpl) exchangesById.get(me.getExchangeId());
+        if (theOriginal != null && theOriginal.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
+            suspendTx(theOriginal);
+            synchronized (theOriginal) {
+                theOriginal.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
+                theOriginal.notify();
             }
         } else {
             Component component = ((LocalComponentConnector) componentConnector).getComponent();

Added: trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/PingService.java (834 => 835)

--- trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/PingService.java	2005-11-14 20:18:25 UTC (rev 834)
+++ trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/PingService.java	2005-11-14 20:19:41 UTC (rev 835)
@@ -0,0 +1,36 @@
+/**
+ * <a href="" The open source ESB</a>
+ * 
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ * 
+ */
+package org.servicemix.jbi.nmr.flow.jms;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import org.servicemix.MessageExchangeListener;
+import org.servicemix.components.util.ComponentSupport;
+import org.servicemix.jbi.jaxp.StringSource;
+
+/**
+ * Test service from SM-174 - (Craig Wall orginal author)
+ */
+public class PingService extends ComponentSupport implements MessageExchangeListener{
+    public void onMessageExchange(MessageExchange exchange) throws MessagingException{
+        System.out.println("GOT A MESSAGE; exchange.status="+exchange.getStatus());
+        NormalizedMessage out=exchange.createMessage();
+        out.setContent(new StringSource("<response>Ping back at ya!</response>"));
+        System.out.println("SENDING RESPONSE; exchange.status="+exchange.getStatus());
+        answer(exchange,out);
+        System.out.println("RESPONSE SENT; exchange.status="+exchange.getStatus());
+    }
+}
\ No newline at end of file

Added: trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/SimpleClusterSendSyncTest.java (834 => 835)

--- trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/SimpleClusterSendSyncTest.java	2005-11-14 20:18:25 UTC (rev 834)
+++ trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/SimpleClusterSendSyncTest.java	2005-11-14 20:19:41 UTC (rev 835)
@@ -0,0 +1,66 @@
+/**
+ * <a href="" The open source ESB</a>
+ * 
+ * Copyright 2005 RAJD Consultancy Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ * 
+ */
+package org.servicemix.jbi.nmr.flow.jms;
+
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+import junit.framework.TestCase;
+import org.servicemix.client.ServiceMixClient;
+import org.servicemix.jbi.container.SpringJBIContainer;
+import org.servicemix.jbi.jaxp.StringSource;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.xbean.spring.context.ClassPathXmlApplicationContext;
+/**
+ * 
+ * JMSCluster Test for SendSync
+ */
+public class SimpleClusterSendSyncTest extends TestCase{
+    protected SpringJBIContainer jbi;
+
+    /*
+     * @see TestCase#setUp()
+     */
+    protected void setUp() throws Exception{
+        super.setUp();
+        AbstractXmlApplicationContext context=new ClassPathXmlApplicationContext(
+                        "org/servicemix/jbi/nmr/flow/jms/broker.xml");
+        jbi=(SpringJBIContainer) context.getBean("jbi");
+        jbi.init();
+        jbi.start();
+        assertNotNull("JBI Container not found in spring!",jbi);
+        
+    }
+
+    protected void tearDown() throws Exception{
+        super.tearDown();
+    }
+
+    public void testSendSync() throws Exception{
+        ApplicationContext ctx=new ClassPathXmlApplicationContext("org/servicemix/jbi/nmr/flow/jms/client.xml");
+        ServiceMixClient client=(ServiceMixClient) ctx.getBean("client");
+        Thread.sleep(5000);
+        InOut exchange=client.createInOutExchange();
+        exchange.setService(new QName("http://www.habuma.com/foo","pingService"));
+        NormalizedMessage in=exchange.getInMessage();
+        in.setContent(new StringSource("<ping>Pinging you</ping>"));
+        System.out.println("SENDING; exchange.status="+exchange.getStatus());
+        client.sendSync(exchange);
+        System.out.println("GOT RESPONSE; exchange.status="+exchange.getStatus());
+        client.done(exchange);
+    }
+}

Added: trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/broker.xml (834 => 835)

--- trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/broker.xml	2005-11-14 20:18:25 UTC (rev 834)
+++ trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/broker.xml	2005-11-14 20:19:41 UTC (rev 835)
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?> 
+
+<beans xmlns="http://xbean.org/schemas/spring/1.0" 
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+xmlns:sm="http://servicemix.org/config/1.0" 
+xmlns:foo="http://www.habuma.com/foo" 
+xsi:schemaLocation="http://xbean.org/schemas/spring/1.0 
+conf/spring-beans.xsd 
+http://servicemix.org/config/1.0 
+conf/servicemix.xsd"> 
+
+  <sm:container id="jbi" name ="service" flowName="cluster"> 
+    <sm:activationSpecs> 
+      <sm:activationSpec componentName="pingService" 
+service="foo:pingService"> 
+        <sm:component> 
+          <bean class="org.servicemix.jbi.nmr.flow.jms.PingService"/> 
+        </sm:component> 
+      </sm:activationSpec> 
+    </sm:activationSpecs> 
+  </sm:container> 
+</beans> 
\ No newline at end of file

Added: trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/client.xml (834 => 835)

--- trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/client.xml	2005-11-14 20:18:25 UTC (rev 834)
+++ trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/client.xml	2005-11-14 20:19:41 UTC (rev 835)
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?> 
+
+<beans xmlns="http://xbean.org/schemas/spring/1.0" 
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+xmlns:sm="http://servicemix.org/config/1.0" 
+xsi:schemaLocation="http://xbean.org/schemas/spring/1.0 
+conf/spring-beans.xsd 
+http://servicemix.org/config/1.0 
+conf/servicemix.xsd"> 
+
+  <sm:container id="jbi" 
+      flowName="cluster" 
+      name="jbi"/> 
+
+  <bean id="client" 
+      class="org.servicemix.client.DefaultServiceMixClient"> 
+    <constructor-arg ref="jbi" /> 
+  </bean> 
+</beans> 
\ No newline at end of file

Reply via email to