Commit in servicemix/base/src/main/java/org/servicemix/jbi on MAIN
nmr/SubscriptionManager.java +1 1.2 -> 1.3
framework/SubscriptionRegistry.java +6 -5 1.1 -> 1.2
         /Registry.java +2 -1 1.6 -> 1.7
container/SubscriptionSpec.java +38 -11 1.3 -> 1.4
+47 -17
4 modified files
implementation of subscription semantics (SM-35) finished

servicemix/base/src/main/java/org/servicemix/jbi/nmr
SubscriptionManager.java 1.2 -> 1.3
diff -u -r1.2 -r1.3
--- SubscriptionManager.java	22 Sep 2005 12:22:19 -0000	1.2
+++ SubscriptionManager.java	22 Sep 2005 18:17:58 -0000	1.3
@@ -80,6 +80,7 @@
     protected void dispatchToSubscriber(MessageExchangeImpl exchange, ServiceEndpointImpl endpoint) throws JBIException {
         ExchangePacket packet = exchange.getPacket().copy();
         packet.setEndpoint(endpoint);
+        packet.setDestinationId(endpoint.getComponentNameSpace());
         flow.send(packet);
     }
 

servicemix/base/src/main/java/org/servicemix/jbi/framework
SubscriptionRegistry.java 1.1 -> 1.2
diff -u -r1.1 -r1.2
--- SubscriptionRegistry.java	22 Sep 2005 12:22:19 -0000	1.1
+++ SubscriptionRegistry.java	22 Sep 2005 18:17:58 -0000	1.2
@@ -33,20 +33,22 @@
  * Maintains a registry of the applicable subscriptions currently active for the
  * current components
  * 
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
  */
 public class SubscriptionRegistry {
 
     private Map subscriptions = new ConcurrentHashMap();
-
+    private Registry registry;
     
+    public void init(Registry registry) {
+        this.registry = registry;
+    }
 
     /**
      * @param subscription
      * @param endpoint
      */
     public void registerSubscription(SubscriptionSpec subscription, ServiceEndpointImpl endpoint) {
-        System.out.println("REGISTER SUB: " + subscription);
         subscriptions.put(subscription, endpoint);
     }
 
@@ -69,8 +71,7 @@
             Map.Entry entry = (Map.Entry) iter.next();
 
             SubscriptionSpec subscription = (SubscriptionSpec) entry.getKey();
-System.out.println("TESTING SUB -" + subscription + " MATCH IS TRUE = " + subscription.matches(exchange));
-            if (subscription.matches(exchange)) {
+            if (subscription.matches(registry,exchange)) {
                 if (result == null) {
                     result = new ArrayList();
                 }

servicemix/base/src/main/java/org/servicemix/jbi/framework
Registry.java 1.6 -> 1.7
diff -u -r1.6 -r1.7
--- Registry.java	22 Sep 2005 12:22:19 -0000	1.6
+++ Registry.java	22 Sep 2005 18:17:58 -0000	1.7
@@ -47,7 +47,7 @@
 /**
  * Registry - state infomation including running state, SA's deployed etc.
  * 
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
  */
 public class Registry extends BaseLifeCycle {
     private final static Log log = LogFactory.getLog(Registry.class);
@@ -85,6 +85,7 @@
     public void init(JBIContainer container) throws JBIException {
         this.container = container;
         this.componentRegistry.setContainerName(container.getName());
+        this.subscriptionRegistry.init(this);
     }
 
     /**

servicemix/base/src/main/java/org/servicemix/jbi/container
SubscriptionSpec.java 1.3 -> 1.4
diff -u -r1.3 -r1.4
--- SubscriptionSpec.java	22 Sep 2005 12:22:19 -0000	1.3
+++ SubscriptionSpec.java	22 Sep 2005 18:17:58 -0000	1.4
@@ -17,13 +17,17 @@
  **/
 package org.servicemix.jbi.container;
 
+import org.servicemix.jbi.framework.ComponentNameSpace;
+import org.servicemix.jbi.framework.Registry;
 import org.servicemix.jbi.messaging.ExchangePacket;
 import org.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.servicemix.jbi.resolver.EndpointFilter;
 import org.servicemix.jbi.resolver.SubscriptionFilter;
+import org.servicemix.jbi.servicedesc.ServiceEndpointImpl;
 
 import java.io.Serializable;
 
+import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.xml.namespace.QName;
 
 /**
@@ -89,21 +93,44 @@
      * @param exchange the exchange to be matched
      * @return true if this subscription matches the exchange
      */
-    public boolean matches(MessageExchangeImpl exchange) {
+    public boolean matches(Registry registry,MessageExchangeImpl exchange){
         boolean result = false;
-        
-        if (filter != null) {
+
+        if(filter != null){
             result = filter.matches(exchange);
-        }else {
+        }else{
             ExchangePacket packet = exchange.getPacket();
-            QName exchangeServiceName = packet.getServiceName();
-            QName exchangeInterfaceName = packet.getInterfaceName();
-            if (service != null && exchangeServiceName != null) {
-                result = service.equals(exchangeServiceName);
-            }
-            if (interfaceName != null && exchangeInterfaceName != null){
-                result &= interfaceName.equals(exchangeInterfaceName);
+            ComponentNameSpace sourceId = packet.getSourceId();
+            if(sourceId != null){
+                // get the list of services
+                if(service != null){
+                    ServiceEndpoint[] ses = registry.getEndpointsForService(service);
+                    if(ses != null){
+                        for(int i = 0;i < ses.length;i++){
+                            ServiceEndpointImpl se = (ServiceEndpointImpl) ses[i];
+                            if(se.getComponentNameSpace() != null && se.getComponentNameSpace().equals(sourceId)){
+                                result = true;
+                                break;
+                            }
+                        }
+                    }
+                }
+                if(result && interfaceName != null){
+                    ServiceEndpoint[] ses = registry.getEndpoints(interfaceName);
+                    if(ses != null){
+                        result = false;
+                        for(int i = 0;i < ses.length;i++){
+                            ServiceEndpointImpl se = (ServiceEndpointImpl) ses[i];
+                            if(se.getComponentNameSpace() != null && se.getComponentNameSpace().equals(sourceId)){
+                                result = true;
+                                break;
+                            }
+                        }
+                    }
+                }
+
             }
+
         }
         return result;
     }
CVSspam 0.2.8



Reply via email to