Title: [699] trunk/core/src/main/java/org/servicemix/jbi: SM-124 : handle sendSync with timeout

Diff

Added: trunk/core/src/main/java/org/servicemix/jbi/ExchangeTimeoutException.java (698 => 699)

--- trunk/core/src/main/java/org/servicemix/jbi/ExchangeTimeoutException.java	2005-10-28 14:08:47 UTC (rev 698)
+++ trunk/core/src/main/java/org/servicemix/jbi/ExchangeTimeoutException.java	2005-10-28 14:47:31 UTC (rev 699)
@@ -0,0 +1,43 @@
+/** 
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.servicemix.jbi;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.xml.namespace.QName;
+
+/**
+ * An exception thrown when a synchronous exchange has timed out.
+ *
+ * @version $Revision: 657 $
+ */
+public class ExchangeTimeoutException extends MessagingException {
+    private MessageExchange exchange;
+
+    public ExchangeTimeoutException(MessageExchange exchange) {
+        super("Exchange has timed out: " + exchange);
+        this.exchange = exchange;
+    }
+
+    /**
+     * Returns th exchange
+     */
+    public MessageExchange getExchange() {
+        return exchange;
+    }
+}

Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java (698 => 699)

--- trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java	2005-10-28 14:08:47 UTC (rev 698)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java	2005-10-28 14:47:31 UTC (rev 699)
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.servicemix.JbiConstants;
 import org.servicemix.MessageExchangeListener;
+import org.servicemix.jbi.ExchangeTimeoutException;
 import org.servicemix.jbi.container.ActivationSpec;
 import org.servicemix.jbi.container.JBIContainer;
 import org.servicemix.jbi.framework.ComponentConnector;
@@ -235,8 +236,14 @@
         try {
         	MessageExchangeImpl me = (MessageExchangeImpl) queue.poll(timeoutMS);
         	if (me != null) {
-        		resumeTx(me);
-                me.handleAccept();
+                // If the exchange has already timed out,
+                // do not give it to the component
+                if (me.getPacket().isTimedOut()) {
+                    me = null;
+                } else {
+            		resumeTx(me);
+                    me.handleAccept();
+                }
         	}
         	return me;
         }
@@ -246,6 +253,10 @@
     }
 
     protected void doSend(MessageExchangeImpl messageExchange, boolean sync) throws MessagingException {
+        // If the message has timed out
+        if (messageExchange.getPacket().isTimedOut()) {
+            throw new ExchangeTimeoutException(messageExchange);
+        }
         // Auto enlist exchange in transaction
         autoEnlistInTx(messageExchange);
         // Update persistence info
@@ -304,6 +315,7 @@
         if (this.closed.get()) {
             throw new MessagingException("DeliveryChannel is closed");
         }
+        messageExchange.setProperty(JbiConstants.SEND_SYNC, null);
     	MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
    		doSend(messageExchangeImpl, false);
     }
@@ -349,8 +361,7 @@
                 return true;
             } else {
                 // JBI 5.5.2.1.3: the exchange should be set to ERROR status
-                // TODO: messageExchangeImpl.handleAccept(); ?
-                messageExchangeImpl.setStatus(ExchangeStatus.ERROR);
+                messageExchangeImpl.getPacket().setTimedOut(true);
                 return false;
             }
         } catch (InterruptedException e) {

Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/ExchangePacket.java (698 => 699)

--- trunk/core/src/main/java/org/servicemix/jbi/messaging/ExchangePacket.java	2005-10-28 14:08:47 UTC (rev 698)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/ExchangePacket.java	2005-10-28 14:47:31 UTC (rev 699)
@@ -62,6 +62,7 @@
     private transient Transaction transactionContext;
     private transient String endpointName;
     private Boolean persistent;
+    private boolean timedOut;
 
     
     public ExchangePacket() {
@@ -411,4 +412,12 @@
 		this.persistent = persistent;
 	}
 
+    public boolean isTimedOut() {
+        return timedOut;
+    }
+
+    public void setTimedOut(boolean timedOut) {
+        this.timedOut = timedOut;
+    }
+
 }
\ No newline at end of file

Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java (698 => 699)

--- trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java	2005-10-28 14:08:47 UTC (rev 698)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java	2005-10-28 14:47:31 UTC (rev 699)
@@ -160,6 +160,9 @@
      * @return the processing status of the exchange
      */
     public ExchangeStatus getStatus() {
+        if (this.packet.isTimedOut()) {
+            return ExchangeStatus.ERROR;
+        }
         return this.packet.getStatus();
     }
 
@@ -517,7 +520,7 @@
         out.write(mirror.state);
         out.writeBoolean(can(CAN_PROVIDER));
     }
-
+    
     public void handleSend(boolean sync) throws MessagingException {
         // Check if send / sendSync is legal
         if (sync) {

Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/MEPExchangeTest.java (698 => 699)

--- trunk/core/src/test/java/org/servicemix/jbi/messaging/MEPExchangeTest.java	2005-10-28 14:08:47 UTC (rev 698)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/MEPExchangeTest.java	2005-10-28 14:47:31 UTC (rev 699)
@@ -19,6 +19,7 @@
 
 import org.servicemix.JbiConstants;
 import org.servicemix.components.util.ComponentSupport;
+import org.servicemix.jbi.ExchangeTimeoutException;
 import org.servicemix.jbi.container.JBIContainer;
 import org.servicemix.jbi.jaxp.StringSource;
 
@@ -148,13 +149,66 @@
 		NormalizedMessage m = mec.createMessage();
 		m.setContent(new StringSource(PAYLOAD));
 		mec.setInMessage(m);
-		consumer.getChannel().sendSync(mec, 10000L);
+		boolean result = consumer.getChannel().sendSync(mec, 10000L);
+        assertTrue(result);
 		assertEquals(ExchangeStatus.DONE, mec.getStatus());
 		// Nothing left
 		assertNull(consumer.getChannel().accept(100L)); // receive in
 		assertNull(provider.getChannel().accept(100L)); // receive in
 	}
 	
+    public void testInOnlySyncWithTimeoutBeforeAccept() throws Exception {
+        // Send message exchange
+        MessageExchangeFactory mef = consumer.getChannel().createExchangeFactoryForService(new QName("provider"));
+        InOnly mec = mef.createInOnlyExchange();
+        NormalizedMessage m = mec.createMessage();
+        m.setContent(new StringSource(PAYLOAD));
+        mec.setInMessage(m);
+        boolean result = consumer.getChannel().sendSync(mec, 100L);
+        assertFalse(result);
+        assertEquals(ExchangeStatus.ERROR, mec.getStatus());
+        // Nothing left
+        assertNull(consumer.getChannel().accept(100L)); // receive in
+        assertNull(provider.getChannel().accept(100L)); // receive in
+    }
+    
+    public void testInOnlySyncWithTimeoutAfterAccept() throws Exception {
+        // Create thread to answer
+        Thread t = new Thread(new Runnable() {
+            public void run() {
+                try {
+                    // Provider side
+                    InOnly mep = (InOnly) provider.getChannel().accept(10000L);
+                    assertNotNull(mep);
+                    assertEquals(ExchangeStatus.ACTIVE, mep.getStatus());
+                    assertEquals(Boolean.TRUE, mep.getProperty(JbiConstants.SEND_SYNC));
+                    Thread.sleep(100L);
+                    mep.setStatus(ExchangeStatus.DONE);
+                    provider.getChannel().send(mep);
+                } catch (ExchangeTimeoutException e) {
+                    // ok
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    fail();
+                }
+            }
+        });
+        t.start();
+        // Send message exchange
+        MessageExchangeFactory mef = consumer.getChannel().createExchangeFactoryForService(new QName("provider"));
+        InOnly mec = mef.createInOnlyExchange();
+        NormalizedMessage m = mec.createMessage();
+        m.setContent(new StringSource(PAYLOAD));
+        mec.setInMessage(m);
+        boolean result = consumer.getChannel().sendSync(mec, 50L);
+        
+        assertFalse(result);
+        assertEquals(ExchangeStatus.ERROR, mec.getStatus());
+        // Nothing left
+        assertNull(consumer.getChannel().accept(100L)); // receive in
+        t.join();
+    }
+    
 	public void testInOut() throws Exception {
 		// Send message exchange
 		MessageExchangeFactory mef = consumer.getChannel().createExchangeFactoryForService(new QName("provider"));

Reply via email to