Title: [974] trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java: Regression: jms flow queue consumers were not started anymore
- Revision
- 974
- Author
- gnt
- Date
- 2005-11-30 11:52:10 -0500 (Wed, 30 Nov 2005)
Log Message
Regression: jms flow queue consumers were not started anymore
Modified Paths
Diff
Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java (973 => 974)
--- trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java 2005-11-30 16:31:08 UTC (rev 973)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java 2005-11-30 16:52:10 UTC (rev 974)
@@ -221,6 +221,13 @@
advisor = new ConsumerAdvisor(connection, broadcastTopic);
advisor.addListener(this);
advisor.start();
+ // Start queue consumers for all components
+ for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
+ LocalComponentConnector lcc = (LocalComponentConnector) i.next();
+ ComponentPacket packet = lcc.getPacket();
+ ComponentPacketEvent cpe = new ComponentPacketEvent(packet, ComponentPacketEvent.ACTIVATED);
+ onEvent(cpe, false);
+ }
}
catch (JMSException e) {
JBIException jbiEx = new JBIException("JMSException caught in start: " + e.getMessage());
@@ -277,6 +284,15 @@
* @param event
*/
public void onEvent(ComponentPacketEvent event) {
+ onEvent(event, true);
+ }
+
+ /**
+ * Process state changes in Components
+ *
+ * @param event
+ */
+ public void onEvent(ComponentPacketEvent event, boolean broadcast) {
try {
// broadcast internal changes to the network
if (started.get() && event.getPacket().getComponentNameSpace().getContainerName().equals(broker.getContainerName())) {
@@ -294,9 +310,11 @@
consumer.close();
}
}
- ObjectMessage msg = broadcastSession.createObjectMessage(event);
- log.info(broker.getContainerName() + ": broadcasting info for " + event.getPacket().getComponentNameSpace());
- topicProducer.send(msg);
+ if (broadcast) {
+ ObjectMessage msg = broadcastSession.createObjectMessage(event);
+ log.info(broker.getContainerName() + ": broadcasting info for " + event.getPacket().getComponentNameSpace());
+ topicProducer.send(msg);
+ }
}
}
catch (JMSException e) {