Title: [836] branches/servicemix-2.0/trunk/core/src/main/java/org/servicemix/jbi/messaging: SM-174 : SendSync does not work on clustered flows
- Revision
- 836
- Author
- gnt
- Date
- 2005-11-14 16:48:17 -0500 (Mon, 14 Nov 2005)
Log Message
SM-174 : SendSync does not work on clustered flows
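Modified Paths
- branches/servicemix-2.0/trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java
Added Paths
- branches/servicemix-2.0/trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/PingService.java
- branches/servicemix-2.0/trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/SimpleClusterSendSyncTest.java
- branches/servicemix-2.0/trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/broker.xml
- branches/servicemix-2.0/trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/client.xml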
Diff
Modified: branches/servicemix-2.0/trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java (835 => 836)
--- branches/servicemix-2.0/trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java 2005-11-14 20:19:41 UTC (rev 835)
+++ branches/servicemix-2.0/trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java 2005-11-14 21:48:17 UTC (rev 836)
@@ -74,6 +74,7 @@
private long lastReceiveTime = System.currentTimeMillis();
private AtomicBoolean closed = new AtomicBoolean(false);
private Map waiters = new ConcurrentHashMap();
+ private Map exchangesById = new ConcurrentHashMap();
/**
* Constructor
@@ -406,12 +407,14 @@
* @throws MessagingException
*/
public boolean sendSync(MessageExchange messageExchange, long timeoutMS) throws MessagingException {
+ boolean result = false;
if (log.isDebugEnabled()) {
log.debug("Sending " + messageExchange.getExchangeId() + " in " + this);
}
// JBI 5.5.2.1.3: set the sendSync property
messageExchange.setProperty(JbiConstants.SEND_SYNC, Boolean.TRUE);
MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
+ exchangesById.put(messageExchange.getExchangeId(), messageExchange);
autoEnlistInTx(messageExchangeImpl);
try {
// Synchronously send a message and wait for the response
@@ -424,15 +427,20 @@
if (messageExchangeImpl.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
messageExchangeImpl.handleAccept();
resumeTx(messageExchangeImpl);
- return true;
+ result= true;
} else {
// JBI 5.5.2.1.3: the exchange should be set to ERROR status
messageExchangeImpl.getPacket().setAborted(true);
- return false;
+ result = false;
}
} catch (InterruptedException e) {
+ exchangesById.remove(messageExchange.getExchangeId());
throw new MessagingException(e);
}
+ finally{
+ exchangesById.remove(messageExchange.getExchangeId());
+ }
+ return result;
}
/**
@@ -562,11 +570,12 @@
// If the message has been sent synchronously
// this is the answer, so update the syncState and notify the waiter
// Here, we don't need to put the message in the queue
- if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
- suspendTx(me);
- synchronized (me) {
- me.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
- me.notify();
+ MessageExchangeImpl theOriginal = (MessageExchangeImpl) exchangesById.get(me.getExchangeId());
+ if (theOriginal != null && theOriginal.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
+ suspendTx(theOriginal);
+ synchronized (theOriginal) {
+ theOriginal.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
+ theOriginal.notify();
}
} else {
Component component = ((LocalComponentConnector) componentConnector).getComponent();
Copied: branches/servicemix-2.0/trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/PingService.java (from rev 835, trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/PingService.java)
Copied: branches/servicemix-2.0/trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/SimpleClusterSendSyncTest.java (from rev 835, trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/SimpleClusterSendSyncTest.java)
Added: branches/servicemix-2.0/trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/broker.xml (835 => 836)
--- branches/servicemix-2.0/trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/broker.xml 2005-11-14 20:19:41 UTC (rev 835)
+++ branches/servicemix-2.0/trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/broker.xml 2005-11-14 21:48:17 UTC (rev 836)
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<beans xmlns="http://xbean.org/schemas/spring/1.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xmlns:sm="http://servicemix.org/config/1.0"
+xmlns:foo="http://www.habuma.com/foo"
+xsi:schemaLocation="http://xbean.org/schemas/spring/1.0
+conf/spring-beans.xsd
+http://servicemix.org/config/1.0
+conf/servicemix.xsd">
+
+ <sm:container id="jbi" name ="service" flowName="cluster">
+ <sm:activationSpecs>
+ <sm:activationSpec componentName="pingService"
+service="foo:pingService">
+ <sm:component>
+ <bean class="org.servicemix.jbi.nmr.flow.jms.PingService"/>
+ </sm:component>
+ </sm:activationSpec>
+ </sm:activationSpecs>
+ </sm:container>
+</beans>
\ No newline at end of file
Added: branches/servicemix-2.0/trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/client.xml (835 => 836)
--- branches/servicemix-2.0/trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/client.xml 2005-11-14 20:19:41 UTC (rev 835)
+++ branches/servicemix-2.0/trunk/core/src/test/resources/org/servicemix/jbi/nmr/flow/jms/client.xml 2005-11-14 21:48:17 UTC (rev 836)
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<beans xmlns="http://xbean.org/schemas/spring/1.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xmlns:sm="http://servicemix.org/config/1.0"
+xsi:schemaLocation="http://xbean.org/schemas/spring/1.0
+conf/spring-beans.xsd
+http://servicemix.org/config/1.0
+conf/servicemix.xsd">
+
+ <sm:container id="jbi"
+ flowName="cluster"
+ name="jbi"/>
+
+ <bean id="client"
+ class="org.servicemix.client.DefaultServiceMixClient">
+ <constructor-arg ref="jbi" />
+ </bean>
+</beans>
\ No newline at end of file