Diff
Added: trunk/core/src/main/java/org/servicemix/jbi/ExchangeTimeoutException.java (698 => 699)
--- trunk/core/src/main/java/org/servicemix/jbi/ExchangeTimeoutException.java 2005-10-28 14:08:47 UTC (rev 698)
+++ trunk/core/src/main/java/org/servicemix/jbi/ExchangeTimeoutException.java 2005-10-28 14:47:31 UTC (rev 699)
@@ -0,0 +1,43 @@
+/**
+ *
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.servicemix.jbi;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.xml.namespace.QName;
+
+/**
+ * An exception thrown when a synchronous exchange has timed out.
+ *
+ * @version $Revision: 657 $
+ */
+public class ExchangeTimeoutException extends MessagingException {
+ private MessageExchange exchange; // the exchange that timed out; assigned only in the constructor
+
+ public ExchangeTimeoutException(MessageExchange exchange) {
+ super("Exchange has timed out: " + exchange); // message embeds the exchange's string form
+ this.exchange = exchange;
+ }
+
+ /**
+ * Returns the exchange that timed out, as passed to the constructor.
+ */
+ public MessageExchange getExchange() {
+ return exchange;
+ }
+}
Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java (698 => 699)
--- trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java 2005-10-28 14:08:47 UTC (rev 698)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java 2005-10-28 14:47:31 UTC (rev 699)
@@ -25,6 +25,7 @@
import org.apache.commons.logging.LogFactory;
import org.servicemix.JbiConstants;
import org.servicemix.MessageExchangeListener;
+import org.servicemix.jbi.ExchangeTimeoutException;
import org.servicemix.jbi.container.ActivationSpec;
import org.servicemix.jbi.container.JBIContainer;
import org.servicemix.jbi.framework.ComponentConnector;
@@ -235,8 +236,14 @@
try {
MessageExchangeImpl me = (MessageExchangeImpl) queue.poll(timeoutMS);
if (me != null) {
- resumeTx(me);
- me.handleAccept();
+ // If the exchange has already timed out,
+ // do not give it to the component
+ if (me.getPacket().isTimedOut()) {
+ me = null;
+ } else {
+ resumeTx(me);
+ me.handleAccept();
+ }
}
return me;
}
@@ -246,6 +253,10 @@
}
protected void doSend(MessageExchangeImpl messageExchange, boolean sync) throws MessagingException {
+ // If the message has timed out
+ if (messageExchange.getPacket().isTimedOut()) {
+ throw new ExchangeTimeoutException(messageExchange);
+ }
// Auto enlist exchange in transaction
autoEnlistInTx(messageExchange);
// Update persistence info
@@ -304,6 +315,7 @@
if (this.closed.get()) {
throw new MessagingException("DeliveryChannel is closed");
}
+ messageExchange.setProperty(JbiConstants.SEND_SYNC, null);
MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
doSend(messageExchangeImpl, false);
}
@@ -349,8 +361,7 @@
return true;
} else {
// JBI 5.5.2.1.3: the exchange should be set to ERROR status
- // TODO: messageExchangeImpl.handleAccept(); ?
- messageExchangeImpl.setStatus(ExchangeStatus.ERROR);
+ messageExchangeImpl.getPacket().setTimedOut(true);
return false;
}
} catch (InterruptedException e) {
Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/ExchangePacket.java (698 => 699)
--- trunk/core/src/main/java/org/servicemix/jbi/messaging/ExchangePacket.java 2005-10-28 14:08:47 UTC (rev 698)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/ExchangePacket.java 2005-10-28 14:47:31 UTC (rev 699)
@@ -62,6 +62,7 @@
private transient Transaction transactionContext;
private transient String endpointName;
private Boolean persistent;
+ private boolean timedOut;
public ExchangePacket() {
@@ -411,4 +412,12 @@
this.persistent = persistent;
}
+ public boolean isTimedOut() { // true once setTimedOut(true) has been called on this packet
+ return timedOut; // NOTE(review): plain field read; callers appear to run on different threads than the writer -- confirm visibility (volatile/sync)
+ }
+
+ public void setTimedOut(boolean timedOut) { // called with true by DeliveryChannelImpl when a sendSync wait expires
+ this.timedOut = timedOut;
+ }
+
}
\ No newline at end of file
Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java (698 => 699)
--- trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java 2005-10-28 14:08:47 UTC (rev 698)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java 2005-10-28 14:47:31 UTC (rev 699)
@@ -160,6 +160,9 @@
* @return the processing status of the exchange
*/
public ExchangeStatus getStatus() {
+ if (this.packet.isTimedOut()) { // the timeout flag takes precedence over the status stored in the packet
+ return ExchangeStatus.ERROR; // reported as ERROR without modifying the packet's own status
+ }
return this.packet.getStatus();
}
@@ -517,7 +520,7 @@
out.write(mirror.state);
out.writeBoolean(can(CAN_PROVIDER));
}
-
+
public void handleSend(boolean sync) throws MessagingException {
// Check if send / sendSync is legal
if (sync) {
Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/MEPExchangeTest.java (698 => 699)
--- trunk/core/src/test/java/org/servicemix/jbi/messaging/MEPExchangeTest.java 2005-10-28 14:08:47 UTC (rev 698)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/MEPExchangeTest.java 2005-10-28 14:47:31 UTC (rev 699)
@@ -19,6 +19,7 @@
import org.servicemix.JbiConstants;
import org.servicemix.components.util.ComponentSupport;
+import org.servicemix.jbi.ExchangeTimeoutException;
import org.servicemix.jbi.container.JBIContainer;
import org.servicemix.jbi.jaxp.StringSource;
@@ -148,13 +149,66 @@
NormalizedMessage m = mec.createMessage();
m.setContent(new StringSource(PAYLOAD));
mec.setInMessage(m);
- consumer.getChannel().sendSync(mec, 10000L);
+ boolean result = consumer.getChannel().sendSync(mec, 10000L);
+ assertTrue(result);
assertEquals(ExchangeStatus.DONE, mec.getStatus());
// Nothing left
assertNull(consumer.getChannel().accept(100L)); // receive in
assertNull(provider.getChannel().accept(100L)); // receive in
}
+ public void testInOnlySyncWithTimeoutBeforeAccept() throws Exception {
+ // Send an InOnly exchange synchronously; nothing accepts it on the provider side in this test
+ MessageExchangeFactory mef = consumer.getChannel().createExchangeFactoryForService(new QName("provider"));
+ InOnly mec = mef.createInOnlyExchange();
+ NormalizedMessage m = mec.createMessage();
+ m.setContent(new StringSource(PAYLOAD));
+ mec.setInMessage(m);
+ boolean result = consumer.getChannel().sendSync(mec, 100L); // short timeout (100L) so the wait expires
+ assertFalse(result); // sendSync reports the timeout by returning false
+ assertEquals(ExchangeStatus.ERROR, mec.getStatus()); // a timed-out exchange reports ERROR
+ // The timed-out exchange must not show up on either channel afterwards
+ assertNull(consumer.getChannel().accept(100L)); // nothing comes back to the consumer
+ assertNull(provider.getChannel().accept(100L)); // the timed-out exchange is not handed to the provider
+ }
+
+ public void testInOnlySyncWithTimeoutAfterAccept() throws Exception {
+ // Provider thread: accepts the exchange, then answers only after sleeping 100L (longer than the consumer's 50L timeout)
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ try {
+ // Provider side: the exchange is still ACTIVE and flagged as sent synchronously when accepted
+ InOnly mep = (InOnly) provider.getChannel().accept(10000L);
+ assertNotNull(mep);
+ assertEquals(ExchangeStatus.ACTIVE, mep.getStatus());
+ assertEquals(Boolean.TRUE, mep.getProperty(JbiConstants.SEND_SYNC));
+ Thread.sleep(100L); // outlast the consumer's 50L sendSync timeout before replying
+ mep.setStatus(ExchangeStatus.DONE);
+ provider.getChannel().send(mep);
+ } catch (ExchangeTimeoutException e) {
+ // expected when the exchange has already timed out by the time the provider sends; not asserted, so a missing exception also passes
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(); // NOTE(review): runs on the spawned thread, so this failure (like the asserts above) is presumably not reported to the test runner -- confirm
+ }
+ }
+ });
+ t.start();
+ // Consumer side: send synchronously with a timeout (50L) shorter than the provider's delay
+ MessageExchangeFactory mef = consumer.getChannel().createExchangeFactoryForService(new QName("provider"));
+ InOnly mec = mef.createInOnlyExchange();
+ NormalizedMessage m = mec.createMessage();
+ m.setContent(new StringSource(PAYLOAD));
+ mec.setInMessage(m);
+ boolean result = consumer.getChannel().sendSync(mec, 50L);
+
+ assertFalse(result); // sendSync reports the timeout by returning false
+ assertEquals(ExchangeStatus.ERROR, mec.getStatus()); // a timed-out exchange reports ERROR
+ // No late answer may reach the consumer
+ assertNull(consumer.getChannel().accept(100L)); // nothing comes back to the consumer
+ t.join(); // wait for the provider thread to finish before the test returns
+ }
+
public void testInOut() throws Exception {
// Send message exchange
MessageExchangeFactory mef = consumer.getChannel().createExchangeFactoryForService(new QName("provider"));