Title: [974] trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java: Fix regression: JMS flow queue consumers were no longer started
Revision
974
Author
gnt
Date
2005-11-30 11:52:10 -0500 (Wed, 30 Nov 2005)

Log Message

Fix regression: JMS flow queue consumers were no longer started.

Modified Paths

trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java

Diff

Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java (973 => 974)

--- trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java	2005-11-30 16:31:08 UTC (rev 973)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java	2005-11-30 16:52:10 UTC (rev 974)
@@ -221,6 +221,13 @@
                 advisor = new ConsumerAdvisor(connection, broadcastTopic);
                 advisor.addListener(this);
                 advisor.start();
+                // Start queue consumers for all components
+                for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
+                    LocalComponentConnector lcc = (LocalComponentConnector) i.next();
+                    ComponentPacket packet = lcc.getPacket();
+                    ComponentPacketEvent cpe = new ComponentPacketEvent(packet, ComponentPacketEvent.ACTIVATED);
+                    onEvent(cpe, false);
+                }
             }
             catch (JMSException e) {
                 JBIException jbiEx = new JBIException("JMSException caught in start: " + e.getMessage());
@@ -277,6 +284,15 @@
      * @param event
      */
     public void onEvent(ComponentPacketEvent event) {
+        onEvent(event, true);
+    }
+    
+    /**
+     * Process state changes in Components
+     * 
+     * @param event
+     */
+    public void onEvent(ComponentPacketEvent event, boolean broadcast) {
         try {
             // broadcast internal changes to the network
             if (started.get() && event.getPacket().getComponentNameSpace().getContainerName().equals(broker.getContainerName())) {
@@ -294,9 +310,11 @@
                         consumer.close();
                     }
                 }
-                ObjectMessage msg = broadcastSession.createObjectMessage(event);
-                log.info(broker.getContainerName() + ": broadcasting info for " + event.getPacket().getComponentNameSpace());
-                topicProducer.send(msg);
+                if (broadcast) {
+                    ObjectMessage msg = broadcastSession.createObjectMessage(event);
+                    log.info(broker.getContainerName() + ": broadcasting info for " + event.getPacket().getComponentNameSpace());
+                    topicProducer.send(msg);
+                }
             }
         }
         catch (JMSException e) {

Reply via email to