Commit in servicemix/base/src/main/java/org/servicemix on MAIN
jbi/messaging/ExchangePacket.java+20-11.8 -> 1.9
             /MessageExchangeImpl.java+18-11.14 -> 1.15
             /DeliveryChannelImpl.java+35-171.21 -> 1.22
ExchangeResponseListener.java+41added 1.1
MessageExchangeListener.java+19-191.1 -> 1.2
+133-38
1 added + 4 modified, total 5 files
Added support for ExchangeResponseListener so can asynchronously get
responses for an exchange wih a consumer - instead of using sendSync()
on the DeliveryChannel

servicemix/base/src/main/java/org/servicemix/jbi/messaging
ExchangePacket.java 1.8 -> 1.9
diff -u -r1.8 -r1.9
--- ExchangePacket.java	20 Jul 2005 13:01:05 -0000	1.8
+++ ExchangePacket.java	14 Aug 2005 11:53:13 -0000	1.9
@@ -38,7 +38,7 @@
 /**
  * ExchangePacket is responsible for carrying MessageExchange payloads
  * 
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
  */
 public class ExchangePacket implements Externalizable {
     private static final long serialVersionUID = -9110837382914609624L;
@@ -70,6 +70,7 @@
     private Map messages = new ConcurrentHashMap();
     private ServiceEndpointImpl endpoint;
     private boolean outbound;
+    private boolean responseExpected;
     private transient Transaction transactionContext;
     private transient String endpointName;
 
@@ -86,6 +87,21 @@
     public void setOutbound(boolean outbound) {
         this.outbound = outbound;
     }
+    
+
+    /**
+     * @return Returns the responseExpected.
+     */
+    public boolean isResponseExpected() {
+        return responseExpected;
+    }
+
+    /**
+     * @param responseExpected The responseExpected to set.
+     */
+    public void setResponseExpected(boolean responseExpected) {
+        this.responseExpected = responseExpected;
+    }
 
     /**
      * @return Returns the endpoint.
@@ -419,6 +435,7 @@
      */
     public void writeExternal(ObjectOutput out) throws IOException {
         out.writeBoolean(outbound);
+        out.writeBoolean(responseExpected);
         out.writeByte(role);
         out.writeObject(pattern);
         out.writeUTF(exchangeId != null ? exchangeId : "");
@@ -443,6 +460,7 @@
      */
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         outbound = in.readBoolean();
+        responseExpected = in.readBoolean();
         role = in.readByte();
         pattern = (URI) in.readObject();
         exchangeId = in.readUTF();
@@ -460,4 +478,5 @@
         inMessage = (NormalizedMessage) messages.get(IN);
         outMessage = (NormalizedMessage) messages.get(OUT);
     }
+
 }

servicemix/base/src/main/java/org/servicemix/jbi/messaging
MessageExchangeImpl.java 1.14 -> 1.15
diff -u -r1.14 -r1.15
--- MessageExchangeImpl.java	2 Aug 2005 16:29:59 -0000	1.14
+++ MessageExchangeImpl.java	14 Aug 2005 11:53:13 -0000	1.15
@@ -41,10 +41,12 @@
  * A simple message exchange declaration. This is partial, just giving us enough ME function for the doodle. This
  * doesn't add anything new to the current MessageExchange definition.
  *
- * @version $Revision: 1.14 $
+ * @version $Revision: 1.15 $
  */
 public class MessageExchangeImpl implements MessageExchange, Serializable {
 
+    private static final long serialVersionUID = -3639175136897005605L;
+    
     private ComponentContextImpl sourceContext;
     private ExchangeStatus status = ExchangeStatus.ACTIVE;
     private Role role = Role.PROVIDER;
@@ -125,6 +127,21 @@
      */
     public boolean isOutbound(){
         return packet.isOutbound();
+    }
+    
+
+    /**
+     * @return Returns the responseExpected.
+     */
+    public boolean isResponseExpected() {
+        return packet.isResponseExpected();
+    }
+
+    /**
+     * @param responseExpected The responseExpected to set.
+     */
+    public void setResponseExpected(boolean responseExpected) {
+        packet.setResponseExpected(responseExpected);
     }
     
 

servicemix/base/src/main/java/org/servicemix/jbi/messaging
DeliveryChannelImpl.java 1.21 -> 1.22
diff -u -r1.21 -r1.22
--- DeliveryChannelImpl.java	5 Aug 2005 09:52:08 -0000	1.21
+++ DeliveryChannelImpl.java	14 Aug 2005 11:53:13 -0000	1.22
@@ -29,6 +29,7 @@
 import org.activemq.util.IdGenerator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.servicemix.ExchangeResponseListener;
 import org.servicemix.MessageExchangeListener;
 import org.servicemix.jbi.container.ActivationSpec;
 import org.servicemix.jbi.container.JBIContainer;
@@ -41,7 +42,7 @@
 /**
  * DeliveryChannel implementation
  * 
- * @version $Revision: 1.21 $
+ * @version $Revision: 1.22 $
  */
 public class DeliveryChannelImpl implements DeliveryChannel {
     private static final Log log = LogFactory.getLog(DeliveryChannel.class);
@@ -237,26 +238,36 @@
      */
     public void send(MessageExchange messageExchange) throws MessagingException {
         MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
-        if (messageExchangeImpl.isOutbound()) {
-            if (exchangeThrottling) {
-                if (throttlingInterval > intervalCount) {
-                    intervalCount = 0;
-                    try {
-                        Thread.sleep(throttlingTimeout);
+        boolean outbound = messageExchangeImpl.isOutbound();
+        boolean responseExpected = messageExchangeImpl.isResponseExpected();
+        if (outbound || responseExpected) {
+            if (outbound) {
+                if (exchangeThrottling) {
+                    if (throttlingInterval > intervalCount) {
+                        intervalCount = 0;
+                        try {
+                            Thread.sleep(throttlingTimeout);
+                        }
+                        catch (InterruptedException e) {
+                            log.warn("throttling failed", e);
+                        }
                     }
-                    catch (InterruptedException e) {
-                        log.warn("throttling failed", e);
+                    intervalCount++;
+                }
+                if (!messageExchangeImpl.isResponseExpected()) {
+                    Component component = ((LocalComponentConnector) componentConnector).getComponent();
+                    if (component != null && component instanceof ExchangeResponseListener) {
+                        messageExchangeImpl.setResponseExpected(true);
                     }
                 }
-                intervalCount++;
+                long currentTime = System.currentTimeMillis();
+                messagingStats.getOutboundExchanges().increment();
+                messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime);
+                lastSendTime = currentTime;
+                messageExchangeImpl.setSourceId(componentConnector.getComponentNameSpace());
             }
-            long currentTime = System.currentTimeMillis();
-            messagingStats.getOutboundExchanges().increment();
-            messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime);
-            lastSendTime = currentTime;
-            messageExchangeImpl.setSourceId(componentConnector.getComponentNameSpace());
+            container.sendExchange(messageExchangeImpl);
         }
-        container.sendExchange(messageExchangeImpl);
     }
 
     /**
@@ -270,6 +281,7 @@
         MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
         AckHelper ack = new AckHelper(messageExchangeImpl);
         acks.put(messageExchange.getExchangeId(), ack);
+        messageExchangeImpl.setResponseExpected(true);
         send(messageExchange);
         return ack.isAcked();
     }
@@ -286,6 +298,7 @@
         MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
         AckHelper ack = new AckHelper(messageExchangeImpl);
         acks.put(messageExchange.getExchangeId(), ack);
+        messageExchangeImpl.setResponseExpected(true);
         send(messageExchange);
         return ack.isAcked(timeoutMS);
     }
@@ -416,7 +429,6 @@
                 messagingStats.getInboundExchangeRate().addTime(currentTime - lastReceiveTime);
                 lastReceiveTime = currentTime;
                 MessageExchangeImpl me = createInboundExchange(packet);
-                me.setPacket(packet);
                 Component component = ((LocalComponentConnector) componentConnector).getComponent();
                 if (component != null && component instanceof MessageExchangeListener) {
                     ((MessageExchangeListener) component).onMessageExchange(me);
@@ -435,6 +447,12 @@
                 if (ack != null) {
                     ack.getMessageExchange().setPacket(packet);
                     ack.done();
+                }else {
+                    Component component = ((LocalComponentConnector) componentConnector).getComponent();
+                    if (component != null && component instanceof ExchangeResponseListener) {
+                        MessageExchangeImpl me = createInboundExchange(packet);
+                        ((ExchangeResponseListener) component).onMessageExchangeResponse(me);
+                    }
                 }
             }
         }

servicemix/base/src/main/java/org/servicemix
ExchangeResponseListener.java added at 1.1
diff -N ExchangeResponseListener.java
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ ExchangeResponseListener.java	14 Aug 2005 11:53:13 -0000	1.1
@@ -0,0 +1,41 @@
+/** 
+ * <a href="">ServiceMix: The open source ESB</a> 
+ * 
+ * Copyright 2005 RAJD Consultancy Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+
+package org.servicemix;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+
+/**
+ * If a Component implements this interface,  a consumed MessageExchange response will be 
+ * delivered directly to the listener asynchronously rather than waiting and blocking on 
+ * a sendSync() call
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface ExchangeResponseListener {
+
+    /**
+     * MessageExchange passed directly to the listener instead of being queued
+     *
+     * @param exchange
+     * @throws MessagingException
+     */
+    public void onMessageExchangeResponse(MessageExchange exchange) throws MessagingException;
+}

servicemix/base/src/main/java/org/servicemix
MessageExchangeListener.java 1.1 -> 1.2
diff -u -r1.1 -r1.2
--- MessageExchangeListener.java	22 Jun 2005 16:52:19 -0000	1.1
+++ MessageExchangeListener.java	14 Aug 2005 11:53:13 -0000	1.2
@@ -1,21 +1,21 @@
-/*
- * Copyright 2005 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License")
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+/** 
+ * <a href="">ServiceMix: The open source ESB</a> 
+ * 
+ * Copyright 2005 RAJD Consultancy Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Copyright 2005 RAJD Consultancy Ltd. All rights reserved.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
 
 package org.servicemix;
 
@@ -24,10 +24,10 @@
 
 /**
  * If a Component implements this interface, MessageExchange will be delivered directly to the listener
- * synchronously rather than using the usual asynchronous delivery with a thread used up per consuming
+ * asynchronously rather than using the usual asynchronous delivery with a thread used up per consuming
  * component.
  *
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
  */
 public interface MessageExchangeListener {
 
CVSspam 0.2.8



Reply via email to