| Commit in servicemix/base/src/main/java/org/servicemix/jbi on MAIN | |||
| nmr/SubscriptionRegistry.java | +62 | added 1.1 | |
| /SubscriptionManager.java | +65 | added 1.1 | |
| /Broker.java | +25 | -3 | 1.19 -> 1.20 |
| messaging/ExchangePacket.java | +35 | -1 | 1.9 -> 1.10 |
| config/ComponentElementProcessor.java | +8 | -1 | 1.13 -> 1.14 |
| container/ActivationSpec.java | +17 | -1 | 1.11 -> 1.12 |
| /SubscriptionSpec.java | +11 | 1.1 -> 1.2 | |
| +223 | -6 | ||
Added the wiring to support topic-based component subscriptions.
servicemix/base/src/main/java/org/servicemix/jbi/nmr
SubscriptionRegistry.java added at 1.1
diff -N SubscriptionRegistry.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ SubscriptionRegistry.java 20 Sep 2005 16:52:24 -0000 1.1 @@ -0,0 +1,62 @@
+/**
+ *
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.servicemix.jbi.nmr;
+
+import org.servicemix.jbi.container.SubscriptionSpec;
+import org.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.servicemix.jbi.nmr.flow.Flow;
+import org.servicemix.jbi.servicedesc.ServiceEndpointImpl;
+
+import javax.jbi.JBIException;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Maintains a registry of the applicable subscriptions currently active for the current components.
+ * Each registered {@link SubscriptionSpec} is mapped to the {@link ServiceEndpointImpl} that
+ * should receive the exchanges the subscription matches.
+ *
+ * @version $Revision$
+ */
+public class SubscriptionRegistry {
+
+    /**
+     * Registered subscriptions: SubscriptionSpec key -> ServiceEndpointImpl value.
+     * NOTE(review): lookups and removals rely on the equals/hashCode of SubscriptionSpec,
+     * which is not visible here - confirm it behaves as intended for map keys.
+     */
+    private final Map subscriptions = new ConcurrentHashMap();
+
+    /**
+     * Offers the exchange to every registered subscription and, for each subscription that
+     * matches, asks the subscription manager to deliver the exchange to the endpoint
+     * registered for it.
+     *
+     * @param exchange the exchange to test against the registered subscriptions
+     * @param subscriptionManager the manager used to deliver the exchange to a matching endpoint
+     * @throws JBIException if the subscription manager fails to dispatch; the exception is
+     *         propagated immediately, so subscriptions not yet visited are skipped
+     */
+    public void dispatchToSubscribers(MessageExchangeImpl exchange, SubscriptionManager subscriptionManager) throws JBIException {
+        for (Iterator iter = subscriptions.entrySet().iterator(); iter.hasNext();) {
+            Map.Entry entry = (Map.Entry) iter.next();
+            SubscriptionSpec subscription = (SubscriptionSpec) entry.getKey();
+
+            if (subscription.matches(exchange)) {
+                ServiceEndpointImpl endpoint = (ServiceEndpointImpl) entry.getValue();
+                subscriptionManager.dispatchToSubscriber(exchange, endpoint);
+            }
+        }
+    }
+
+    /**
+     * Registers the endpoint as the receiver for the given subscription, replacing any
+     * endpoint previously stored under the same subscription key.
+     *
+     * @param subscription the subscription to register
+     * @param endpoint the endpoint that should receive matching exchanges
+     */
+    public void registerSubscription(SubscriptionSpec subscription, ServiceEndpointImpl endpoint) {
+        subscriptions.put(subscription, endpoint);
+    }
+
+    /**
+     * Removes the given subscription from the registry.
+     *
+     * @param subscription the subscription to remove
+     * @return the endpoint that was stored for the subscription, as returned by the
+     *         underlying map's remove operation
+     */
+    public ServiceEndpointImpl deregisterSubscription(SubscriptionSpec subscription) {
+        return (ServiceEndpointImpl) subscriptions.remove(subscription);
+    }
+
+}
servicemix/base/src/main/java/org/servicemix/jbi/nmr
SubscriptionManager.java added at 1.1
diff -N SubscriptionManager.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ SubscriptionManager.java 20 Sep 2005 16:52:24 -0000 1.1 @@ -0,0 +1,65 @@
+/**
+ *
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.servicemix.jbi.nmr;
+
+import org.servicemix.jbi.messaging.ExchangePacket;
+import org.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.servicemix.jbi.nmr.flow.Flow;
+import org.servicemix.jbi.nmr.flow.FlowProvider;
+import org.servicemix.jbi.servicedesc.ServiceEndpointImpl;
+
+import javax.jbi.JBIException;
+
+/**
+ * Handles publish/subscribe style messaging in the NMR: exchanges are offered to the
+ * subscription registry and a copy of the exchange packet is sent over a flow to each
+ * matching subscriber endpoint.
+ *
+ * @version $Revision$
+ */
+public class SubscriptionManager {
+
+    /** The flow used to deliver copies to subscribers; resolved in init when not already set. */
+    private Flow flow;
+    /** Name handed to the FlowProvider when no flow has been set. */
+    private String flowName = "seda";
+    /** Holds the subscriptions that exchanges are matched against. */
+    private SubscriptionRegistry subscriptionRegistry = new SubscriptionRegistry();
+
+    /**
+     * Resolves the flow by name if none is set yet, then initialises it with the broker.
+     *
+     * @param broker the broker passed on to the flow's init method
+     * @throws JBIException if the flow cannot be obtained or initialised
+     */
+    public void init(Broker broker) throws JBIException {
+        Flow target = this.flow;
+        if (target == null) {
+            target = FlowProvider.getFlow(flowName);
+            this.flow = target;
+        }
+        target.init(broker);
+    }
+
+    /**
+     * Dispatches the given exchange to all matching subscribers by delegating to the
+     * subscription registry, which calls back into this manager for each match.
+     *
+     * @param exchange the exchange to offer to the registered subscriptions
+     * @throws JBIException if dispatching to a subscriber fails
+     */
+    public void dispatchToSubscribers(MessageExchangeImpl exchange) throws JBIException {
+        subscriptionRegistry.dispatchToSubscribers(exchange, this);
+    }
+
+    /**
+     * Dispatches the given message exchange to the given endpoint. The exchange's packet is
+     * copied, the endpoint is set on the copy, and the copy is sent on the flow.
+     *
+     * @param exchange the exchange whose packet is copied
+     * @param endpoint the subscriber endpoint set on the copied packet
+     * @throws JBIException if the flow fails to send the packet
+     */
+    public void dispatchToSubscriber(MessageExchangeImpl exchange, ServiceEndpointImpl endpoint) throws JBIException {
+        ExchangePacket subscriberPacket = exchange.getPacket().copy();
+        subscriberPacket.setEndpoint(endpoint);
+        flow.send(subscriberPacket);
+    }
+
+}
servicemix/base/src/main/java/org/servicemix/jbi/nmr
diff -u -r1.19 -r1.20 --- Broker.java 22 Aug 2005 09:25:43 -0000 1.19 +++ Broker.java 20 Sep 2005 16:52:24 -0000 1.20 @@ -55,7 +55,7 @@
/** * The Broker handles Normalised Message Routing within ServiceMix *
- * @version $Revision: 1.19 $
+ * @version $Revision: 1.20 $
*/
public class Broker extends BaseLifeCycle {
private JBIContainer container;
@@ -66,6 +66,7 @@
private final static Log log = LogFactory.getLog(Broker.class);
private EndpointChooser defaultServiceChooser = new FirstChoicePolicy();
private EndpointChooser defaultInterfaceChooser = new FirstChoicePolicy();
+ private SubscriptionManager subscriptionManager = new SubscriptionManager();
/**
* Constructor
@@ -95,6 +96,17 @@
this.workManager = workManager;
}
+ /**
+  * Returns the subscription manager currently held by this broker
+  */
+ public SubscriptionManager getSubscriptionManager() {
+     return this.subscriptionManager;
+ }
+
+ /**
+ * Sets the subscription manager
+ *
+ * NOTE(review): this setter only stores the reference; the manager's init is invoked from the
+ * broker's own init, so presumably a replacement must be set before the broker is
+ * initialised - confirm.
+ *
+ * @param subscriptionManager the manager that replaces the one currently held by this broker
+ */
+ public void setSubscriptionManager(SubscriptionManager subscriptionManager) {
+ this.subscriptionManager = subscriptionManager;
+ }
+
/**
* initialize the broker
* @param container
@@ -108,6 +120,7 @@
this.flow = FlowProvider.getFlow(flowName);
}
this.flow.init(this);
+ getSubscriptionManager().init(this);
}
/**
@@ -301,8 +314,15 @@
packet.setDestinationId(destinationId);
}
else {
- throw new MessagingException("Could not find route for exchange: " + exchange
- + " for service: " + serviceName + " and interface: " + interfaceName);
+ boolean throwException = true;
+ ActivationSpec activationSpec = exchange.getActivationSpec();
+ if (activationSpec != null) {
+ throwException = activationSpec.isFailIfNoDestinationEndpoint();
+ }
+ if (throwException) {
+ throw new MessagingException("Could not find route for exchange: " + exchange
+ + " for service: " + serviceName + " and interface: " + interfaceName);
+ }
}
if (log.isTraceEnabled()){
log.trace("routing exchange " + packet + " TO: " + theEndpoint);
@@ -310,6 +330,8 @@
}
flow.send(packet);
+ + getSubscriptionManager().dispatchToSubscribers(exchange);
}
/**
servicemix/base/src/main/java/org/servicemix/jbi/messaging
diff -u -r1.9 -r1.10 --- ExchangePacket.java 14 Aug 2005 11:53:13 -0000 1.9 +++ ExchangePacket.java 20 Sep 2005 16:52:24 -0000 1.10 @@ -38,7 +38,7 @@
/** * ExchangePacket is responsible for carrying MessageExchange payloads *
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class ExchangePacket implements Externalizable {
private static final long serialVersionUID = -9110837382914609624L;
@@ -74,6 +74,33 @@
private transient Transaction transactionContext;
private transient String endpointName;
+
+ /**
+  * Default constructor. It is declared explicitly because the class also declares a copy
+  * constructor, and an Externalizable class needs a public no-arg constructor.
+  */
+ public ExchangePacket() {
+     super();
+ }
+
+ public ExchangePacket(ExchangePacket packet) {
+ this.destinationId = packet.destinationId;
+ this.endpoint = null; // packet.endpoint;
+ this.endpointName = packet.endpointName;
+ this.error = null;
+ this.exchangeId = null; //???;
+ this.fault = null;
+ this.inMessage = packet.inMessage;
+ this.interfaceName = packet.interfaceName;
+ this.messages = packet.messages;
+ this.operationName = packet.operationName;
+ this.outbound = packet.outbound;
+ this.outMessage = packet.outMessage;
+ this.pattern = packet.pattern;
+ this.properties = packet.properties;
+ this.responseExpected = packet.responseExpected;
+ this.role = packet.role;
+ this.serviceName = packet.serviceName;
+ this.sourceId = packet.sourceId;
+ this.status = packet.status;
+ this.transactionContext = packet.transactionContext;
+ }
+
/**
* @return Returns the outbound.
*/
@@ -477,6 +504,13 @@
fault = (Fault) messages.get(FAULT);
inMessage = (NormalizedMessage) messages.get(IN);
outMessage = (NormalizedMessage) messages.get(OUT);
+ }
+
+ /**
+ * Creates a copy of the packet so it can be sent to another destination.
+ * Delegates to the copy constructor, which assigns field references from this packet and
+ * leaves the endpoint, exchange id, error and fault of the copy as null.
+ *
+ * @return a new packet populated from this one
+ */
+ public ExchangePacket copy() {
+ return new ExchangePacket(this);
} }
servicemix/base/src/main/java/org/servicemix/jbi/config
diff -u -r1.13 -r1.14 --- ComponentElementProcessor.java 20 Sep 2005 10:46:05 -0000 1.13 +++ ComponentElementProcessor.java 20 Sep 2005 16:52:24 -0000 1.14 @@ -33,7 +33,7 @@
/** * Handles the 'component' element *
- * @version $Revision: 1.13 $
+ * @version $Revision: 1.14 $
*/
public class ComponentElementProcessor extends QNameElementProcessor implements ElementProcessor {
@@ -101,6 +101,13 @@
if (destinationOperation != null) {
element.removeAttribute("destinationOperation");
addQNameProperty(registration, "destinationOperation", destinationOperation, element);
+ }
+ String failOnNoEndpoint = element.getAttribute("failIfNoDestinationEndpoint");
+ if (failOnNoEndpoint != null) {
+ element.removeAttribute("failIfNoDestinationEndpoint");
+ if (failOnNoEndpoint.length() > 0) {
+ addPropertyElement(registration, "failIfNoDestinationEndpoint", failOnNoEndpoint);
+ }
}
// lets move any subscriptions into a new property
servicemix/base/src/main/java/org/servicemix/jbi/container
diff -u -r1.11 -r1.12 --- ActivationSpec.java 20 Sep 2005 10:46:05 -0000 1.11 +++ ActivationSpec.java 20 Sep 2005 16:52:24 -0000 1.12 @@ -29,7 +29,7 @@
/** * Represents the registration of a component with the {@link JBIContainer} *
- * @version $Revision: 1.11 $
+ * @version $Revision: 1.12 $
*/
public class ActivationSpec {
private String id;
@@ -48,6 +48,7 @@
private String destinationEndpoint;
private Marshaler marshaler;
private SubscriptionSpec[] subscriptions = {};
+ private boolean failIfNoDestinationEndpoint = true;
public ActivationSpec() {
@@ -232,6 +233,21 @@
public void setSubscriptions(SubscriptionSpec[] subscriptions) {
this.subscriptions = subscriptions;
+ }
+
+ /**
+  * Returns whether there should be a failure if there is no matching endpoint for the
+  * service dispatch. The field defaults to true.
+  */
+ public boolean isFailIfNoDestinationEndpoint() {
+     return this.failIfNoDestinationEndpoint;
+ }
+
+ /**
+ * Sets whether or not there should be a failure if there is no matching endpoint for the service dispatch.
+ * In a pure publish/subscribe model you may want all available subscribers to receive the message without
+ * minding that no single destination endpoint can be found.
+ *
+ * @param failIfNoDestinationEndpoint true (the default) to fail when no destination endpoint is found,
+ * false to suppress that failure
+ */
+ public void setFailIfNoDestinationEndpoint(boolean failIfNoDestinationEndpoint) {
+ this.failIfNoDestinationEndpoint = failIfNoDestinationEndpoint;
}
/**
servicemix/base/src/main/java/org/servicemix/jbi/container
diff -u -r1.1 -r1.2 --- SubscriptionSpec.java 20 Sep 2005 10:46:05 -0000 1.1 +++ SubscriptionSpec.java 20 Sep 2005 16:52:24 -0000 1.2 @@ -17,6 +17,7 @@
**/ package org.servicemix.jbi.container;
+import org.servicemix.jbi.messaging.MessageExchangeImpl;
import org.servicemix.jbi.resolver.EndpointFilter; import javax.xml.namespace.QName;
@@ -72,5 +73,15 @@
public void setService(QName service) {
this.service = service;
+ }
+
+ /**
+ * Returns true if this subscription matches the given message exchange.
+ *
+ * NOTE(review): this is currently a stub that returns false for every exchange, so no
+ * subscription matches yet - TODO implement the matching against the exchange.
+ *
+ * @param exchange the exchange to be matched
+ * @return true if this subscription matches the exchange; always false in this revision
+ */
+ public boolean matches(MessageExchangeImpl exchange) {
+ return false;
} }
