Diff
Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java (834 => 835)
--- trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java 2005-11-14 20:18:25 UTC (rev 834)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java 2005-11-14 20:19:41 UTC (rev 835)
@@ -74,6 +74,7 @@
private long lastReceiveTime = System.currentTimeMillis();
private AtomicBoolean closed = new AtomicBoolean(false);
private Map waiters = new ConcurrentHashMap();
+ private Map exchangesById = new ConcurrentHashMap();
/**
* Constructor
@@ -408,12 +409,14 @@
* @throws MessagingException
*/
public boolean sendSync(MessageExchange messageExchange, long timeoutMS) throws MessagingException {
+ boolean result = false;
if (log.isDebugEnabled()) {
log.debug("Sending " + messageExchange.getExchangeId() + " in " + this);
}
// JBI 5.5.2.1.3: set the sendSync property
messageExchange.setProperty(JbiConstants.SEND_SYNC, Boolean.TRUE);
MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
+ exchangesById.put(messageExchange.getExchangeId(), messageExchange);
autoEnlistInTx(messageExchangeImpl);
try {
// Synchronously send a message and wait for the response
@@ -426,15 +429,20 @@
if (messageExchangeImpl.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
messageExchangeImpl.handleAccept();
resumeTx(messageExchangeImpl);
- return true;
+ result= true;
} else {
// JBI 5.5.2.1.3: the exchange should be set to ERROR status
messageExchangeImpl.getPacket().setAborted(true);
- return false;
+ result = false;
}
} catch (InterruptedException e) {
+ exchangesById.remove(messageExchange.getExchangeId());
throw new MessagingException(e);
}
+ finally{
+ exchangesById.remove(messageExchange.getExchangeId());
+ }
+ return result;
}
/**
@@ -564,11 +572,12 @@
// If the message has been sent synchronously
// this is the answer, so update the syncState and notify the waiter
// Here, we don't need to put the message in the queue
- if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
- suspendTx(me);
- synchronized (me) {
- me.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
- me.notify();
+ MessageExchangeImpl theOriginal = (MessageExchangeImpl) exchangesById.get(me.getExchangeId());
+ if (theOriginal != null && theOriginal.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
+ suspendTx(theOriginal);
+ synchronized (theOriginal) {
+ theOriginal.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
+ theOriginal.notify();
}
} else {
Component component = ((LocalComponentConnector) componentConnector).getComponent();
Added: trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/PingService.java (834 => 835)
--- trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/PingService.java 2005-11-14 20:18:25 UTC (rev 834)
+++ trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/PingService.java 2005-11-14 20:19:41 UTC (rev 835)
@@ -0,0 +1,36 @@
+/**
+ * <a href="http://servicemix.org">ServiceMix: The open source ESB</a>
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ */
+package org.servicemix.jbi.nmr.flow.jms;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import org.servicemix.MessageExchangeListener;
+import org.servicemix.components.util.ComponentSupport;
+import org.servicemix.jbi.jaxp.StringSource;
+
+/** Test service from SM-174 (Craig Wall, original author): builds a fixed response and passes it to answer(). */
+public class PingService extends ComponentSupport implements MessageExchangeListener {
+    /** Handles one exchange: creates the response message, answers with it, and prints the status at each step. */
+    public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+        System.out.println("GOT A MESSAGE; exchange.status=" + exchange.getStatus());
+        NormalizedMessage reply = exchange.createMessage();
+        StringSource pingBack = new StringSource("<response>Ping back at ya!</response>");
+        reply.setContent(pingBack);
+        System.out.println("SENDING RESPONSE; exchange.status=" + exchange.getStatus());
+        answer(exchange, reply);
+        System.out.println("RESPONSE SENT; exchange.status=" + exchange.getStatus());
+    }
+}
\ No newline at end of file
Added: trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/SimpleClusterSendSyncTest.java (834 => 835)
--- trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/SimpleClusterSendSyncTest.java 2005-11-14 20:18:25 UTC (rev 834)
+++ trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/SimpleClusterSendSyncTest.java 2005-11-14 20:19:41 UTC (rev 835)
@@ -0,0 +1,66 @@
+/**
+ * <a href="http://servicemix.org">ServiceMix: The open source ESB</a>
+ *
+ * Copyright 2005 RAJD Consultancy Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ */
+package org.servicemix.jbi.nmr.flow.jms;
+
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+import junit.framework.TestCase;
+import org.servicemix.client.ServiceMixClient;
+import org.servicemix.jbi.container.SpringJBIContainer;
+import org.servicemix.jbi.jaxp.StringSource;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.xbean.spring.context.ClassPathXmlApplicationContext;
+/** JMS cluster test for sendSync: pings the service declared in broker.xml through the client from client.xml. */
+public class SimpleClusterSendSyncTest extends TestCase {
+    protected SpringJBIContainer jbi;
+
+    /** Loads broker.xml, looks up the "jbi" container bean, then initialises and starts it. */
+    protected void setUp() throws Exception {
+        super.setUp();
+        AbstractXmlApplicationContext context = new ClassPathXmlApplicationContext(
+                "org/servicemix/jbi/nmr/flow/jms/broker.xml");
+        jbi = (SpringJBIContainer) context.getBean("jbi");
+        // Check before first use: after init()/start() a null would already have thrown an NPE.
+        assertNotNull("JBI Container not found in spring!", jbi);
+        jbi.init();
+        jbi.start();
+    }
+
+    /** Shuts down the container started by setUp so it does not outlive the test. */
+    protected void tearDown() throws Exception {
+        if (jbi != null) {
+            jbi.shutDown();
+        }
+        super.tearDown();
+    }
+
+    /** Sends an InOut ping to pingService with sendSync and fails unless sendSync returns true. */
+    public void testSendSync() throws Exception {
+        ApplicationContext ctx = new ClassPathXmlApplicationContext("org/servicemix/jbi/nmr/flow/jms/client.xml");
+        ServiceMixClient client = (ServiceMixClient) ctx.getBean("client");
+        Thread.sleep(5000);
+        InOut exchange = client.createInOutExchange();
+        exchange.setService(new QName("http://www.habuma.com/foo", "pingService"));
+        NormalizedMessage in = exchange.getInMessage();
+        in.setContent(new StringSource("<ping>Pinging you</ping>"));
+        System.out.println("SENDING; exchange.status=" + exchange.getStatus());
+        assertTrue("sendSync did not return true", client.sendSync(exchange));
+        System.out.println("GOT RESPONSE; exchange.status=" + exchange.getStatus());
+        client.done(exchange);
+    }
+}
Added: trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/broker.xml (834 => 835)
--- trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/broker.xml 2005-11-14 20:18:25 UTC (rev 834)
+++ trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/broker.xml 2005-11-14 20:19:41 UTC (rev 835)
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Declares the "jbi" container (name "service", flow "cluster") with PingService activated as foo:pingService. -->
+<beans xmlns="http://xbean.org/schemas/spring/1.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xmlns:sm="http://servicemix.org/config/1.0"
+xmlns:foo="http://www.habuma.com/foo"
+xsi:schemaLocation="http://xbean.org/schemas/spring/1.0
+conf/spring-beans.xsd
+http://servicemix.org/config/1.0
+conf/servicemix.xsd">
+
+ <sm:container id="jbi" name ="service" flowName="cluster">
+ <sm:activationSpecs>
+ <sm:activationSpec componentName="pingService"
+service="foo:pingService">
+ <sm:component>
+ <bean class="org.servicemix.jbi.nmr.flow.jms.PingService"/>
+ </sm:component>
+ </sm:activationSpec>
+ </sm:activationSpecs>
+ </sm:container>
+</beans>
\ No newline at end of file
Added: trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/client.xml (834 => 835)
--- trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/client.xml 2005-11-14 20:18:25 UTC (rev 834)
+++ trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/client.xml 2005-11-14 20:19:41 UTC (rev 835)
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Declares a "jbi" container on the "cluster" flow and a DefaultServiceMixClient bean "client" constructed with it. -->
+<beans xmlns="http://xbean.org/schemas/spring/1.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xmlns:sm="http://servicemix.org/config/1.0"
+xsi:schemaLocation="http://xbean.org/schemas/spring/1.0
+conf/spring-beans.xsd
+http://servicemix.org/config/1.0
+conf/servicemix.xsd">
+
+ <sm:container id="jbi"
+ flowName="cluster"
+ name="jbi"/>
+
+ <bean id="client"
+ class="org.servicemix.client.DefaultServiceMixClient">
+ <constructor-arg ref="jbi" />
+ </bean>
+</beans>
\ No newline at end of file