Commit in servicemix/base/src on MAIN
main/java/org/servicemix/jbi/nmr/Broker.java  +11 -8  1.23 -> 1.24
test/java/org/servicemix/jbi/nmr/SubscriptionTest.java  +82  added 1.1
Total: +93 -8
1 added + 1 modified, total 2 files
Fix broker to dispatch only outbound exchanges to subscribers

servicemix/base/src/main/java/org/servicemix/jbi/nmr
Broker.java 1.23 -> 1.24
diff -u -r1.23 -r1.24
--- Broker.java	3 Oct 2005 20:56:34 -0000	1.23
+++ Broker.java	6 Oct 2005 15:47:11 -0000	1.24
@@ -39,7 +39,6 @@
 import org.servicemix.jbi.management.BaseLifeCycle;
 import org.servicemix.jbi.management.ManagementContext;
 import org.servicemix.jbi.management.OperationInfoHelper;
-import org.servicemix.jbi.management.ParameterHelper;
 import org.servicemix.jbi.messaging.ExchangePacket;
 import org.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.servicemix.jbi.nmr.flow.Flow;
@@ -55,13 +54,13 @@
 /**
  * The Broker handles Nomalised Message Routing within ServiceMix
  * 
- * @version $Revision: 1.23 $
+ * @version $Revision: 1.24 $
  */
 public class Broker extends BaseLifeCycle {
     private JBIContainer container;
     private Registry registry;
     private String flowName = "seda";
-    private String subscriptionFlowName = "seda";
+    private String subscriptionFlowName = null;
     private WorkManager workManager;
     private Flow flow;
     private final static Log log = LogFactory.getLog(Broker.class);
@@ -125,11 +124,11 @@
         }
         this.flow.init(this);
         SubscriptionManager sm = getSubscriptionManager();
-        if(sm != null){
-            if(subscriptionFlowName != null && flowName != null && flowName.equals(subscriptionFlowName)){
+        if (sm != null){
+            if (subscriptionFlowName == null || subscriptionFlowName.equals(flowName)){
                 sm.setFlow(flow);
             }
-            sm.init(this,registry);
+            sm.init(this, registry);
         }
     }
 
@@ -354,13 +353,17 @@
             }
         }
 
-        boolean foundRoute =false;
+        boolean foundRoute = false;
         if (packet.getEndpoint() != null) {
             foundRoute = true;
             flow.send(packet);
         }
 
-        if (!getSubscriptionManager().dispatchToSubscribers(exchange) && !foundRoute){
+        if (packet.isOutbound()) {
+        	foundRoute = getSubscriptionManager().dispatchToSubscribers(exchange);
+        }
+        
+        if (!foundRoute){
             boolean throwException = true;
             ActivationSpec activationSpec = exchange.getActivationSpec();
             if (activationSpec != null) {

servicemix/base/src/test/java/org/servicemix/jbi/nmr
SubscriptionTest.java added at 1.1
diff -N SubscriptionTest.java
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ SubscriptionTest.java	6 Oct 2005 15:47:11 -0000	1.1
@@ -0,0 +1,82 @@
+/**
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.servicemix.jbi.nmr;
+
+import java.util.List;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+
+import junit.framework.TestCase;
+
+import org.servicemix.ExchangeResponseListener;
+import org.servicemix.examples.Receiver;
+import org.servicemix.examples.ReceiverComponent;
+import org.servicemix.examples.SenderComponent;
+import org.servicemix.jbi.container.ActivationSpec;
+import org.servicemix.jbi.container.JBIContainer;
+import org.servicemix.jbi.container.SubscriptionSpec;
+
+import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
+
+public class SubscriptionTest extends TestCase {
+	// Checks that one send from the sender is seen by every component subscribed on SenderComponent.SERVICE.
+	public void test() throws Exception {
+		// Bring up a container configured with the flow named "st".
+		JBIContainer container = new JBIContainer();
+		container.setFlowName("st");
+		container.init();
+		container.start(); // NOTE(review): no matching stop/shutdown is visible in this test -- confirm whether cleanup is needed
+		// The sender is a SenderListener, so it also records exchanges handed to its response callback.
+		SenderListener sender = new SenderListener();
+		container.activateComponent(new ActivationSpec("sender", sender));
+		// Two receivers, each activated with a single subscription on SenderComponent.SERVICE.
+		Receiver receiver1 = new ReceiverComponent();
+		container.activateComponent(createReceiverAS("receiver1", receiver1));
+
+		Receiver receiver2 = new ReceiverComponent();
+		container.activateComponent(createReceiverAS("receiver2", receiver2));
+		// Presumably sends exactly one message from the sender -- confirm against SenderComponent.sendMessages.
+		sender.sendMessages(1);
+		// NOTE(review): fixed one-second pause, presumably to let asynchronous delivery finish; may be timing-sensitive.
+		Thread.sleep(1000);
+		// Each receiver must report exactly one message and the sender exactly two recorded responses.
+		assertEquals(1, receiver1.getMessageList().getMessageCount());
+		assertEquals(1, receiver2.getMessageList().getMessageCount());
+		assertEquals(2, sender.responses.size());
+	}
+	// Builds an ActivationSpec for the given id and component carrying one subscription on SenderComponent.SERVICE.
+	private ActivationSpec createReceiverAS(String id, Object component) {
+		ActivationSpec as = new ActivationSpec(id, component);
+		SubscriptionSpec ss = new SubscriptionSpec();
+		ss.setService(SenderComponent.SERVICE);
+		as.setSubscriptions(new SubscriptionSpec[] { ss }); 
+		return as;
+	}
+	// SenderComponent variant that stores every exchange passed to onMessageExchangeResponse.
+	public static class SenderListener extends SenderComponent implements ExchangeResponseListener {
+		// Exchanges collected by the callback below; the test asserts on this list's size.
+		public List responses = new CopyOnWriteArrayList(); // presumably copy-on-write because callbacks may run on another thread -- confirm
+		
+		public void onMessageExchangeResponse(MessageExchange exchange) throws MessagingException {
+			responses.add(exchange);			
+		}
+		
+	}
+	
+}
CVSspam 0.2.8



Reply via email to