| Commit in servicemix/base/src on MAIN | |||
| main/java/org/servicemix/jbi/nmr/Broker.java | +11 | -8 | 1.23 -> 1.24 |
| test/java/org/servicemix/jbi/nmr/SubscriptionTest.java | +82 | added 1.1 | |
| +93 | -8 | ||
Fix broker to dispatch only outbound exchanges to subscribers
servicemix/base/src/main/java/org/servicemix/jbi/nmr
diff -u -r1.23 -r1.24 --- Broker.java 3 Oct 2005 20:56:34 -0000 1.23 +++ Broker.java 6 Oct 2005 15:47:11 -0000 1.24 @@ -39,7 +39,6 @@
import org.servicemix.jbi.management.BaseLifeCycle; import org.servicemix.jbi.management.ManagementContext; import org.servicemix.jbi.management.OperationInfoHelper;
-import org.servicemix.jbi.management.ParameterHelper;
import org.servicemix.jbi.messaging.ExchangePacket; import org.servicemix.jbi.messaging.MessageExchangeImpl; import org.servicemix.jbi.nmr.flow.Flow;
@@ -55,13 +54,13 @@
/** * The Broker handles Nomalised Message Routing within ServiceMix *
- * @version $Revision: 1.23 $
+ * @version $Revision: 1.24 $
*/
public class Broker extends BaseLifeCycle {
private JBIContainer container;
private Registry registry;
private String flowName = "seda";
- private String subscriptionFlowName = "seda";
+ private String subscriptionFlowName = null;
private WorkManager workManager;
private Flow flow;
private final static Log log = LogFactory.getLog(Broker.class);
@@ -125,11 +124,11 @@
}
this.flow.init(this);
SubscriptionManager sm = getSubscriptionManager();
- if(sm != null){
- if(subscriptionFlowName != null && flowName != null && flowName.equals(subscriptionFlowName)){
+ if (sm != null){
+ if (subscriptionFlowName == null || subscriptionFlowName.equals(flowName)){
sm.setFlow(flow);
}
- sm.init(this,registry);
+ sm.init(this, registry);
}
}
@@ -354,13 +353,17 @@
}
}
- boolean foundRoute =false;
+ boolean foundRoute = false;
if (packet.getEndpoint() != null) {
foundRoute = true;
flow.send(packet);
}
- if (!getSubscriptionManager().dispatchToSubscribers(exchange) && !foundRoute){
+ if (packet.isOutbound()) {
+ foundRoute = getSubscriptionManager().dispatchToSubscribers(exchange);
+ }
+
+ if (!foundRoute){
boolean throwException = true;
ActivationSpec activationSpec = exchange.getActivationSpec();
if (activationSpec != null) {
servicemix/base/src/test/java/org/servicemix/jbi/nmr
SubscriptionTest.java added at 1.1
diff -N SubscriptionTest.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ SubscriptionTest.java 6 Oct 2005 15:47:11 -0000 1.1 @@ -0,0 +1,82 @@
+/**
+ *
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.servicemix.jbi.nmr;
+
+import java.util.List;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+
+import junit.framework.TestCase;
+
+import org.servicemix.ExchangeResponseListener;
+import org.servicemix.examples.Receiver;
+import org.servicemix.examples.ReceiverComponent;
+import org.servicemix.examples.SenderComponent;
+import org.servicemix.jbi.container.ActivationSpec;
+import org.servicemix.jbi.container.JBIContainer;
+import org.servicemix.jbi.container.SubscriptionSpec;
+
+import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
+
+public class SubscriptionTest extends TestCase {
+
+ public void test() throws Exception {
+
+ JBIContainer container = new JBIContainer();
+ container.setFlowName("st");
+ container.init();
+ container.start();
+
+ SenderListener sender = new SenderListener();
+ container.activateComponent(new ActivationSpec("sender", sender));
+
+ Receiver receiver1 = new ReceiverComponent();
+ container.activateComponent(createReceiverAS("receiver1", receiver1));
+
+ Receiver receiver2 = new ReceiverComponent();
+ container.activateComponent(createReceiverAS("receiver2", receiver2));
+
+ sender.sendMessages(1);
+
+ Thread.sleep(1000);
+
+ assertEquals(1, receiver1.getMessageList().getMessageCount());
+ assertEquals(1, receiver2.getMessageList().getMessageCount());
+ assertEquals(2, sender.responses.size());
+ }
+
+ private ActivationSpec createReceiverAS(String id, Object component) {
+ ActivationSpec as = new ActivationSpec(id, component);
+ SubscriptionSpec ss = new SubscriptionSpec();
+ ss.setService(SenderComponent.SERVICE);
+ as.setSubscriptions(new SubscriptionSpec[] { ss });
+ return as;
+ }
+
+ public static class SenderListener extends SenderComponent implements ExchangeResponseListener {
+
+ public List responses = new CopyOnWriteArrayList();
+
+ public void onMessageExchangeResponse(MessageExchange exchange) throws MessagingException {
+ responses.add(exchange);
+ }
+
+ }
+
+}
