Commit in servicemix/base/src on MAIN
main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java +59 -66 1.1 -> 1.2
test/java/org/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java +81 added 1.1
+140 -66
1 added + 1 modified, total 2 files
Added test case for JMSFlow

servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/jms
JMSFlow.java 1.1 -> 1.2
diff -u -r1.1 -r1.2
--- JMSFlow.java	24 Aug 2005 10:58:55 -0000	1.1
+++ JMSFlow.java	24 Aug 2005 14:07:24 -0000	1.2
@@ -54,11 +54,12 @@
 import org.servicemix.jbi.nmr.flow.seda.SedaQueue;
 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
 
 /**
- * Use for message routing amonst a cluster of containers. All routing/cluster registration happens automatically
+ * Use for message routing among a network of containers. All routing/registration happens automatically.
  * 
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
  */
 public class JMSFlow extends SedaFlow implements ConsumerAdvisoryEventListener, MessageListener {
     private static final Log log = LogFactory.getLog(JMSFlow.class);
@@ -75,8 +76,9 @@
     private Session broadcastSession;
     private Session inboundSession;
     private ConsumerAdvisor advisor;
-    private Map clusterNodeKeyMap = new ConcurrentHashMap();
-    private Map clusterComponentKeyMap = new ConcurrentHashMap();
+    private Map networkNodeKeyMap = new ConcurrentHashMap();
+    private Map networkComponentKeyMap = new ConcurrentHashMap();
+    private SynchronizedBoolean started = new SynchronizedBoolean(false);
 
     /**
      * The type of Flow
@@ -84,7 +86,7 @@
      * @return the type
      */
     public String getDescription() {
-        return "cluster";
+        return "jms";
     }
 
     /**
@@ -130,20 +132,6 @@
     }
 
     /**
-     * @return Returns the connection.
-     */
-    public Connection getConnection() {
-        return connection;
-    }
-
-    /**
-     * @param connection The connection to set.
-     */
-    public void setConnection(Connection connection) {
-        this.connection = connection;
-    }
-
-    /**
      * @return Returns the connectionFactory.
      */
     public ConnectionFactory getConnectionFactory() {
@@ -180,21 +168,19 @@
     public void init(Broker broker) throws JBIException {
         super.init(broker);
         try {
-            if (connection == null) {
-                if (connectionFactory == null) {
-                    if (jmsURL != null) {
-                        connectionFactory = new ActiveMQConnectionFactory(jmsURL);
-                    }
-                    else {
-                        connectionFactory = new ActiveMQConnectionFactory();
-                    }
-                    if (userName != null) {
-                        connection = connectionFactory.createConnection(userName, password);
-                    }
-                    else {
-                        connection = connectionFactory.createConnection();
-                    }
+            if (connectionFactory == null) {
+                if (jmsURL != null) {
+                    connectionFactory = new ActiveMQConnectionFactory(jmsURL);
                 }
+                else {
+                    connectionFactory = new ActiveMQConnectionFactory();
+                }
+            }
+            if (userName != null) {
+                connection = connectionFactory.createConnection(userName, password);
+            }
+            else {
+                connection = connectionFactory.createConnection();
             }
             connection.setClientID(broker.getContainerName());
             connection.start();
@@ -223,13 +209,15 @@
      * @throws JBIException
      */
     public void start() throws JBIException {
-        super.start();
-        try {
-            advisor.start();
-        }
-        catch (JMSException e) {
-            JBIException jbiEx = new JBIException("JMSException caught in start: " + e.getMessage());
-            throw jbiEx;
+        if (started.commit(false, true)) {
+            super.start();
+            try {
+                advisor.start();
+            }
+            catch (JMSException e) {
+                JBIException jbiEx = new JBIException("JMSException caught in start: " + e.getMessage());
+                throw jbiEx;
+            }
         }
     }
 
@@ -239,18 +227,21 @@
      * @throws JBIException
      */
     public void stop() throws JBIException {
-        super.stop();
-        try {
-            advisor.stop();
-        }
-        catch (JMSException e) {
-            JBIException jbiEx = new JBIException("JMSException caught in stop: " + e.getMessage());
-            throw jbiEx;
+        if (started.commit(true, false)) {
+            super.stop();
+            try {
+                advisor.stop();
+            }
+            catch (JMSException e) {
+                JBIException jbiEx = new JBIException("JMSException caught in stop: " + e.getMessage());
+                throw jbiEx;
+            }
         }
     }
 
     public void shutDown() throws JBIException {
         super.shutDown();
+        stop();
         if (this.connection != null) {
             try {
                 this.connection.close();
@@ -330,17 +321,19 @@
      * @param event
      */
     public void onEvent(ConsumerAdvisoryEvent event) {
-        ConsumerInfo info = event.getInfo();
-        if (info.isStarted()) {
-            for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
-                LocalComponentConnector lcc = (LocalComponentConnector) i.next();
-                ComponentPacket packet = lcc.getPacket();
-                ComponentPacketEvent cpe = new ComponentPacketEvent(packet, ComponentPacketEvent.ACTIVATED);
-                onEvent(cpe);
+        if (started.get()) {
+            ConsumerInfo info = event.getInfo();
+            if (info.isStarted()) {
+                for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
+                    LocalComponentConnector lcc = (LocalComponentConnector) i.next();
+                    ComponentPacket packet = lcc.getPacket();
+                    ComponentPacketEvent cpe = new ComponentPacketEvent(packet, ComponentPacketEvent.ACTIVATED);
+                    onEvent(cpe);
+                }
+            }
+            else {
+                removeAllPackets(info.getClientId());
             }
-        }
-        else {
-            removeAllPackets(info.getClientId());
         }
     }
 
@@ -433,11 +426,11 @@
     }
 
     private void addRemotePacket(String containerName, ComponentPacket packet) {
-        clusterComponentKeyMap.put(packet.getComponentNameSpace(), containerName);
-        Set set = (Set) clusterNodeKeyMap.get(containerName);
+        networkComponentKeyMap.put(packet.getComponentNameSpace(), containerName);
+        Set set = (Set) networkNodeKeyMap.get(containerName);
         if (set == null) {
             set = new CopyOnWriteArraySet();
-            clusterNodeKeyMap.put(containerName, set);
+            networkNodeKeyMap.put(containerName, set);
             ComponentConnector cc = new ComponentConnector(packet);
             log.info("Adding Remote Component: " + cc);
             broker.getRegistry().addRemoteComponentConnector(cc);
@@ -446,7 +439,7 @@
     }
 
     private void updateRemotePacket(String containerName, ComponentPacket packet) {
-        Set set = (Set) clusterNodeKeyMap.get(containerName);
+        Set set = (Set) networkNodeKeyMap.get(containerName);
         if (set != null) {
             set.remove(packet);
             set.add(packet);
@@ -457,27 +450,27 @@
     }
 
     private void removeRemotePacket(String containerName, ComponentPacket packet) {
-        clusterComponentKeyMap.remove(packet.getComponentNameSpace());
-        Set set = (Set) clusterNodeKeyMap.get(containerName);
+        networkComponentKeyMap.remove(packet.getComponentNameSpace());
+        Set set = (Set) networkNodeKeyMap.get(containerName);
         if (set != null) {
             set.remove(packet);
             ComponentConnector cc = new ComponentConnector(packet);
             log.info("Removing remote Component: " + cc);
             broker.getRegistry().removeRemoteComponentConnector(cc);
             if (set.isEmpty()) {
-                clusterNodeKeyMap.remove(containerName);
+                networkNodeKeyMap.remove(containerName);
             }
         }
     }
 
     private void removeAllPackets(String containerName) {
-        Set set = (Set) clusterNodeKeyMap.remove(containerName);
+        Set set = (Set) networkNodeKeyMap.remove(containerName);
         for (Iterator i = set.iterator();i.hasNext();) {
             ComponentPacket packet = (ComponentPacket) i.next();
             ComponentConnector cc = new ComponentConnector(packet);
             log.info("Network node: " + containerName + " Stopped. Removing remote Component: " + cc);
             broker.getRegistry().removeRemoteComponentConnector(cc);
-            clusterComponentKeyMap.remove(packet.getComponentNameSpace());
+            networkComponentKeyMap.remove(packet.getComponentNameSpace());
         }
     }
 }

servicemix/base/src/test/java/org/servicemix/jbi/nmr/flow/jms
JMSFlowTest.java added at 1.1
diff -N JMSFlowTest.java
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ JMSFlowTest.java	24 Aug 2005 14:07:24 -0000	1.1
@@ -0,0 +1,81 @@
+/** 
+ * <a href="">ServiceMix: The open source ESB</a> 
+ * 
+ * Copyright 2005 RAJD Consultancy Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.servicemix.jbi.nmr.flow.jms;
+
+import java.io.File;
+import junit.framework.TestCase;
+import org.servicemix.examples.*;
+import org.servicemix.jbi.container.ActivationSpec;
+import org.servicemix.jbi.container.JBIContainer;
+import org.servicemix.jbi.resolver.ServiceNameEndpointResolver;
+import org.servicemix.jbi.util.FileUtil;
+
+/**
+ * Sends messages from a SenderComponent in one container and checks that a
+ * ReceiverComponent in a second container receives them; both containers use the "jms" flow.
+ */
+public class JMSFlowTest extends TestCase {
+    private static final int NUM_MESSAGES = 10;
+    JBIContainer senderContainer = new JBIContainer();
+    JBIContainer receiverContainer = new JBIContainer();
+    private SenderComponent sender;
+    private ReceiverComponent receiver;
+
+    /** Starts both containers on the "jms" flow and activates one component in each. */
+    protected void setUp() throws Exception {
+        super.setUp();
+        senderContainer.setName("senderContainer");
+        senderContainer.setFlowName("jms");
+        senderContainer.init();
+        senderContainer.start();
+        assertTrue("sender container should use JMSFlow", senderContainer.getFlow() instanceof JMSFlow);
+
+        receiverContainer.setName("receiverContainer");
+        receiverContainer.setFlowName("jms");
+        receiverContainer.init();
+        receiverContainer.start();
+        assertTrue("receiver container should use JMSFlow", receiverContainer.getFlow() instanceof JMSFlow);
+
+        receiver = new ReceiverComponent();
+        sender = new SenderComponent();
+        sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE));
+        senderContainer.activateComponent(new ActivationSpec("sender", sender));
+        receiverContainer.activateComponent(new ActivationSpec("receiver", receiver));
+        // NOTE(review): presumably gives the two flows time to discover each other - confirm.
+        Thread.sleep(5000);
+    }
+
+    /** Shuts down both containers; the receiver is shut down even if the sender fails. */
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        try {
+            senderContainer.shutDown();
+        }
+        finally {
+            receiverContainer.shutDown();
+        }
+    }
+
+    /** Sends NUM_MESSAGES messages and asserts that the receiver got all of them. */
+    public void testInOnly() throws Exception {
+        sender.sendMesssages(NUM_MESSAGES);
+        Thread.sleep(3000);
+        receiver.getMessageList().assertMessagesReceived(NUM_MESSAGES);
+    }
+}
CVSspam 0.2.8



Reply via email to