Title: [973] trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java: Change jms flow to send less messages when discovering nodes.
Revision
973
Author
gnt
Date
2005-11-30 11:31:08 -0500 (Wed, 30 Nov 2005)

Log Message

Change jms flow to send less messages when discovering nodes.

Modified Paths


Diff

Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java (972 => 973)

--- trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java	2005-11-30 16:04:08 UTC (rev 972)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java	2005-11-30 16:31:08 UTC (rev 973)
@@ -19,7 +19,6 @@
 package org.servicemix.jbi.nmr.flow.jms;
 
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 import org.activemq.ActiveMQConnection;
@@ -77,6 +76,7 @@
     private MessageProducer topicProducer;
     private Topic broadcastTopic;
     private Session broadcastSession;
+    private MessageConsumer broadcastConsumer;
     private Session inboundSession;
     private ConsumerAdvisor advisor;
     private Map networkNodeKeyMap = new ConcurrentHashMap();
@@ -171,6 +171,7 @@
      * @throws JBIException
      */
     public void init(Broker broker, String subType) throws JBIException {
+        log.info(broker.getContainerName() + ": Initializing jms flow");
         super.init(broker, subType);
         broker.getRegistry().addComponentPacketListener(this);
         try {
@@ -196,12 +197,8 @@
             queueProducer = inboundSession.createProducer(null);
             broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
-            MessageConsumer broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
-            broadcastConsumer.setMessageListener(this);
             topicProducer = broadcastSession.createProducer(broadcastTopic);
             topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-            advisor = new ConsumerAdvisor(connection, broadcastTopic);
-            advisor.addListener(this);
         }
         catch (JMSException e) {
             log.error("Failed to initialize JMSFlow", e);
@@ -216,8 +213,13 @@
      */
     public void start() throws JBIException {
         if (started.compareAndSet(false, true)) {
+            log.info(broker.getContainerName() + ": Starting jms flow");
             super.start();
             try {
+                broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
+                broadcastConsumer.setMessageListener(this);
+                advisor = new ConsumerAdvisor(connection, broadcastTopic);
+                advisor.addListener(this);
                 advisor.start();
             }
             catch (JMSException e) {
@@ -234,9 +236,11 @@
      */
     public void stop() throws JBIException {
         if (started.compareAndSet(true, false)) {
+            log.info(broker.getContainerName() + ": Stopping jms flow");
             super.stop();
             try {
                 advisor.stop();
+                broadcastConsumer.close();
             }
             catch (JMSException e) {
                 JBIException jbiEx = new JBIException("JMSException caught in stop: " + e.getMessage());
@@ -274,24 +278,26 @@
      */
     public void onEvent(ComponentPacketEvent event) {
         try {
-            String componentName = event.getPacket().getComponentNameSpace().getName();
-            if(event.getStatus()==ComponentPacketEvent.ACTIVATED){
-                if(!consumerMap.containsKey(componentName)){
-                    Queue queue=inboundSession.createQueue(INBOUND_PREFIX+componentName);
-                    MessageConsumer consumer=inboundSession.createConsumer(queue);
-                    consumer.setMessageListener(this);
-                    consumerMap.put(componentName,consumer);
+            // broadcast internal changes to the network
+            if (started.get() && event.getPacket().getComponentNameSpace().getContainerName().equals(broker.getContainerName())) {
+                String componentName = event.getPacket().getComponentNameSpace().getName();
+                if (event.getStatus() == ComponentPacketEvent.ACTIVATED) {
+                    if (!consumerMap.containsKey(componentName)) {
+                        Queue queue = inboundSession.createQueue(INBOUND_PREFIX + componentName);
+                        MessageConsumer consumer = inboundSession.createConsumer(queue);
+                        consumer.setMessageListener(this);
+                        consumerMap.put(componentName,consumer);
+                    }
+                } else if (event.getStatus() == ComponentPacketEvent.DEACTIVATED) {
+                    MessageConsumer consumer = (MessageConsumer) consumerMap.remove(componentName);
+                    if (consumer != null){
+                        consumer.close();
+                    }
                 }
-            }else if(event.getStatus()==ComponentPacketEvent.DEACTIVATED){
-                MessageConsumer consumer=(MessageConsumer) consumerMap.remove(componentName);
-                if(consumer!=null){
-                    consumer.close();
-                }
+                ObjectMessage msg = broadcastSession.createObjectMessage(event);
+                log.info(broker.getContainerName() + ": broadcasting info for " + event.getPacket().getComponentNameSpace());
+                topicProducer.send(msg);
             }
-            // broadcast change to the network
-            ObjectMessage msg = broadcastSession.createObjectMessage(event);
-            topicProducer.send(msg);
-            log.info("broadcast to internal JMS network: " + event);
         }
         catch (JMSException e) {
             log.error("failed to broadcast to the internal JMS network: " + event, e);
@@ -306,17 +312,30 @@
     public void onEvent(ConsumerAdvisoryEvent event) {
         if (started.get()) {
             ConsumerInfo info = event.getInfo();
-            if (info.isStarted()) {
-                for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
-                    LocalComponentConnector lcc = (LocalComponentConnector) i.next();
-                    ComponentPacket packet = lcc.getPacket();
-                    ComponentPacketEvent cpe = new ComponentPacketEvent(packet, ComponentPacketEvent.ACTIVATED);
-                    onEvent(cpe);
+            if (!broker.getContainerName().equals(info.getClientId())) {
+                if (info.isStarted()) {
+                    log.info(broker.getContainerName() + ": new node discovered " + info.getClientId());
+                    // The new node is started, so send it all components state
+                    try {
+                        String destination = INBOUND_PREFIX + info.getClientId();
+                        Queue queue = inboundSession.createQueue(destination);
+                        for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
+                            LocalComponentConnector lcc = (LocalComponentConnector) i.next();
+                            ComponentPacket packet = lcc.getPacket();
+                            ComponentPacketEvent cpe = new ComponentPacketEvent(packet, ComponentPacketEvent.ACTIVATED);
+                            ObjectMessage msg = inboundSession.createObjectMessage(cpe);
+                            log.info(broker.getContainerName() + ": sending info to " + info.getClientId() + " for " + lcc.getComponentNameSpace());
+                            queueProducer.send(queue, msg);
+                        }
+                    } catch (JMSException e) {
+                        log.error("failed to broadcast to the internal JMS network: " + event, e);
+                    }
                 }
+                else {
+                    log.info(broker.getContainerName() + ": node " + info.getClientId() + " has stopped");
+                    removeAllPackets(info.getClientId());
+                }
             }
-            else {
-                removeAllPackets(info.getClientId());
-            }
         }
     }
 
@@ -368,7 +387,7 @@
      */
     public void onMessage(Message message) {
         try {
-            if (message != null && message instanceof ObjectMessage) {
+            if (started.get() && message != null && message instanceof ObjectMessage) {
                 ObjectMessage objMsg = (ObjectMessage) message;
                 Object obj = objMsg.getObject();
                 if (obj != null) {
@@ -404,33 +423,18 @@
             int eventStatus = event.getStatus();
             switch (eventStatus) {
                 case ComponentPacketEvent.ACTIVATED:
-                    addRemotePacket(containerName, packet);
+                case ComponentPacketEvent.STATE_CHANGE:
+                    updateRemotePacket(containerName, packet);
                     break;
                 case ComponentPacketEvent.DEACTIVATED:
                     removeRemotePacket(containerName, packet);
                     break;
-                case ComponentPacketEvent.STATE_CHANGE:
-                    updateRemotePacket(containerName, packet);
-                    break;
                 default:
                     log.warn("Unable to determine ComponentPacketEvent type: " + eventStatus + " for packet: " + packet);
             }         	
         }
     }
 
-    private void addRemotePacket(String containerName, ComponentPacket packet) {
-        networkComponentKeyMap.put(packet.getComponentNameSpace(), containerName);
-        Set set = (Set) networkNodeKeyMap.get(containerName);
-        if (set == null) {
-            set = new CopyOnWriteArraySet();
-            networkNodeKeyMap.put(containerName, set);
-        }
-        ComponentConnector cc = new ComponentConnector(packet);
-        log.info("Adding Remote Component: " + cc);
-        broker.getRegistry().addRemoteComponentConnector(cc);
-        set.add(packet);
-    }
-
     private void updateRemotePacket(String containerName, ComponentPacket packet) {
         Set set = (Set) networkNodeKeyMap.get(containerName);
         if (set != null) {
@@ -438,7 +442,7 @@
             set.add(packet);
         }
         ComponentConnector cc = new ComponentConnector(packet);
-        log.info("Updating remote Component: " + cc);
+        log.info(broker.getContainerName() + ": updating remote component: " + cc);
         broker.getRegistry().updateRemoteComponentConnector(cc);
     }
 
@@ -448,7 +452,7 @@
         if (set != null) {
             set.remove(packet);
             ComponentConnector cc = new ComponentConnector(packet);
-            log.info("Removing remote Component: " + cc);
+            log.info(broker.getContainerName() + ": removing remote component: " + cc);
             broker.getRegistry().removeRemoteComponentConnector(cc);
             if (set.isEmpty()) {
                 networkNodeKeyMap.remove(containerName);
@@ -462,7 +466,7 @@
 	        for (Iterator i = set.iterator();i.hasNext();) {
 	            ComponentPacket packet = (ComponentPacket) i.next();
 	            ComponentConnector cc = new ComponentConnector(packet);
-	            log.info("Network node: " + containerName + " Stopped. Removing remote Component: " + cc);
+	            log.info(broker.getContainerName() + ": Network node: " + containerName + " Stopped. Removing remote Component: " + cc);
 	            broker.getRegistry().removeRemoteComponentConnector(cc);
 	            networkComponentKeyMap.remove(packet.getComponentNameSpace());
 	        }

Reply via email to