Commit in servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/jms on MAIN
JMSFlow.java+19-41.2 -> 1.3
Changed so that each component subscribes to an individual queue - to allow for load balancing across clusters.

servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/jms
JMSFlow.java 1.2 -> 1.3
diff -u -r1.2 -r1.3
--- JMSFlow.java	24 Aug 2005 14:07:24 -0000	1.2
+++ JMSFlow.java	26 Aug 2005 10:41:13 -0000	1.3
@@ -59,12 +59,12 @@
 /**
  * Use for message routing among a network containers. All routing/registration happens automatically
  * 
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
  */
 public class JMSFlow extends SedaFlow implements ConsumerAdvisoryEventListener, MessageListener {
     private static final Log log = LogFactory.getLog(JMSFlow.class);
     private static final String INBOUND_PREFIX = "org.servicemix.inbound.";
-    private String jmsURL = ActiveMQConnection.DEFAULT_URL;
+    private String jmsURL = "peer://org.servicemix?persistent=false";
     private String userName;
     private String password;
     private ConnectionFactory connectionFactory;
@@ -78,6 +78,7 @@
     private ConsumerAdvisor advisor;
     private Map networkNodeKeyMap = new ConcurrentHashMap();
     private Map networkComponentKeyMap = new ConcurrentHashMap();
+    private Map consumerMap = new ConcurrentHashMap();
     private SynchronizedBoolean started = new SynchronizedBoolean(false);
 
     /**
@@ -305,6 +306,20 @@
     public void onEvent(ComponentPacketEvent event) {
         super.onEvent(event);
         try {
+            String componentName = event.getPacket().getComponentNameSpace().getName();
+            if (event.getStatus() == ComponentPacketEvent.ACTIVATED){
+                
+                Queue queue = inboundSession.createQueue(INBOUND_PREFIX +componentName);
+                MessageConsumer consumer = inboundSession.createConsumer(queue);
+                consumer.setMessageListener(this);
+                consumerMap.put(componentName,consumer);
+            }else if (event.getStatus() == ComponentPacketEvent.DEACTIVATED){
+                MessageConsumer consumer = (MessageConsumer) consumerMap.remove(componentName);
+                if (consumer != null){
+                    consumer.close();
+                }
+                
+            }
             // broadcast change to the network
             ObjectMessage msg = broadcastSession.createObjectMessage(event);
             topicProducer.send(msg);
@@ -352,8 +367,8 @@
             }
             else {
                 try {
-                    String containerName = cc.getComponentNameSpace().getContainerName();
-                    Queue queue = inboundSession.createQueue(INBOUND_PREFIX + containerName);
+                    String componentName = cc.getComponentNameSpace().getName();
+                    Queue queue = inboundSession.createQueue(INBOUND_PREFIX + componentName);
                     ObjectMessage msg = inboundSession.createObjectMessage(packet);
                     queueProducer.send(queue, msg);
                 }
CVSspam 0.2.8



Reply via email to