Commit in servicemix/base/src/main/java/org/servicemix/jbi on MAIN
nmr/SubscriptionRegistry.java +62 added 1.1
   /SubscriptionManager.java +65 added 1.1
   /Broker.java +25 -3 1.19 -> 1.20
messaging/ExchangePacket.java +35 -1 1.9 -> 1.10
config/ComponentElementProcessor.java +8 -1 1.13 -> 1.14
container/ActivationSpec.java +17 -1 1.11 -> 1.12
         /SubscriptionSpec.java +11 1.1 -> 1.2
+223 -6
2 added + 5 modified, total 7 files
added in the wiring to support topic based component subscriptions

servicemix/base/src/main/java/org/servicemix/jbi/nmr
SubscriptionRegistry.java added at 1.1
diff -N SubscriptionRegistry.java
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ SubscriptionRegistry.java	20 Sep 2005 16:52:24 -0000	1.1
@@ -0,0 +1,62 @@
+/**
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.servicemix.jbi.nmr;
+
+import org.servicemix.jbi.container.SubscriptionSpec;
+import org.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.servicemix.jbi.nmr.flow.Flow;
+import org.servicemix.jbi.servicedesc.ServiceEndpointImpl;
+
+import javax.jbi.JBIException;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Maintains a registry of the applicable subscriptions currently active for the current components
+ * 
+ * @version $Revision$
+ */
+public class SubscriptionRegistry {
+
+    private Map subscriptions = new ConcurrentHashMap();
+
+    public void dispatchToSubscribers(MessageExchangeImpl exchange, SubscriptionManager subscriptionManager) throws JBIException {
+        for (Iterator iter = subscriptions.entrySet().iterator(); iter.hasNext();) {
+            Map.Entry entry = (Map.Entry) iter.next();
+            
+            SubscriptionSpec subscription = (SubscriptionSpec) entry.getKey();
+            
+            if (subscription.matches(exchange)) {
+                ServiceEndpointImpl endpoint = (ServiceEndpointImpl) entry.getValue();;
+                subscriptionManager.dispatchToSubscriber(exchange, endpoint);
+            }
+        }
+    }
+    
+    public void registerSubscription(SubscriptionSpec subscription,                  ServiceEndpointImpl endpoint) {
+        subscriptions.put(subscription, endpoint);
+    }
+    
+    public ServiceEndpointImpl deregisterSubscription(SubscriptionSpec subscription) {
+        return (ServiceEndpointImpl) subscriptions.remove(subscription);
+    }
+    
+}

servicemix/base/src/main/java/org/servicemix/jbi/nmr
SubscriptionManager.java added at 1.1
diff -N SubscriptionManager.java
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ SubscriptionManager.java	20 Sep 2005 16:52:24 -0000	1.1
@@ -0,0 +1,65 @@
+/**
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.servicemix.jbi.nmr;
+
+import org.servicemix.jbi.messaging.ExchangePacket;
+import org.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.servicemix.jbi.nmr.flow.Flow;
+import org.servicemix.jbi.nmr.flow.FlowProvider;
+import org.servicemix.jbi.servicedesc.ServiceEndpointImpl;
+
+import javax.jbi.JBIException;
+
+/**
+ * Handles publish/subscribe style messaging in the NMR.
+ * 
+ * 
+ * @version $Revision$
+ */
+public class SubscriptionManager {
+    
+    private SubscriptionRegistry subscriptionRegistry = new SubscriptionRegistry();
+    private String flowName = "seda";
+    private Flow flow;
+
+    public void init(Broker broker) throws JBIException {
+        if (this.flow == null) {
+            this.flow = FlowProvider.getFlow(flowName);
+        }
+        this.flow.init(broker);
+    }
+
+    /**
+     * Dispatches the given exchange to all matching subscribers
+     * 
+     * @throws JBIException 
+     */
+    public void dispatchToSubscribers(MessageExchangeImpl exchange) throws JBIException {
+        subscriptionRegistry.dispatchToSubscribers(exchange, this);
+    }
+
+    /**
+     * Dispatches the given message exchange to the given endpoint
+     */
+    public void dispatchToSubscriber(MessageExchangeImpl exchange, ServiceEndpointImpl endpoint) throws JBIException {
+        ExchangePacket packet = exchange.getPacket().copy();
+        packet.setEndpoint(endpoint);
+        flow.send(packet);
+    }
+   
+}

servicemix/base/src/main/java/org/servicemix/jbi/nmr
Broker.java 1.19 -> 1.20
diff -u -r1.19 -r1.20
--- Broker.java	22 Aug 2005 09:25:43 -0000	1.19
+++ Broker.java	20 Sep 2005 16:52:24 -0000	1.20
@@ -55,7 +55,7 @@
 /**
  * The Broker handles Nomalised Message Routing within ServiceMix
  *
- * @version $Revision: 1.19 $
+ * @version $Revision: 1.20 $
  */
 public class Broker  extends BaseLifeCycle {
     private JBIContainer container;
@@ -66,6 +66,7 @@
     private final static Log log = LogFactory.getLog(Broker.class);
     private EndpointChooser defaultServiceChooser = new FirstChoicePolicy();
     private EndpointChooser defaultInterfaceChooser = new FirstChoicePolicy();
+    private SubscriptionManager subscriptionManager = new SubscriptionManager();
 
     /**
      * Constructor
@@ -95,6 +96,17 @@
         this.workManager = workManager;
     }    
 
+    public SubscriptionManager getSubscriptionManager() {
+        return subscriptionManager;
+    }
+
+    /**
+     * Sets the subscription manager
+     */
+    public void setSubscriptionManager(SubscriptionManager subscriptionManager) {
+        this.subscriptionManager = subscriptionManager;
+    }
+
     /**
      * initialize the broker
      * @param container
@@ -108,6 +120,7 @@
             this.flow = FlowProvider.getFlow(flowName);
         }
         this.flow.init(this);
+        getSubscriptionManager().init(this);
     }
     
     /**
@@ -301,8 +314,15 @@
                 packet.setDestinationId(destinationId);
             }
             else {
-                throw new MessagingException("Could not find route for exchange: " + exchange
-                        + " for service: " + serviceName + " and interface: " + interfaceName);
+                boolean throwException = true;
+                ActivationSpec activationSpec = exchange.getActivationSpec();
+                if (activationSpec != null) {
+                    throwException = activationSpec.isFailIfNoDestinationEndpoint();
+                }
+                if (throwException) {
+                    throw new MessagingException("Could not find route for exchange: " + exchange
+                            + " for service: " + serviceName + " and interface: " + interfaceName);
+                }
             }
            if (log.isTraceEnabled()){
                log.trace("routing exchange " + packet + " TO: " + theEndpoint);
@@ -310,6 +330,8 @@
         }
         
         flow.send(packet);
+        
+        getSubscriptionManager().dispatchToSubscribers(exchange);
     }
 
     /**

servicemix/base/src/main/java/org/servicemix/jbi/messaging
ExchangePacket.java 1.9 -> 1.10
diff -u -r1.9 -r1.10
--- ExchangePacket.java	14 Aug 2005 11:53:13 -0000	1.9
+++ ExchangePacket.java	20 Sep 2005 16:52:24 -0000	1.10
@@ -38,7 +38,7 @@
 /**
  * ExchangePacket is responsible for carrying MessageExchange payloads
  * 
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
  */
 public class ExchangePacket implements Externalizable {
     private static final long serialVersionUID = -9110837382914609624L;
@@ -74,6 +74,33 @@
     private transient Transaction transactionContext;
     private transient String endpointName;
 
+    
+    public ExchangePacket() {
+    }
+
+    public ExchangePacket(ExchangePacket packet) {
+        this.destinationId = packet.destinationId;
+        this.endpoint = null; // packet.endpoint;
+        this.endpointName = packet.endpointName;
+        this.error = null;
+        this.exchangeId = null; //???;
+        this.fault = null;
+        this.inMessage = packet.inMessage;
+        this.interfaceName = packet.interfaceName;
+        this.messages = packet.messages;
+        this.operationName = packet.operationName;
+        this.outbound = packet.outbound;
+        this.outMessage = packet.outMessage;
+        this.pattern = packet.pattern;
+        this.properties = packet.properties;
+        this.responseExpected = packet.responseExpected;
+        this.role = packet.role;
+        this.serviceName = packet.serviceName;
+        this.sourceId = packet.sourceId;
+        this.status = packet.status;
+        this.transactionContext = packet.transactionContext;
+    }
+
     /**
      * @return Returns the outbound.
      */
@@ -477,6 +504,13 @@
         fault = (Fault) messages.get(FAULT);
         inMessage = (NormalizedMessage) messages.get(IN);
         outMessage = (NormalizedMessage) messages.get(OUT);
+    }
+
+    /**
+     * Creates a copy of the packet so it can be sent to another destination
+     */
+    public ExchangePacket copy() {
+        return new ExchangePacket(this);
     }
 
 }

servicemix/base/src/main/java/org/servicemix/jbi/config
ComponentElementProcessor.java 1.13 -> 1.14
diff -u -r1.13 -r1.14
--- ComponentElementProcessor.java	20 Sep 2005 10:46:05 -0000	1.13
+++ ComponentElementProcessor.java	20 Sep 2005 16:52:24 -0000	1.14
@@ -33,7 +33,7 @@
 /**
  * Handles the 'component' element
  * 
- * @version $Revision: 1.13 $
+ * @version $Revision: 1.14 $
  */
 public class ComponentElementProcessor extends QNameElementProcessor implements ElementProcessor {
 
@@ -101,6 +101,13 @@
         if (destinationOperation != null) {
             element.removeAttribute("destinationOperation");
             addQNameProperty(registration, "destinationOperation", destinationOperation, element);
+        }
+        String failOnNoEndpoint = element.getAttribute("failIfNoDestinationEndpoint");
+        if (failOnNoEndpoint != null) {
+            element.removeAttribute("failIfNoDestinationEndpoint");
+            if (failOnNoEndpoint.length() > 0) {
+                addPropertyElement(registration, "failIfNoDestinationEndpoint", failOnNoEndpoint);
+            }
         }
 
         // lets move any subscriptions into a new property

servicemix/base/src/main/java/org/servicemix/jbi/container
ActivationSpec.java 1.11 -> 1.12
diff -u -r1.11 -r1.12
--- ActivationSpec.java	20 Sep 2005 10:46:05 -0000	1.11
+++ ActivationSpec.java	20 Sep 2005 16:52:24 -0000	1.12
@@ -29,7 +29,7 @@
 /**
  * Represents the registration of a component with the {@link JBIContainer}
  *
- * @version $Revision: 1.11 $
+ * @version $Revision: 1.12 $
  */
 public class ActivationSpec {
     private String id;
@@ -48,6 +48,7 @@
     private String destinationEndpoint;
     private Marshaler marshaler;
     private SubscriptionSpec[] subscriptions = {};
+    private boolean failIfNoDestinationEndpoint = true;
 
 
     public ActivationSpec() {
@@ -232,6 +233,21 @@
 
     public void setSubscriptions(SubscriptionSpec[] subscriptions) {
         this.subscriptions = subscriptions;
+    }
+    
+    public boolean isFailIfNoDestinationEndpoint() {
+        return failIfNoDestinationEndpoint;
+    }
+
+    /**
+     * Sets whether or not there should be a failure if there is no matching endpoint for the service dispatch.
+     * It may be in a pure publish/subscribe model you want all available subscribes to receive the message but do not mind
+     * if there is not a single destination endpoint that can be found.
+     * 
+     * @param failIfNoDestinationEndpoint
+     */
+    public void setFailIfNoDestinationEndpoint(boolean failIfNoDestinationEndpoint) {
+        this.failIfNoDestinationEndpoint = failIfNoDestinationEndpoint;
     }
 
     /**

servicemix/base/src/main/java/org/servicemix/jbi/container
SubscriptionSpec.java 1.1 -> 1.2
diff -u -r1.1 -r1.2
--- SubscriptionSpec.java	20 Sep 2005 10:46:05 -0000	1.1
+++ SubscriptionSpec.java	20 Sep 2005 16:52:24 -0000	1.2
@@ -17,6 +17,7 @@
  **/
 package org.servicemix.jbi.container;
 
+import org.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.servicemix.jbi.resolver.EndpointFilter;
 
 import javax.xml.namespace.QName;
@@ -72,5 +73,15 @@
 
     public void setService(QName service) {
         this.service = service;
+    }
+
+    /**
+     * Returns true if this subscription matches the given message exchange
+     * 
+     * @param exchange the exchange to be matched
+     * @return true if this subscription matches the exchange
+     */
+    public boolean matches(MessageExchangeImpl exchange) {
+        return false;
     }
 }
CVSspam 0.2.8



Reply via email to