| Commit in servicemix/base/src on MAIN | |||
| main/java/org/servicemix/jbi/nmr/Broker.java | +21 | -14 | 1.26 -> 1.27 |
| /SubscriptionManager.java | | -1 | 1.5 -> 1.6 |
| main/java/org/servicemix/jbi/nmr/flow/AbstractFlow.java | +7 | -4 | 1.12 -> 1.13 |
| /Flow.java | +2 | -2 | 1.7 -> 1.8 |
| main/java/org/servicemix/jbi/nmr/flow/cluster/ClusterFlow.java | +4 | -4 | 1.12 -> 1.13 |
| main/java/org/servicemix/jbi/nmr/flow/jca/JCAFlow.java | +3 | -3 | 1.5 -> 1.6 |
| main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java | +3 | -3 | 1.4 -> 1.5 |
| main/java/org/servicemix/jbi/nmr/flow/seda/SedaFlow.java | +3 | -3 | 1.11 -> 1.12 |
| test/java/org/servicemix/jbi/nmr/SubscriptionTest.java | +27 | -22 | 1.2 -> 1.3 |
| | +70 | -56 | |
Fix problems with subscription flow mbean registration
servicemix/base/src/main/java/org/servicemix/jbi/nmr
diff -u -r1.26 -r1.27 --- Broker.java 7 Oct 2005 08:26:36 -0000 1.26 +++ Broker.java 7 Oct 2005 09:55:52 -0000 1.27 @@ -58,7 +58,7 @@
/** * The Broker handles Nomalised Message Routing within ServiceMix *
- * @version $Revision: 1.26 $
+ * @version $Revision: 1.27 $
*/
public class Broker extends BaseLifeCycle {
private JBIContainer container;
@@ -126,17 +126,17 @@
if(this.flow == null){
this.flow = FlowProvider.getFlow(flowName);
}
- this.flow.init(this);
- SubscriptionManager sm = getSubscriptionManager();
- if (sm != null){
- if (sm.getFlow() == null && sm.getFlowName() == null) {
- if (subscriptionFlowName == null || subscriptionFlowName.equals(flowName)){
- sm.setFlow(flow);
- } else {
- sm.setFlowName(subscriptionFlowName);
- }
- }
- sm.init(this, registry);
+ this.flow.init(this, null);
+ if (subscriptionManager.getFlow() == null && subscriptionManager.getFlowName() == null) {
+ if (subscriptionFlowName == null || subscriptionFlowName.equals(flowName)){
+ subscriptionManager.setFlow(flow);
+ } else {
+ subscriptionManager.setFlowName(subscriptionFlowName);
+ }
+ }
+ subscriptionManager.init(this, registry);
+ if (flow != subscriptionManager.getFlow()) {
+ subscriptionManager.getFlow().init(this, "subscription");
}
}
@@ -174,7 +174,9 @@
*/
public void start() throws JBIException {
flow.start();
- subscriptionManager.getFlow().start();
+ if (subscriptionManager.getFlow() != flow) {
+ subscriptionManager.getFlow().start();
+ }
super.start();
}
@@ -185,7 +187,9 @@
*/
public void stop() throws JBIException {
flow.stop();
- subscriptionManager.getFlow().stop();
+ if (subscriptionManager.getFlow() != flow) {
+ subscriptionManager.getFlow().stop();
+ }
super.stop();
}
@@ -197,6 +201,9 @@
public void shutDown() throws JBIException {
stop();
flow.shutDown();
+ if (subscriptionManager.getFlow() != flow) {
+ subscriptionManager.getFlow().shutDown();
+ }
super.shutDown();
}
servicemix/base/src/main/java/org/servicemix/jbi/nmr
diff -u -r1.5 -r1.6 --- SubscriptionManager.java 6 Oct 2005 16:35:33 -0000 1.5 +++ SubscriptionManager.java 7 Oct 2005 09:55:52 -0000 1.6 @@ -51,7 +51,6 @@
if (this.flow == null) {
this.flow = FlowProvider.getFlow(flowName);
}
- this.flow.init(broker);
this.registry = registry;
}
servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow
diff -u -r1.12 -r1.13 --- AbstractFlow.java 6 Oct 2005 12:05:51 -0000 1.12 +++ AbstractFlow.java 7 Oct 2005 09:55:52 -0000 1.13 @@ -40,7 +40,7 @@
/** * A simple Straight through flow *
- * @version $Revision: 1.12 $
+ * @version $Revision: 1.13 $
*/
public abstract class AbstractFlow extends BaseLifeCycle implements Flow {
private static final Log log = LogFactory.getLog(AbstractFlow.class);
@@ -54,12 +54,14 @@
* @param broker
* @throws JBIException
*/
- public void init(Broker broker) throws JBIException {
+ public void init(Broker broker, String subType) throws JBIException {
this.broker = broker;
-// register self with the management context -
+ // register self with the management context
ObjectName objectName = broker.getManagementContext().createObjectName(this);
try {
+ if (subType != null) {
+ objectName = new ObjectName(objectName + ",subtype=" + subType);
+ }
broker.getManagementContext().registerMBean(objectName, this, LifeCycleMBean.class);
}
catch (JMException e) {
@@ -198,4 +200,5 @@
return broker.getContainer().isPersistent();
}
}
+
}
servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow
diff -u -r1.7 -r1.8 --- Flow.java 22 Aug 2005 09:25:44 -0000 1.7 +++ Flow.java 7 Oct 2005 09:55:52 -0000 1.8 @@ -26,7 +26,7 @@
/** * A Flow provides different dispatch policies within the NMR *
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
public interface Flow {
@@ -35,7 +35,7 @@
* @param broker
* @throws JBIException
*/
- public void init(Broker broker) throws JBIException;
+ public void init(Broker broker, String subType) throws JBIException;
/**
* The type of Flow
servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/cluster
diff -u -r1.12 -r1.13 --- ClusterFlow.java 3 Oct 2005 20:56:34 -0000 1.12 +++ ClusterFlow.java 7 Oct 2005 09:55:52 -0000 1.13 @@ -18,6 +18,7 @@
**/ package org.servicemix.jbi.nmr.flow.cluster;
+
import java.util.Iterator; import java.util.Map; import java.util.Set;
@@ -29,7 +30,6 @@
import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.ObjectMessage;
-import javax.resource.spi.work.WorkException;
import org.activecluster.Cluster; import org.activecluster.ClusterEvent; import org.activecluster.ClusterException;
@@ -54,7 +54,7 @@
/** * Use for message routing amonst a cluster of containers. All routing/cluster registration happens automatically *
- * @version $Revision: 1.12 $
+ * @version $Revision: 1.13 $
*/
public class ClusterFlow extends SedaFlow implements ClusterListener, MessageListener {
private static final Log log = LogFactory.getLog(ClusterFlow.class);
@@ -78,8 +78,8 @@
* @param broker
* @throws JBIException
*/
- public void init(Broker broker) throws JBIException {
- super.init(broker);
+ public void init(Broker broker, String subType) throws JBIException {
+ super.init(broker, subType);
ClusterFactory fac = new ActiveMQClusterFactory();
try {
this.cluster = fac.createCluster(clusterDestination);
servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/jca
diff -u -r1.5 -r1.6 --- JCAFlow.java 6 Oct 2005 12:05:51 -0000 1.5 +++ JCAFlow.java 7 Oct 2005 09:55:52 -0000 1.6 @@ -75,7 +75,7 @@
/** * Use for message routing among a network containers. All routing/registration happens automatically *
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class JCAFlow extends SedaFlow implements ConsumerAdvisoryEventListener, MessageListener {
private static final Log log = LogFactory.getLog(JCAFlow.class);
@@ -198,8 +198,8 @@
* @param broker
* @throws JBIException
*/
- public void init(Broker broker) throws JBIException {
- super.init(broker);
+ public void init(Broker broker, String subType) throws JBIException {
+ super.init(broker, subType);
try {
resourceAdapter = createResourceAdapter();
servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/jms
diff -u -r1.4 -r1.5 --- JMSFlow.java 3 Oct 2005 20:56:34 -0000 1.4 +++ JMSFlow.java 7 Oct 2005 09:55:52 -0000 1.5 @@ -59,7 +59,7 @@
/** * Use for message routing among a network containers. All routing/registration happens automatically *
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class JMSFlow extends SedaFlow implements ConsumerAdvisoryEventListener, MessageListener {
private static final Log log = LogFactory.getLog(JMSFlow.class);
@@ -167,8 +167,8 @@
* @param broker
* @throws JBIException
*/
- public void init(Broker broker) throws JBIException {
- super.init(broker);
+ public void init(Broker broker, String subType) throws JBIException {
+ super.init(broker, subType);
try {
if (connectionFactory == null) {
if (jmsURL != null) {
servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/seda
diff -u -r1.11 -r1.12 --- SedaFlow.java 3 Oct 2005 20:56:34 -0000 1.11 +++ SedaFlow.java 7 Oct 2005 09:55:52 -0000 1.12 @@ -45,7 +45,7 @@
/** * A simple Straight through flow *
- * @version $Revision: 1.11 $
+ * @version $Revision: 1.12 $
*/
public class SedaFlow extends AbstractFlow implements ComponentPacketEventListener {
private static final Log log = LogFactory.getLog(SedaFlow.class);
@@ -68,8 +68,8 @@
* @param broker
* @throws JBIException
*/
- public void init(Broker broker) throws JBIException {
- super.init(broker);
+ public void init(Broker broker, String subType) throws JBIException {
+ super.init(broker, subType);
broker.getRegistry().addComponentPacketListener(this);
}
servicemix/base/src/test/java/org/servicemix/jbi/nmr
diff -u -r1.2 -r1.3 --- SubscriptionTest.java 6 Oct 2005 16:35:33 -0000 1.2 +++ SubscriptionTest.java 7 Oct 2005 09:55:52 -0000 1.3 @@ -63,29 +63,34 @@
private void runTest(String flowName, String subscriptionFlowName) throws Exception {
JBIContainer container = new JBIContainer();
- container.getBroker().setFlow(FlowProvider.getFlow(flowName));
- if (subscriptionFlowName != null) {
- container.getBroker().getSubscriptionManager().setFlow(FlowProvider.getFlow(flowName));
+ try {
+ container.getBroker().setFlow(FlowProvider.getFlow(flowName));
+ if (subscriptionFlowName != null) {
+ container.getBroker().getSubscriptionManager().setFlow(FlowProvider.getFlow(subscriptionFlowName));
+ }
+ container.setCreateMBeanServer(true);
+ container.init();
+ container.start();
+
+ SenderListener sender = new SenderListener();
+ container.activateComponent(new ActivationSpec("sender", sender));
+
+ Receiver receiver1 = new ReceiverComponent();
+ container.activateComponent(createReceiverAS("receiver1", receiver1));
+
+ Receiver receiver2 = new ReceiverComponent();
+ container.activateComponent(createReceiverAS("receiver2", receiver2));
+
+ sender.sendMessages(1);
+
+ Thread.sleep(1000);
+
+ assertEquals(1, receiver1.getMessageList().getMessageCount());
+ assertEquals(1, receiver2.getMessageList().getMessageCount());
+ assertEquals(2, sender.responses.size());
+ } finally {
+ container.shutDown();
}
- container.init();
- container.start();
-
- SenderListener sender = new SenderListener();
- container.activateComponent(new ActivationSpec("sender", sender));
-
- Receiver receiver1 = new ReceiverComponent();
- container.activateComponent(createReceiverAS("receiver1", receiver1));
-
- Receiver receiver2 = new ReceiverComponent();
- container.activateComponent(createReceiverAS("receiver2", receiver2));
-
- sender.sendMessages(1);
-
- Thread.sleep(1000);
-
- assertEquals(1, receiver1.getMessageList().getMessageCount());
- assertEquals(1, receiver2.getMessageList().getMessageCount());
- assertEquals(2, sender.responses.size());
}
private ActivationSpec createReceiverAS(String id, Object component) {
