| Commit in servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/jms on MAIN | |||
| JMSFlow.java | +483 | added 1.1 | |
First cut of JMSFLow
servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/jms
JMSFlow.java added at 1.1
diff -N JMSFlow.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ JMSFlow.java 24 Aug 2005 10:58:55 -0000 1.1 @@ -0,0 +1,483 @@
+/**
+ * <a href="">ServiceMix: The open source ESB</a>
+ *
+ * Copyright 2005 RAJD Consultancy Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+
+package org.servicemix.jbi.nmr.flow.jms;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import javax.jbi.JBIException;
+import javax.jbi.messaging.MessagingException;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.resource.spi.work.WorkException;
+import org.activemq.ActiveMQConnection;
+import org.activemq.ActiveMQConnectionFactory;
+import org.activemq.advisories.ConsumerAdvisor;
+import org.activemq.advisories.ConsumerAdvisoryEvent;
+import org.activemq.advisories.ConsumerAdvisoryEventListener;
+import org.activemq.message.ConsumerInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.servicemix.jbi.framework.ComponentConnector;
+import org.servicemix.jbi.framework.ComponentNameSpace;
+import org.servicemix.jbi.framework.ComponentPacket;
+import org.servicemix.jbi.framework.ComponentPacketEvent;
+import org.servicemix.jbi.framework.LocalComponentConnector;
+import org.servicemix.jbi.messaging.ExchangePacket;
+import org.servicemix.jbi.nmr.Broker;
+import org.servicemix.jbi.nmr.flow.seda.SedaFlow;
+import org.servicemix.jbi.nmr.flow.seda.SedaQueue;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * Use for message routing amonst a cluster of containers. All routing/cluster registration happens automatically
+ *
+ * @version $Revision: 1.1 $
+ */
+public class JMSFlow extends SedaFlow implements ConsumerAdvisoryEventListener, MessageListener {
+ private static final Log log = LogFactory.getLog(JMSFlow.class);
+ private static final String INBOUND_PREFIX = "org.servicemix.inbound.";
+ private String jmsURL = ActiveMQConnection.DEFAULT_URL;
+ private String userName;
+ private String password;
+ private ConnectionFactory connectionFactory;
+ private Connection connection;
+ private String broadcastDestinationName = "org.servicemix.JMSFlow";
+ private MessageProducer queueProducer;
+ private MessageProducer topicProducer;
+ private Topic broadcastTopic;
+ private Session broadcastSession;
+ private Session inboundSession;
+ private ConsumerAdvisor advisor;
+ private Map clusterNodeKeyMap = new ConcurrentHashMap();
+ private Map clusterComponentKeyMap = new ConcurrentHashMap();
+
+ /**
+ * The type of Flow
+ *
+ * @return the type
+ */
+ public String getDescription() {
+ return "cluster";
+ }
+
+ /**
+ * @return Returns the jmsURL.
+ */
+ public String getJmsURL() {
+ return jmsURL;
+ }
+
+ /**
+ * @param jmsURL The jmsURL to set.
+ */
+ public void setJmsURL(String jmsURL) {
+ this.jmsURL = jmsURL;
+ }
+
+ /**
+ * @return Returns the password.
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * @param password The password to set.
+ */
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ /**
+ * @return Returns the userName.
+ */
+ public String getUserName() {
+ return userName;
+ }
+
+ /**
+ * @param userName The userName to set.
+ */
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ /**
+ * @return Returns the connection.
+ */
+ public Connection getConnection() {
+ return connection;
+ }
+
+ /**
+ * @param connection The connection to set.
+ */
+ public void setConnection(Connection connection) {
+ this.connection = connection;
+ }
+
+ /**
+ * @return Returns the connectionFactory.
+ */
+ public ConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ /**
+ * @param connectionFactory The connectionFactory to set.
+ */
+ public void setConnectionFactory(ConnectionFactory connectoFactory) {
+ this.connectionFactory = connectoFactory;
+ }
+
+ /**
+ * @return Returns the broadcastDestinationName.
+ */
+ public String getBroadcastDestinationName() {
+ return broadcastDestinationName;
+ }
+
+ /**
+ * @param broadcastDestinationName The broadcastDestinationName to set.
+ */
+ public void setBroadcastDestinationName(String broadcastDestinationName) {
+ this.broadcastDestinationName = broadcastDestinationName;
+ }
+
+ /**
+ * Initialize the Region
+ *
+ * @param broker
+ * @throws JBIException
+ */
+ public void init(Broker broker) throws JBIException {
+ super.init(broker);
+ try {
+ if (connection == null) {
+ if (connectionFactory == null) {
+ if (jmsURL != null) {
+ connectionFactory = new ActiveMQConnectionFactory(jmsURL);
+ }
+ else {
+ connectionFactory = new ActiveMQConnectionFactory();
+ }
+ if (userName != null) {
+ connection = connectionFactory.createConnection(userName, password);
+ }
+ else {
+ connection = connectionFactory.createConnection();
+ }
+ }
+ }
+ connection.setClientID(broker.getContainerName());
+ connection.start();
+ inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = inboundSession.createQueue(INBOUND_PREFIX + broker.getContainerName());
+ MessageConsumer inboundQueue = inboundSession.createConsumer(queue);
+ inboundQueue.setMessageListener(this);
+ queueProducer = inboundSession.createProducer(null);
+ broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
+ MessageConsumer broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
+ broadcastConsumer.setMessageListener(this);
+ topicProducer = broadcastSession.createProducer(broadcastTopic);
+ advisor = new ConsumerAdvisor(connection, broadcastTopic);
+ advisor.addListener(this);
+ }
+ catch (JMSException e) {
+ log.error("Failed t0 initialize JMSFlow", e);
+ throw new JBIException(e);
+ }
+ }
+
+ /**
+ * start the flow
+ *
+ * @throws JBIException
+ */
+ public void start() throws JBIException {
+ super.start();
+ try {
+ advisor.start();
+ }
+ catch (JMSException e) {
+ JBIException jbiEx = new JBIException("JMSException caught in start: " + e.getMessage());
+ throw jbiEx;
+ }
+ }
+
+ /**
+ * stop the flow
+ *
+ * @throws JBIException
+ */
+ public void stop() throws JBIException {
+ super.stop();
+ try {
+ advisor.stop();
+ }
+ catch (JMSException e) {
+ JBIException jbiEx = new JBIException("JMSException caught in stop: " + e.getMessage());
+ throw jbiEx;
+ }
+ }
+
+ public void shutDown() throws JBIException {
+ super.shutDown();
+ if (this.connection != null) {
+ try {
+ this.connection.close();
+ }
+ catch (JMSException e) {
+ log.warn("erro closing JMS Connection", e);
+ }
+ }
+ }
+
+ /**
+ * useful for testing
+ *
+ * @return number of containers in the network
+ */
+ public int numberInNetwork() {
+ return advisor.activeConsumers(broadcastTopic).size();
+ }
+
+ /**
+ * Distribute an ExchangePacket
+ *
+ * @param packet
+ * @throws JBIException
+ */
+ protected synchronized void doSend(ExchangePacket packet) throws JBIException {
+ ComponentNameSpace cns = packet.getDestinationId();
+ SedaQueue queue = (SedaQueue) queueMap.get(cns);
+ if (queue == null) {
+ queue = new SedaQueue(cns);
+ queueMap.put(cns, queue);
+ queue.init(this, capacity);
+ registerQueue(cns, queue);
+ if (started.get()) {
+ queue.start();
+ try {
+ broker.getWorkManager().startWork(queue);
+ }
+ catch (WorkException e) {
+ log.error("Failed to create start SedaQueue", e);
+ queue.shutDown();
+ queueMap.remove(cns);
+ unregisterQueue(queue);
+ throw new MessagingException(queue + " Failed in creation", e);
+ }
+ }
+ }
+ try {
+ queue.enqueue(packet);
+ }
+ catch (InterruptedException e) {
+ throw new MessagingException(queue + " Failed to enqueue packet: " + packet, e);
+ }
+ }
+
+ /**
+ * Process state changes in Components
+ *
+ * @param event
+ */
+ public void onEvent(ComponentPacketEvent event) {
+ super.onEvent(event);
+ try {
+ // broadcast change to the network
+ ObjectMessage msg = broadcastSession.createObjectMessage(event);
+ topicProducer.send(msg);
+ log.info("broadcast to internal JMS network: " + event);
+ }
+ catch (JMSException e) {
+ log.error("failed to broadcast to the internal JMS network: " + event, e);
+ }
+ }
+
+ /**
+ * ConsumerAdvisoerEventListener implementation
+ *
+ * @param event
+ */
+ public void onEvent(ConsumerAdvisoryEvent event) {
+ ConsumerInfo info = event.getInfo();
+ if (info.isStarted()) {
+ for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
+ LocalComponentConnector lcc = (LocalComponentConnector) i.next();
+ ComponentPacket packet = lcc.getPacket();
+ ComponentPacketEvent cpe = new ComponentPacketEvent(packet, ComponentPacketEvent.ACTIVATED);
+ onEvent(cpe);
+ }
+ }
+ else {
+ removeAllPackets(info.getClientId());
+ }
+ }
+
+ /**
+ * Distribute an ExchangePacket
+ *
+ * @param packet
+ * @throws MessagingException
+ */
+ public void doRouting(ExchangePacket packet) throws MessagingException {
+ ComponentNameSpace id = packet.isOutbound() ? packet.getDestinationId() : packet.getSourceId();
+ ComponentConnector cc = broker.getRegistry().getComponentConnector(id);
+ if (cc != null) {
+ if (cc.isLocal()) {
+ super.doRouting(packet);
+ }
+ else {
+ try {
+ String containerName = cc.getComponentNameSpace().getContainerName();
+ Queue queue = inboundSession.createQueue(INBOUND_PREFIX + containerName);
+ ObjectMessage msg = inboundSession.createObjectMessage(packet);
+ queueProducer.send(queue, msg);
+ }
+ catch (JMSException e) {
+ log.error("Failed to send packet: " + packet + " internal JMS Network", e);
+ throw new MessagingException(e);
+ }
+ }
+ }
+ else {
+ throw new MessagingException("No component with id (" + id + ") - Couldn't route ExchangePacket " + packet);
+ }
+ }
+
+ /**
+ * MessageListener implementation
+ */
+ /**
+ * @param message
+ */
+ public void onMessage(Message message) {
+ try {
+ if (message != null && message instanceof ObjectMessage) {
+ ObjectMessage objMsg = (ObjectMessage) message;
+ Object obj = objMsg.getObject();
+ if (obj != null) {
+ if (obj instanceof ComponentPacketEvent) {
+ ComponentPacketEvent event = (ComponentPacketEvent) obj;
+ String containerName = event.getPacket().getComponentNameSpace().getContainerName();
+ processInBoundPacket(containerName, event);
+ }
+ else if (obj instanceof ExchangePacket) {
+ ExchangePacket packet = (ExchangePacket) obj;
+ super.doRouting(packet);
+ }
+ }
+ }
+ }
+ catch (JMSException jmsEx) {
+ log.error("Caught an exception unpacking JMS Message: ", jmsEx);
+ }
+ catch (MessagingException e) {
+ log.error("Caught an exception routing ExchangePacket: ", e);
+ }
+ }
+
+ /**
+ * Process Inbound packets
+ *
+ * @param containerName
+ * @param event
+ */
+ protected void processInBoundPacket(String containerName, ComponentPacketEvent event) {
+ ComponentPacket packet = event.getPacket();
+ if (!packet.getComponentNameSpace().getContainerName().equals(broker.getContainerName())) {
+ if (event.getStatus() == ComponentPacketEvent.ACTIVATED) {
+ addRemotePacket(containerName, packet);
+ }
+ else if (event.getStatus() == ComponentPacketEvent.DEACTIVATED) {
+ removeRemotePacket(containerName, packet);
+ }
+ else if (event.getStatus() == ComponentPacketEvent.STATE_CHANGE) {
+ updateRemotePacket(containerName, packet);
+ }
+ else {
+ log.warn("Unable to determine ComponentPacketEvent type: " + event.getStatus() + " for packet: "
+ + packet);
+ }
+ }
+ }
+
+ private void addRemotePacket(String containerName, ComponentPacket packet) {
+ clusterComponentKeyMap.put(packet.getComponentNameSpace(), containerName);
+ Set set = (Set) clusterNodeKeyMap.get(containerName);
+ if (set == null) {
+ set = new CopyOnWriteArraySet();
+ clusterNodeKeyMap.put(containerName, set);
+ ComponentConnector cc = new ComponentConnector(packet);
+ log.info("Adding Remote Component: " + cc);
+ broker.getRegistry().addRemoteComponentConnector(cc);
+ }
+ set.add(packet);
+ }
+
+ private void updateRemotePacket(String containerName, ComponentPacket packet) {
+ Set set = (Set) clusterNodeKeyMap.get(containerName);
+ if (set != null) {
+ set.remove(packet);
+ set.add(packet);
+ }
+ ComponentConnector cc = new ComponentConnector(packet);
+ log.info("Updating remote Component: " + cc);
+ broker.getRegistry().updateRemoteComponentConnector(cc);
+ }
+
+ private void removeRemotePacket(String containerName, ComponentPacket packet) {
+ clusterComponentKeyMap.remove(packet.getComponentNameSpace());
+ Set set = (Set) clusterNodeKeyMap.get(containerName);
+ if (set != null) {
+ set.remove(packet);
+ ComponentConnector cc = new ComponentConnector(packet);
+ log.info("Removing remote Component: " + cc);
+ broker.getRegistry().removeRemoteComponentConnector(cc);
+ if (set.isEmpty()) {
+ clusterNodeKeyMap.remove(containerName);
+ }
+ }
+ }
+
+ private void removeAllPackets(String containerName) {
+ Set set = (Set) clusterNodeKeyMap.remove(containerName);
+ for (Iterator i = set.iterator();i.hasNext();) {
+ ComponentPacket packet = (ComponentPacket) i.next();
+ ComponentConnector cc = new ComponentConnector(packet);
+ log.info("Network node: " + containerName + " Stopped. Removing remote Component: " + cc);
+ broker.getRegistry().removeRemoteComponentConnector(cc);
+ clusterComponentKeyMap.remove(packet.getComponentNameSpace());
+ }
+ }
+}
\ No newline at end of file
