| Commit in servicemix/base/src/main/java/org/servicemix on MAIN | |||
| jbi/messaging/ExchangePacket.java | +20 | -1 | 1.8 -> 1.9 |
| /MessageExchangeImpl.java | +18 | -1 | 1.14 -> 1.15 |
| /DeliveryChannelImpl.java | +35 | -17 | 1.21 -> 1.22 |
| ExchangeResponseListener.java | +41 | added 1.1 | |
| MessageExchangeListener.java | +19 | -19 | 1.1 -> 1.2 |
| +133 | -38 | ||
Added support for ExchangeResponseListener so that a consumer can asynchronously receive responses for an exchange, instead of using sendSync() on the DeliveryChannel
servicemix/base/src/main/java/org/servicemix/jbi/messaging
diff -u -r1.8 -r1.9 --- ExchangePacket.java 20 Jul 2005 13:01:05 -0000 1.8 +++ ExchangePacket.java 14 Aug 2005 11:53:13 -0000 1.9 @@ -38,7 +38,7 @@
/** * ExchangePacket is responsible for carrying MessageExchange payloads *
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
*/
public class ExchangePacket implements Externalizable {
private static final long serialVersionUID = -9110837382914609624L;
@@ -70,6 +70,7 @@
private Map messages = new ConcurrentHashMap();
private ServiceEndpointImpl endpoint;
private boolean outbound;
+ private boolean responseExpected;
private transient Transaction transactionContext;
private transient String endpointName;
@@ -86,6 +87,21 @@
public void setOutbound(boolean outbound) {
this.outbound = outbound;
}
+
+
+ /**
+ * @return Returns the responseExpected.
+ */
+ public boolean isResponseExpected() {
+ return responseExpected;
+ }
+
+ /**
+ * @param responseExpected The responseExpected to set.
+ */
+ public void setResponseExpected(boolean responseExpected) {
+ this.responseExpected = responseExpected;
+ }
/**
* @return Returns the endpoint.
@@ -419,6 +435,7 @@
*/
public void writeExternal(ObjectOutput out) throws IOException {
out.writeBoolean(outbound);
+ out.writeBoolean(responseExpected);
out.writeByte(role);
out.writeObject(pattern);
out.writeUTF(exchangeId != null ? exchangeId : "");
@@ -443,6 +460,7 @@
*/
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
outbound = in.readBoolean();
+ responseExpected = in.readBoolean();
role = in.readByte();
pattern = (URI) in.readObject();
exchangeId = in.readUTF();
@@ -460,4 +478,5 @@
inMessage = (NormalizedMessage) messages.get(IN);
outMessage = (NormalizedMessage) messages.get(OUT);
}
+
}
servicemix/base/src/main/java/org/servicemix/jbi/messaging
diff -u -r1.14 -r1.15 --- MessageExchangeImpl.java 2 Aug 2005 16:29:59 -0000 1.14 +++ MessageExchangeImpl.java 14 Aug 2005 11:53:13 -0000 1.15 @@ -41,10 +41,12 @@
* A simple message exchange declaration. This is partial, just giving us enough ME function for the doodle. This * doesn't add anything new to the current MessageExchange definition. *
- * @version $Revision: 1.14 $
+ * @version $Revision: 1.15 $
*/
public class MessageExchangeImpl implements MessageExchange, Serializable {
+ private static final long serialVersionUID = -3639175136897005605L; +
private ComponentContextImpl sourceContext;
private ExchangeStatus status = ExchangeStatus.ACTIVE;
private Role role = Role.PROVIDER;
@@ -125,6 +127,21 @@
*/
public boolean isOutbound(){
return packet.isOutbound();
+ }
+
+
+ /**
+ * @return Returns the responseExpected.
+ */
+ public boolean isResponseExpected() {
+ return packet.isResponseExpected();
+ }
+
+ /**
+ * @param responseExpected The responseExpected to set.
+ */
+ public void setResponseExpected(boolean responseExpected) {
+ packet.setResponseExpected(responseExpected);
}
servicemix/base/src/main/java/org/servicemix/jbi/messaging
diff -u -r1.21 -r1.22 --- DeliveryChannelImpl.java 5 Aug 2005 09:52:08 -0000 1.21 +++ DeliveryChannelImpl.java 14 Aug 2005 11:53:13 -0000 1.22 @@ -29,6 +29,7 @@
import org.activemq.util.IdGenerator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory;
+import org.servicemix.ExchangeResponseListener;
import org.servicemix.MessageExchangeListener; import org.servicemix.jbi.container.ActivationSpec; import org.servicemix.jbi.container.JBIContainer;
@@ -41,7 +42,7 @@
/** * DeliveryChannel implementation *
- * @version $Revision: 1.21 $
+ * @version $Revision: 1.22 $
*/
public class DeliveryChannelImpl implements DeliveryChannel {
private static final Log log = LogFactory.getLog(DeliveryChannel.class);
@@ -237,26 +238,36 @@
*/
public void send(MessageExchange messageExchange) throws MessagingException {
MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
- if (messageExchangeImpl.isOutbound()) {
- if (exchangeThrottling) {
- if (throttlingInterval > intervalCount) {
- intervalCount = 0;
- try {
- Thread.sleep(throttlingTimeout);
+ boolean outbound = messageExchangeImpl.isOutbound();
+ boolean responseExpected = messageExchangeImpl.isResponseExpected();
+ if (outbound || responseExpected) {
+ if (outbound) {
+ if (exchangeThrottling) {
+ if (throttlingInterval > intervalCount) {
+ intervalCount = 0;
+ try {
+ Thread.sleep(throttlingTimeout);
+ }
+ catch (InterruptedException e) {
+ log.warn("throttling failed", e);
+ }
}
- catch (InterruptedException e) {
- log.warn("throttling failed", e);
+ intervalCount++;
+ }
+ if (!messageExchangeImpl.isResponseExpected()) {
+ Component component = ((LocalComponentConnector) componentConnector).getComponent();
+ if (component != null && component instanceof ExchangeResponseListener) {
+ messageExchangeImpl.setResponseExpected(true);
}
}
- intervalCount++;
+ long currentTime = System.currentTimeMillis(); + messagingStats.getOutboundExchanges().increment(); + messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime); + lastSendTime = currentTime; + messageExchangeImpl.setSourceId(componentConnector.getComponentNameSpace());
}
- long currentTime = System.currentTimeMillis(); - messagingStats.getOutboundExchanges().increment(); - messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime); - lastSendTime = currentTime; - messageExchangeImpl.setSourceId(componentConnector.getComponentNameSpace());
+ container.sendExchange(messageExchangeImpl);
}
- container.sendExchange(messageExchangeImpl);
}
/**
@@ -270,6 +281,7 @@
MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
AckHelper ack = new AckHelper(messageExchangeImpl);
acks.put(messageExchange.getExchangeId(), ack);
+ messageExchangeImpl.setResponseExpected(true);
send(messageExchange);
return ack.isAcked();
}
@@ -286,6 +298,7 @@
MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
AckHelper ack = new AckHelper(messageExchangeImpl);
acks.put(messageExchange.getExchangeId(), ack);
+ messageExchangeImpl.setResponseExpected(true);
send(messageExchange);
return ack.isAcked(timeoutMS);
}
@@ -416,7 +429,6 @@
messagingStats.getInboundExchangeRate().addTime(currentTime - lastReceiveTime);
lastReceiveTime = currentTime;
MessageExchangeImpl me = createInboundExchange(packet);
- me.setPacket(packet);
Component component = ((LocalComponentConnector) componentConnector).getComponent();
if (component != null && component instanceof MessageExchangeListener) {
((MessageExchangeListener) component).onMessageExchange(me);
@@ -435,6 +447,12 @@
if (ack != null) {
ack.getMessageExchange().setPacket(packet);
ack.done();
+ }else {
+ Component component = ((LocalComponentConnector) componentConnector).getComponent();
+ if (component != null && component instanceof ExchangeResponseListener) {
+ MessageExchangeImpl me = createInboundExchange(packet);
+ ((ExchangeResponseListener) component).onMessageExchangeResponse(me);
+ }
}
}
}
servicemix/base/src/main/java/org/servicemix
ExchangeResponseListener.java added at 1.1
diff -N ExchangeResponseListener.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ ExchangeResponseListener.java 14 Aug 2005 11:53:13 -0000 1.1 @@ -0,0 +1,41 @@
+/**
+ * <a href="">ServiceMix: The open source ESB</a>
+ *
+ * Copyright 2005 RAJD Consultancy Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+
+package org.servicemix;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+
+/**
+ * If a Component implements this interface, a consumed MessageExchange response will be
+ * delivered directly to the listener asynchronously rather than waiting and blocking on
+ * a sendSync() call
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface ExchangeResponseListener {
+
+ /**
+ * MessageExchange passed directly to the listener instead of being queued
+ *
+ * @param exchange
+ * @throws MessagingException
+ */
+ public void onMessageExchangeResponse(MessageExchange exchange) throws MessagingException;
+}
servicemix/base/src/main/java/org/servicemix
diff -u -r1.1 -r1.2 --- MessageExchangeListener.java 22 Jun 2005 16:52:19 -0000 1.1 +++ MessageExchangeListener.java 14 Aug 2005 11:53:13 -0000 1.2 @@ -1,21 +1,21 @@
-/* - * Copyright 2005 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License") - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - *
+/** + * <a href="">ServiceMix: The open source ESB</a> + * + * Copyright 2005 RAJD Consultancy Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Copyright 2005 RAJD Consultancy Ltd. All rights reserved. - */
+ * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/
package org.servicemix;
@@ -24,10 +24,10 @@
/** * If a Component implements this interface, MessageExchange will be delivered directly to the listener
- * synchronously rather than using the usual asynchronous delivery with a thread used up per consuming
+ * asynchronously rather than using the usual asynchronous delivery with a thread used up per consuming
* component. *
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public interface MessageExchangeListener {
