Commit in servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/jms on MAIN
JMSFlow.java+483added 1.1
First cut of JMSFLow

servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/jms
JMSFlow.java added at 1.1
diff -N JMSFlow.java
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ JMSFlow.java	24 Aug 2005 10:58:55 -0000	1.1
@@ -0,0 +1,483 @@
+/** 
+ * <a href="">ServiceMix: The open source ESB</a> 
+ * 
+ * Copyright 2005 RAJD Consultancy Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+
+package org.servicemix.jbi.nmr.flow.jms;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import javax.jbi.JBIException;
+import javax.jbi.messaging.MessagingException;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.resource.spi.work.WorkException;
+import org.activemq.ActiveMQConnection;
+import org.activemq.ActiveMQConnectionFactory;
+import org.activemq.advisories.ConsumerAdvisor;
+import org.activemq.advisories.ConsumerAdvisoryEvent;
+import org.activemq.advisories.ConsumerAdvisoryEventListener;
+import org.activemq.message.ConsumerInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.servicemix.jbi.framework.ComponentConnector;
+import org.servicemix.jbi.framework.ComponentNameSpace;
+import org.servicemix.jbi.framework.ComponentPacket;
+import org.servicemix.jbi.framework.ComponentPacketEvent;
+import org.servicemix.jbi.framework.LocalComponentConnector;
+import org.servicemix.jbi.messaging.ExchangePacket;
+import org.servicemix.jbi.nmr.Broker;
+import org.servicemix.jbi.nmr.flow.seda.SedaFlow;
+import org.servicemix.jbi.nmr.flow.seda.SedaQueue;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * Use for message routing amonst a cluster of containers. All routing/cluster registration happens automatically
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class JMSFlow extends SedaFlow implements ConsumerAdvisoryEventListener, MessageListener {
+    private static final Log log = LogFactory.getLog(JMSFlow.class);
+    private static final String INBOUND_PREFIX = "org.servicemix.inbound.";
+    private String jmsURL = ActiveMQConnection.DEFAULT_URL;
+    private String userName;
+    private String password;
+    private ConnectionFactory connectionFactory;
+    private Connection connection;
+    private String broadcastDestinationName = "org.servicemix.JMSFlow";
+    private MessageProducer queueProducer;
+    private MessageProducer topicProducer;
+    private Topic broadcastTopic;
+    private Session broadcastSession;
+    private Session inboundSession;
+    private ConsumerAdvisor advisor;
+    private Map clusterNodeKeyMap = new ConcurrentHashMap();
+    private Map clusterComponentKeyMap = new ConcurrentHashMap();
+
+    /**
+     * The type of Flow
+     * 
+     * @return the type
+     */
+    public String getDescription() {
+        return "cluster";
+    }
+
+    /**
+     * @return Returns the jmsURL.
+     */
+    public String getJmsURL() {
+        return jmsURL;
+    }
+
+    /**
+     * @param jmsURL The jmsURL to set.
+     */
+    public void setJmsURL(String jmsURL) {
+        this.jmsURL = jmsURL;
+    }
+
+    /**
+     * @return Returns the password.
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * @param password The password to set.
+     */
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /**
+     * @return Returns the userName.
+     */
+    public String getUserName() {
+        return userName;
+    }
+
+    /**
+     * @param userName The userName to set.
+     */
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    /**
+     * @return Returns the connection.
+     */
+    public Connection getConnection() {
+        return connection;
+    }
+
+    /**
+     * @param connection The connection to set.
+     */
+    public void setConnection(Connection connection) {
+        this.connection = connection;
+    }
+
+    /**
+     * @return Returns the connectionFactory.
+     */
+    public ConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    /**
+     * @param connectionFactory The connectionFactory to set.
+     */
+    public void setConnectionFactory(ConnectionFactory connectoFactory) {
+        this.connectionFactory = connectoFactory;
+    }
+
+    /**
+     * @return Returns the broadcastDestinationName.
+     */
+    public String getBroadcastDestinationName() {
+        return broadcastDestinationName;
+    }
+
+    /**
+     * @param broadcastDestinationName The broadcastDestinationName to set.
+     */
+    public void setBroadcastDestinationName(String broadcastDestinationName) {
+        this.broadcastDestinationName = broadcastDestinationName;
+    }
+
+    /**
+     * Initialize the Region
+     * 
+     * @param broker
+     * @throws JBIException
+     */
+    public void init(Broker broker) throws JBIException {
+        super.init(broker);
+        try {
+            if (connection == null) {
+                if (connectionFactory == null) {
+                    if (jmsURL != null) {
+                        connectionFactory = new ActiveMQConnectionFactory(jmsURL);
+                    }
+                    else {
+                        connectionFactory = new ActiveMQConnectionFactory();
+                    }
+                    if (userName != null) {
+                        connection = connectionFactory.createConnection(userName, password);
+                    }
+                    else {
+                        connection = connectionFactory.createConnection();
+                    }
+                }
+            }
+            connection.setClientID(broker.getContainerName());
+            connection.start();
+            inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = inboundSession.createQueue(INBOUND_PREFIX + broker.getContainerName());
+            MessageConsumer inboundQueue = inboundSession.createConsumer(queue);
+            inboundQueue.setMessageListener(this);
+            queueProducer = inboundSession.createProducer(null);
+            broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
+            MessageConsumer broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
+            broadcastConsumer.setMessageListener(this);
+            topicProducer = broadcastSession.createProducer(broadcastTopic);
+            advisor = new ConsumerAdvisor(connection, broadcastTopic);
+            advisor.addListener(this);
+        }
+        catch (JMSException e) {
+            log.error("Failed t0 initialize JMSFlow", e);
+            throw new JBIException(e);
+        }
+    }
+
+    /**
+     * start the flow
+     * 
+     * @throws JBIException
+     */
+    public void start() throws JBIException {
+        super.start();
+        try {
+            advisor.start();
+        }
+        catch (JMSException e) {
+            JBIException jbiEx = new JBIException("JMSException caught in start: " + e.getMessage());
+            throw jbiEx;
+        }
+    }
+
+    /**
+     * stop the flow
+     * 
+     * @throws JBIException
+     */
+    public void stop() throws JBIException {
+        super.stop();
+        try {
+            advisor.stop();
+        }
+        catch (JMSException e) {
+            JBIException jbiEx = new JBIException("JMSException caught in stop: " + e.getMessage());
+            throw jbiEx;
+        }
+    }
+
+    public void shutDown() throws JBIException {
+        super.shutDown();
+        if (this.connection != null) {
+            try {
+                this.connection.close();
+            }
+            catch (JMSException e) {
+                log.warn("erro closing JMS Connection", e);
+            }
+        }
+    }
+
+    /**
+     * useful for testing
+     * 
+     * @return number of containers in the network
+     */
+    public int numberInNetwork() {
+        return advisor.activeConsumers(broadcastTopic).size();
+    }
+
+    /**
+     * Distribute an ExchangePacket
+     * 
+     * @param packet
+     * @throws JBIException
+     */
+    protected synchronized void doSend(ExchangePacket packet) throws JBIException {
+        ComponentNameSpace cns = packet.getDestinationId();
+        SedaQueue queue = (SedaQueue) queueMap.get(cns);
+        if (queue == null) {
+            queue = new SedaQueue(cns);
+            queueMap.put(cns, queue);
+            queue.init(this, capacity);
+            registerQueue(cns, queue);
+            if (started.get()) {
+                queue.start();
+                try {
+                    broker.getWorkManager().startWork(queue);
+                }
+                catch (WorkException e) {
+                    log.error("Failed to create start SedaQueue", e);
+                    queue.shutDown();
+                    queueMap.remove(cns);
+                    unregisterQueue(queue);
+                    throw new MessagingException(queue + " Failed in creation", e);
+                }
+            }
+        }
+        try {
+            queue.enqueue(packet);
+        }
+        catch (InterruptedException e) {
+            throw new MessagingException(queue + " Failed to enqueue packet: " + packet, e);
+        }
+    }
+
+    /**
+     * Process state changes in Components
+     * 
+     * @param event
+     */
+    public void onEvent(ComponentPacketEvent event) {
+        super.onEvent(event);
+        try {
+            // broadcast change to the network
+            ObjectMessage msg = broadcastSession.createObjectMessage(event);
+            topicProducer.send(msg);
+            log.info("broadcast to internal JMS network: " + event);
+        }
+        catch (JMSException e) {
+            log.error("failed to broadcast to the internal JMS network: " + event, e);
+        }
+    }
+
+    /**
+     * ConsumerAdvisoerEventListener implementation
+     * 
+     * @param event
+     */
+    public void onEvent(ConsumerAdvisoryEvent event) {
+        ConsumerInfo info = event.getInfo();
+        if (info.isStarted()) {
+            for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
+                LocalComponentConnector lcc = (LocalComponentConnector) i.next();
+                ComponentPacket packet = lcc.getPacket();
+                ComponentPacketEvent cpe = new ComponentPacketEvent(packet, ComponentPacketEvent.ACTIVATED);
+                onEvent(cpe);
+            }
+        }
+        else {
+            removeAllPackets(info.getClientId());
+        }
+    }
+
+    /**
+     * Distribute an ExchangePacket
+     * 
+     * @param packet
+     * @throws MessagingException
+     */
+    public void doRouting(ExchangePacket packet) throws MessagingException {
+        ComponentNameSpace id = packet.isOutbound() ? packet.getDestinationId() : packet.getSourceId();
+        ComponentConnector cc = broker.getRegistry().getComponentConnector(id);
+        if (cc != null) {
+            if (cc.isLocal()) {
+                super.doRouting(packet);
+            }
+            else {
+                try {
+                    String containerName = cc.getComponentNameSpace().getContainerName();
+                    Queue queue = inboundSession.createQueue(INBOUND_PREFIX + containerName);
+                    ObjectMessage msg = inboundSession.createObjectMessage(packet);
+                    queueProducer.send(queue, msg);
+                }
+                catch (JMSException e) {
+                    log.error("Failed to send packet: " + packet + " internal JMS Network", e);
+                    throw new MessagingException(e);
+                }
+            }
+        }
+        else {
+            throw new MessagingException("No component with id (" + id + ") - Couldn't route ExchangePacket " + packet);
+        }
+    }
+
+    /**
+     * MessageListener implementation
+     */
+    /**
+     * @param message
+     */
+    public void onMessage(Message message) {
+        try {
+            if (message != null && message instanceof ObjectMessage) {
+                ObjectMessage objMsg = (ObjectMessage) message;
+                Object obj = objMsg.getObject();
+                if (obj != null) {
+                    if (obj instanceof ComponentPacketEvent) {
+                        ComponentPacketEvent event = (ComponentPacketEvent) obj;
+                        String containerName = event.getPacket().getComponentNameSpace().getContainerName();
+                        processInBoundPacket(containerName, event);
+                    }
+                    else if (obj instanceof ExchangePacket) {
+                        ExchangePacket packet = (ExchangePacket) obj;
+                        super.doRouting(packet);
+                    }
+                }
+            }
+        }
+        catch (JMSException jmsEx) {
+            log.error("Caught an exception unpacking JMS Message: ", jmsEx);
+        }
+        catch (MessagingException e) {
+            log.error("Caught an exception routing ExchangePacket: ", e);
+        }
+    }
+
+    /**
+     * Process Inbound packets
+     * 
+     * @param containerName
+     * @param event
+     */
+    protected void processInBoundPacket(String containerName, ComponentPacketEvent event) {
+        ComponentPacket packet = event.getPacket();
+        if (!packet.getComponentNameSpace().getContainerName().equals(broker.getContainerName())) {
+            if (event.getStatus() == ComponentPacketEvent.ACTIVATED) {
+                addRemotePacket(containerName, packet);
+            }
+            else if (event.getStatus() == ComponentPacketEvent.DEACTIVATED) {
+                removeRemotePacket(containerName, packet);
+            }
+            else if (event.getStatus() == ComponentPacketEvent.STATE_CHANGE) {
+                updateRemotePacket(containerName, packet);
+            }
+            else {
+                log.warn("Unable to determine ComponentPacketEvent type: " + event.getStatus() + " for packet: "
+                        + packet);
+            }
+        }
+    }
+
+    private void addRemotePacket(String containerName, ComponentPacket packet) {
+        clusterComponentKeyMap.put(packet.getComponentNameSpace(), containerName);
+        Set set = (Set) clusterNodeKeyMap.get(containerName);
+        if (set == null) {
+            set = new CopyOnWriteArraySet();
+            clusterNodeKeyMap.put(containerName, set);
+            ComponentConnector cc = new ComponentConnector(packet);
+            log.info("Adding Remote Component: " + cc);
+            broker.getRegistry().addRemoteComponentConnector(cc);
+        }
+        set.add(packet);
+    }
+
+    private void updateRemotePacket(String containerName, ComponentPacket packet) {
+        Set set = (Set) clusterNodeKeyMap.get(containerName);
+        if (set != null) {
+            set.remove(packet);
+            set.add(packet);
+        }
+        ComponentConnector cc = new ComponentConnector(packet);
+        log.info("Updating remote Component: " + cc);
+        broker.getRegistry().updateRemoteComponentConnector(cc);
+    }
+
+    private void removeRemotePacket(String containerName, ComponentPacket packet) {
+        clusterComponentKeyMap.remove(packet.getComponentNameSpace());
+        Set set = (Set) clusterNodeKeyMap.get(containerName);
+        if (set != null) {
+            set.remove(packet);
+            ComponentConnector cc = new ComponentConnector(packet);
+            log.info("Removing remote Component: " + cc);
+            broker.getRegistry().removeRemoteComponentConnector(cc);
+            if (set.isEmpty()) {
+                clusterNodeKeyMap.remove(containerName);
+            }
+        }
+    }
+
+    private void removeAllPackets(String containerName) {
+        Set set = (Set) clusterNodeKeyMap.remove(containerName);
+        for (Iterator i = set.iterator();i.hasNext();) {
+            ComponentPacket packet = (ComponentPacket) i.next();
+            ComponentConnector cc = new ComponentConnector(packet);
+            log.info("Network node: " + containerName + " Stopped. Removing remote Component: " + cc);
+            broker.getRegistry().removeRemoteComponentConnector(cc);
+            clusterComponentKeyMap.remove(packet.getComponentNameSpace());
+        }
+    }
+}
\ No newline at end of file
CVSspam 0.2.8



Reply via email to