| Commit in servicemix/base/src on MAIN | |||
| main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java | +59 | -66 | 1.1 -> 1.2 |
| test/java/org/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java | +81 | added 1.1 | |
| +140 | -66 | ||
Added test case for JMSFlow
servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/jms
diff -u -r1.1 -r1.2 --- JMSFlow.java 24 Aug 2005 10:58:55 -0000 1.1 +++ JMSFlow.java 24 Aug 2005 14:07:24 -0000 1.2 @@ -54,11 +54,12 @@
import org.servicemix.jbi.nmr.flow.seda.SedaQueue; import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
/**
- * Use for message routing amonst a cluster of containers. All routing/cluster registration happens automatically
+ * Used for message routing among a network of containers. All routing/registration happens automatically.
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class JMSFlow extends SedaFlow implements ConsumerAdvisoryEventListener, MessageListener {
private static final Log log = LogFactory.getLog(JMSFlow.class);
@@ -75,8 +76,9 @@
private Session broadcastSession;
private Session inboundSession;
private ConsumerAdvisor advisor;
- private Map clusterNodeKeyMap = new ConcurrentHashMap(); - private Map clusterComponentKeyMap = new ConcurrentHashMap();
+ private Map networkNodeKeyMap = new ConcurrentHashMap(); + private Map networkComponentKeyMap = new ConcurrentHashMap(); + private SynchronizedBoolean started = new SynchronizedBoolean(false);
/**
* The type of Flow
@@ -84,7 +86,7 @@
* @return the type
*/
public String getDescription() {
- return "cluster";
+ // NOTE(review): JMSFlowTest selects this flow via setFlowName("jms");
+ // presumably that name is matched against this description - confirm.
+ return "jms";
}
/**
@@ -130,20 +132,6 @@
}
/**
- * @return Returns the connection.
- */
- public Connection getConnection() {
- return connection;
- }
-
- /**
- * @param connection The connection to set.
- */
- public void setConnection(Connection connection) {
- this.connection = connection;
- }
-
- /**
* @return Returns the connectionFactory.
*/
public ConnectionFactory getConnectionFactory() {
@@ -180,21 +168,19 @@
public void init(Broker broker) throws JBIException {
super.init(broker);
try {
- if (connection == null) {
- if (connectionFactory == null) {
- if (jmsURL != null) {
- connectionFactory = new ActiveMQConnectionFactory(jmsURL);
- }
- else {
- connectionFactory = new ActiveMQConnectionFactory();
- }
- if (userName != null) {
- connection = connectionFactory.createConnection(userName, password);
- }
- else {
- connection = connectionFactory.createConnection();
- }
+ if (connectionFactory == null) {
+ if (jmsURL != null) {
+ connectionFactory = new ActiveMQConnectionFactory(jmsURL);
}
+ else {
+ connectionFactory = new ActiveMQConnectionFactory();
+ }
+ }
+ if (userName != null) {
+ connection = connectionFactory.createConnection(userName, password);
+ }
+ else {
+ connection = connectionFactory.createConnection();
}
connection.setClientID(broker.getContainerName());
connection.start();
@@ -223,13 +209,15 @@
* @throws JBIException
*/
public void start() throws JBIException {
- super.start();
- try {
- advisor.start();
- }
- catch (JMSException e) {
- JBIException jbiEx = new JBIException("JMSException caught in start: " + e.getMessage());
- throw jbiEx;
+ // Only the call that flips the flag from false to true does the work, so
+ // calling start() on an already started flow is a no-op.
+ if (started.commit(false, true)) {
+ super.start();
+ try {
+ advisor.start();
+ }
+ catch (JMSException e) {
+ // Keep the JMSException as the cause so its stack trace is not lost.
+ // NOTE(review): the started flag stays true when the advisor fails to
+ // start - confirm whether it should be reset here.
+ throw new JBIException("JMSException caught in start: " + e.getMessage(), e);
+ }
}
}
@@ -239,18 +227,21 @@
* @throws JBIException
*/
public void stop() throws JBIException {
- super.stop();
- try {
- advisor.stop();
- }
- catch (JMSException e) {
- JBIException jbiEx = new JBIException("JMSException caught in stop: " + e.getMessage());
- throw jbiEx;
+ // Only the call that flips the flag from true to false does the work, so
+ // calling stop() on a flow that is not started is a no-op.
+ if (started.commit(true, false)) {
+ super.stop();
+ try {
+ advisor.stop();
+ }
+ catch (JMSException e) {
+ // Keep the JMSException as the cause so its stack trace is not lost.
+ throw new JBIException("JMSException caught in stop: " + e.getMessage(), e);
+ }
}
}
public void shutDown() throws JBIException {
super.shutDown();
+ stop();
if (this.connection != null) {
try {
this.connection.close();
@@ -330,17 +321,19 @@
* @param event
*/
public void onEvent(ConsumerAdvisoryEvent event) {
- ConsumerInfo info = event.getInfo();
- if (info.isStarted()) {
- for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
- LocalComponentConnector lcc = (LocalComponentConnector) i.next();
- ComponentPacket packet = lcc.getPacket();
- ComponentPacketEvent cpe = new ComponentPacketEvent(packet, ComponentPacketEvent.ACTIVATED);
- onEvent(cpe);
+ // Consumer advisories are ignored unless this flow is currently started.
+ if (started.get()) {
+ ConsumerInfo info = event.getInfo();
+ if (info.isStarted()) {
+ // A consumer has started: raise an ACTIVATED event for the packet of
+ // every local component connector.
+ // NOTE(review): presumably this re-announces local components to the
+ // new consumer - confirm against onEvent(ComponentPacketEvent).
+ for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
+ LocalComponentConnector lcc = (LocalComponentConnector) i.next();
+ ComponentPacket packet = lcc.getPacket();
+ ComponentPacketEvent cpe = new ComponentPacketEvent(packet, ComponentPacketEvent.ACTIVATED);
+ onEvent(cpe);
+ }
+ }
+ else {
+ // The consumer is not started: remove everything recorded under its
+ // client id.
+ removeAllPackets(info.getClientId());
}
- }
- else {
- removeAllPackets(info.getClientId());
}
}
@@ -433,11 +426,11 @@
}
private void addRemotePacket(String containerName, ComponentPacket packet) {
- clusterComponentKeyMap.put(packet.getComponentNameSpace(), containerName); - Set set = (Set) clusterNodeKeyMap.get(containerName);
+ networkComponentKeyMap.put(packet.getComponentNameSpace(), containerName); + Set set = (Set) networkNodeKeyMap.get(containerName);
if (set == null) {
set = new CopyOnWriteArraySet();
- clusterNodeKeyMap.put(containerName, set);
+ networkNodeKeyMap.put(containerName, set);
ComponentConnector cc = new ComponentConnector(packet);
log.info("Adding Remote Component: " + cc);
broker.getRegistry().addRemoteComponentConnector(cc);
@@ -446,7 +439,7 @@
}
private void updateRemotePacket(String containerName, ComponentPacket packet) {
- Set set = (Set) clusterNodeKeyMap.get(containerName);
+ Set set = (Set) networkNodeKeyMap.get(containerName);
if (set != null) {
set.remove(packet);
set.add(packet);
@@ -457,27 +450,27 @@
}
private void removeRemotePacket(String containerName, ComponentPacket packet) {
- clusterComponentKeyMap.remove(packet.getComponentNameSpace()); - Set set = (Set) clusterNodeKeyMap.get(containerName);
+ // Drop the component-name-space entry for this packet, then look up the
+ // set of packets recorded for the given container.
+ networkComponentKeyMap.remove(packet.getComponentNameSpace()); + Set set = (Set) networkNodeKeyMap.get(containerName);
if (set != null) {
set.remove(packet);
ComponentConnector cc = new ComponentConnector(packet);
log.info("Removing remote Component: " + cc);
broker.getRegistry().removeRemoteComponentConnector(cc);
+ // Once the container has no packets left, remove its entry as well.
if (set.isEmpty()) {
- clusterNodeKeyMap.remove(containerName);
+ networkNodeKeyMap.remove(containerName);
}
}
}
private void removeAllPackets(String containerName) {
- Set set = (Set) clusterNodeKeyMap.remove(containerName);
+ Set set = (Set) networkNodeKeyMap.remove(containerName);
for (Iterator i = set.iterator();i.hasNext();) {
ComponentPacket packet = (ComponentPacket) i.next();
ComponentConnector cc = new ComponentConnector(packet);
log.info("Network node: " + containerName + " Stopped. Removing remote Component: " + cc);
broker.getRegistry().removeRemoteComponentConnector(cc);
- clusterComponentKeyMap.remove(packet.getComponentNameSpace());
+ networkComponentKeyMap.remove(packet.getComponentNameSpace());
}
}
}
servicemix/base/src/test/java/org/servicemix/jbi/nmr/flow/jms
JMSFlowTest.java added at 1.1
diff -N JMSFlowTest.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ JMSFlowTest.java 24 Aug 2005 14:07:24 -0000 1.1 @@ -0,0 +1,81 @@
+/**
+ * <a href="">ServiceMix: The open source ESB</a>
+ *
+ * Copyright 2005 RAJD Consultancy Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.servicemix.jbi.nmr.flow.jms;
+
+import java.io.File;
+import junit.framework.TestCase;
+import org.servicemix.examples.*;
+import org.servicemix.jbi.container.ActivationSpec;
+import org.servicemix.jbi.container.JBIContainer;
+import org.servicemix.jbi.resolver.ServiceNameEndpointResolver;
+import org.servicemix.jbi.util.FileUtil;
+
+/**
+ * Exercises {@link JMSFlow} with two containers that are both configured with
+ * the "jms" flow: a sender component is activated in one container and a
+ * receiver component in the other, and the test checks that the messages sent
+ * by the sender are received by the receiver.
+ */
+public class JMSFlowTest extends TestCase {
+    private static final int NUM_MESSAGES = 10;
+    /** Pause after activating the components, in milliseconds. */
+    private static final long ACTIVATION_WAIT_MS = 5000;
+    /** Pause between sending and checking the received messages, in milliseconds. */
+    private static final long DELIVERY_WAIT_MS = 3000;
+
+    JBIContainer senderContainer = new JBIContainer();
+    JBIContainer receiverContainer = new JBIContainer();
+    private SenderComponent sender;
+    private ReceiverComponent receiver;
+
+    /*
+     * @see TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        senderContainer.setName("senderContainer");
+        senderContainer.setFlowName("jms");
+        senderContainer.init();
+        senderContainer.start();
+        assertTrue("sender container should use a JMSFlow",
+                senderContainer.getFlow() instanceof JMSFlow);
+
+        receiverContainer.setName("receiverContainer");
+        receiverContainer.setFlowName("jms");
+        receiverContainer.init();
+        receiverContainer.start();
+        // Check the receiver's own flow, not the sender's a second time.
+        assertTrue("receiver container should use a JMSFlow",
+                receiverContainer.getFlow() instanceof JMSFlow);
+
+        receiver = new ReceiverComponent();
+        sender = new SenderComponent();
+        sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE));
+
+        senderContainer.activateComponent(new ActivationSpec("sender", sender));
+        receiverContainer.activateComponent(new ActivationSpec("receiver", receiver));
+
+        // NOTE(review): presumably gives the two flows time to exchange their
+        // component registrations before the test body runs - confirm.
+        Thread.sleep(ACTIVATION_WAIT_MS);
+    }
+
+    /**
+     * Shuts down both containers. The receiver container is shut down even if
+     * the sender container fails to, so a failure does not leave it running.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            senderContainer.shutDown();
+        }
+        finally {
+            try {
+                receiverContainer.shutDown();
+            }
+            finally {
+                super.tearDown();
+            }
+        }
+    }
+
+    /**
+     * Sends NUM_MESSAGES messages from the sender component and asserts that
+     * the receiver component has received that many after a fixed wait.
+     */
+    public void testInOnly() throws Exception {
+        sender.sendMesssages(NUM_MESSAGES);
+        Thread.sleep(DELIVERY_WAIT_MS);
+        receiver.getMessageList().assertMessagesReceived(NUM_MESSAGES);
+    }
+}
