Author: keith
Date: Sun Aug  3 02:41:19 2008
New Revision: 20314
URL: http://wso2.org/svn/browse/wso2?view=rev&revision=20314

Log:
Adding a patch to address https://wso2.org/jira/browse/MASHUP-1048. For now 
simple cloned the HashMap prior to iterating



Added:
   
branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/
   
branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/jms/
   
branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/jms/JMSListener.java
Modified:
   
branches/mashup/java/1.5/java/modules/patches/axis2/resources/axis2-patches.txt

Modified: 
branches/mashup/java/1.5/java/modules/patches/axis2/resources/axis2-patches.txt
URL: 
http://wso2.org/svn/browse/wso2/branches/mashup/java/1.5/java/modules/patches/axis2/resources/axis2-patches.txt?rev=20314&r1=20313&r2=20314&view=diff
==============================================================================
--- 
branches/mashup/java/1.5/java/modules/patches/axis2/resources/axis2-patches.txt 
    (original)
+++ 
branches/mashup/java/1.5/java/modules/patches/axis2/resources/axis2-patches.txt 
    Sun Aug  3 02:41:19 2008
@@ -4,3 +4,4 @@
 https://issues.apache.org/jira/browse/AXIS2-3877
 https://issues.apache.org/jira/browse/AXIS2-3897
 https://issues.apache.org/jira/browse/AXIS2-3902
+https://wso2.org/jira/browse/MASHUP-1048

Added: 
branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/jms/JMSListener.java
URL: 
http://wso2.org/svn/browse/wso2/branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/jms/JMSListener.java?pathrev=20314
==============================================================================
--- (empty file)
+++ 
branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/jms/JMSListener.java
     Sun Aug  3 02:41:19 2008
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.SessionContext;
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.ParameterIncludeImpl;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.axis2.engine.AxisEvent;
+import org.apache.axis2.engine.AxisObserver;
+import org.apache.axis2.transport.TransportListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.JMSException;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+/**
+ * The JMS Transport listener implementation. A JMS Listner will hold one or
+ * more JMS connection factories, which would be created at initialization
+ * time. This implementation does not support the creation of connection
+ * factories at runtime. This JMS Listener registers with Axis to be notified
+ * of service deployment/undeployment/start and stop, and enables or disables
+ * listening for messages on the destinations as appropriate.
+ * <p/>
+ * A Service could state the JMS connection factory name and the destination
+ * name for use as Parameters in its services.xml as shown in the example
+ * below. If the connection name was not specified, it will use the connection
+ * factory named "default" (JMSConstants.DEFAULT_CONFAC_NAME) - if such a
+ * factory is defined in the Axis2.xml. If the destination name is not 
specified
+ * it will default to a JMS queue by the name of the service. If the 
destination
+ * should be a Topic, it should be created on the JMS implementation, and
+ * specified in the services.xml of the service.
+ * <p/>
+ * <parameter name="transport.jms.ConnectionFactory" locked="true">
+ * myTopicConnectionFactory</parameter>
+ * <parameter name="transport.jms.Destination" locked="true">
+ * dynamicTopics/something.TestTopic</parameter>
+ */
+public class JMSListener implements TransportListener {
+
+    private static final Log log = LogFactory.getLog(JMSListener.class);
+
+    /**
+     * The maximum number of threads used for the worker thread pool
+     */
+    private static final int WORKERS_MAX_THREADS = 100;
+    /**
+     * The keep alive time of an idle worker thread
+     */
+    private static final long WORKER_KEEP_ALIVE = 60L;
+    /**
+     * The worker thread timeout time unit
+     */
+    private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
+
+    /**
+     * A Map containing the connection factories managed by this, keyed by name
+     */
+    private Map connectionFactories = new HashMap();
+    /**
+     * A Map of service name to the JMS EPR addresses
+     */
+    private Map serviceNameToEprMap = new HashMap();
+    /**
+     * The Axis2 Configuration context
+     */
+    private ConfigurationContext configCtx = null;
+    
+    private ExecutorService workerPool;
+
+    /**
+     * This is the TransportListener initialization method invoked by Axis2
+     *
+     * @param axisConf   the Axis configuration context
+     * @param transprtIn the TransportIn description
+     */
+    public void init(ConfigurationContext axisConf,
+                     TransportInDescription transprtIn) {
+
+        // save reference to the configuration context
+        this.configCtx = axisConf;
+
+        // initialize the defined connection factories
+        initializeConnectionFactories(transprtIn);
+
+        // if no connection factories are defined, we cannot listen
+        if (connectionFactories.isEmpty()) {
+            log.warn("No JMS connection factories are defined." +
+                     "Will not listen for any JMS messages");
+            return;
+        }
+
+        // iterate through deployed services and validate connection factory
+        // names, and mark services as faulty where appropriate.
+        HashMap map = axisConf.getAxisConfiguration().getServices();
+        HashMap clonedMap = (HashMap)map.clone();
+        Iterator services =
+                clonedMap.values().iterator();
+
+        while (services.hasNext()) {
+            AxisService service = (AxisService) services.next();
+            if (JMSUtils.isJMSService(service)) {
+                processService(service);
+            }
+        }
+
+        // register to receive updates on services for lifetime management
+        axisConf.getAxisConfiguration().addObservers(new JMSAxisObserver());
+
+        log.info("JMS Transport Receiver (Listener) initialized...");
+    }
+
+
+    /**
+     * Prepare to listen for JMS messages on behalf of this service
+     *
+     * @param service
+     */
+    private void processService(AxisService service) {
+        JMSConnectionFactory cf = getConnectionFactory(service);
+        if (cf == null) {
+            String msg = "Service " + service.getName() + " does not specify" +
+                         "a JMS connection factory or refers to an invalid 
factory. " +
+                         "This service is being marked as faulty and will not 
be " +
+                         "available over the JMS transport";
+            log.warn(msg);
+            JMSUtils.markServiceAsFaulty(
+                    service.getName(), msg, service.getAxisConfiguration());
+            return;
+        }
+
+        String destination = JMSUtils.getDestination(service);
+
+        // compute service EPR and keep for later use
+        serviceNameToEprMap.put(service.getName(), getEPR(cf, destination));
+
+        // add the specified or implicit destination of this service
+        // to its connection factory
+        cf.addDestination(destination, service.getName());
+    }
+
+    /**
+     * Return the connection factory name for this service. If this service
+     * refers to an invalid factory or defaults to a non-existent default
+     * factory, this returns null
+     *
+     * @param service the AxisService
+     * @return the JMSConnectionFactory to be used, or null if reference is 
invalid
+     */
+    private JMSConnectionFactory getConnectionFactory(AxisService service) {
+        Parameter conFacParam = 
service.getParameter(JMSConstants.CONFAC_PARAM);
+
+        // validate connection factory name (specified or default)
+        if (conFacParam != null) {
+            String conFac = (String) conFacParam.getValue();
+            if (connectionFactories.containsKey(conFac)) {
+                return (JMSConnectionFactory) connectionFactories.get(conFac);
+            } else {
+                return null;
+            }
+
+        } else if 
(connectionFactories.containsKey(JMSConstants.DEFAULT_CONFAC_NAME)) {
+            return (JMSConnectionFactory) connectionFactories.
+                    get(JMSConstants.DEFAULT_CONFAC_NAME);
+
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Initialize the defined connection factories, parsing the TransportIn
+     * descriptions
+     *
+     * @param transprtIn The Axis2 Transport in for the JMS
+     */
+    private void initializeConnectionFactories(TransportInDescription 
transprtIn) {
+        // iterate through all defined connection factories
+        Iterator conFacIter = transprtIn.getParameters().iterator();
+
+        while (conFacIter.hasNext()) {
+
+            Parameter param = (Parameter) conFacIter.next();
+            JMSConnectionFactory jmsConFactory =
+                    new JMSConnectionFactory(param.getName());
+
+            ParameterIncludeImpl pi = new ParameterIncludeImpl();
+            try {
+                pi.deserializeParameters((OMElement) param.getValue());
+            } catch (AxisFault axisFault) {
+                handleException("Error reading Parameters for JMS connection " 
+
+                                "factory" + jmsConFactory.getName(), 
axisFault);
+            }
+
+            // read connection facotry properties
+            Iterator params = pi.getParameters().iterator();
+
+            while (params.hasNext()) {
+                Parameter p = (Parameter) params.next();
+
+                if (Context.INITIAL_CONTEXT_FACTORY.equals(p.getName())) {
+                    jmsConFactory.addProperty(
+                            Context.INITIAL_CONTEXT_FACTORY, (String) 
p.getValue());
+                } else if (Context.PROVIDER_URL.equals(p.getName())) {
+                    jmsConFactory.addProperty(
+                            Context.PROVIDER_URL, (String) p.getValue());
+                } else if (Context.SECURITY_PRINCIPAL.equals(p.getName())) {
+                    jmsConFactory.addProperty(
+                            Context.SECURITY_PRINCIPAL, (String) p.getValue());
+                } else if (Context.SECURITY_CREDENTIALS.equals(p.getName())) {
+                    jmsConFactory.addProperty(
+                            Context.SECURITY_CREDENTIALS, (String) 
p.getValue());
+                } else if 
(JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p.getName())) {
+                    jmsConFactory.setJndiName((String) p.getValue());
+                } else if 
(JMSConstants.CONFAC_JNDI_NAME_USER.equals(p.getName())) {
+                    jmsConFactory.setJndiUser((String) p.getValue());
+                } else if 
(JMSConstants.CONFAC_JNDI_NAME_PASS.equals(p.getName())) {
+                    jmsConFactory.setJndiPass((String) p.getValue());
+                } else if (JMSConstants.DEST_PARAM.equals(p.getName())) {
+                    StringTokenizer st =
+                            new StringTokenizer((String) p.getValue(), " ,");
+                    while (st.hasMoreTokens()) {
+                        jmsConFactory.addDestination(st.nextToken(), null);
+                    }
+                }
+            }
+
+            // connect to the actual connection factory
+            try {
+                jmsConFactory.connect();
+                connectionFactories.put(jmsConFactory.getName(), 
jmsConFactory);
+            } catch (NamingException e) {
+                handleException("Error connecting to JMS connection factory : 
" +
+                                jmsConFactory.getJndiName(), e);
+            }
+        }
+    }
+
+    /**
+     * Get the EPR for the given JMS connection factory and destination
+     * the form of the URL is
+     * jms:/<destination>?[<key>=<value>&]*
+     *
+     * @param cf          the Axis2 JMS connection factory
+     * @param destination the JNDI name of the destination
+     * @return the EPR as a String
+     */
+    private static String getEPR(JMSConnectionFactory cf, String destination) {
+        StringBuffer sb = new StringBuffer();
+        sb.append(JMSConstants.JMS_PREFIX).append(destination);
+        sb.append("?").append(JMSConstants.CONFAC_JNDI_NAME_PARAM).
+                append("=").append(cf.getJndiName());
+        Iterator props = cf.getProperties().keySet().iterator();
+        while (props.hasNext()) {
+            String key = (String) props.next();
+            String value = (String) cf.getProperties().get(key);
+            sb.append("&").append(key).append("=").append(value);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Start this JMS Listener (Transport Listener)
+     *
+     * @throws AxisFault
+     */
+    public void start() throws AxisFault {
+        // create thread pool of workers
+        workerPool = new ThreadPoolExecutor(
+                1,
+                WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TIME_UNIT,
+                new LinkedBlockingQueue(),
+                new org.apache.axis2.util.threadpool.DefaultThreadFactory(
+                        new ThreadGroup("JMS Worker thread group"),
+                        "JMSWorker"));
+
+        Iterator iter = connectionFactories.values().iterator();
+        while (iter.hasNext()) {
+            JMSConnectionFactory conFac = (JMSConnectionFactory) iter.next();
+            JMSMessageReceiver msgRcvr =
+                    new JMSMessageReceiver(conFac, workerPool, configCtx);
+
+            try {
+                conFac.listen(msgRcvr);
+            } catch (JMSException e) {
+                handleException("Error starting connection factory : " +
+                                conFac.getName(), e);
+            }
+        }
+    }
+
+    /**
+     * Stop this transport listener and shutdown all of the connection 
factories
+     */
+    public void stop() {
+        Iterator iter = connectionFactories.values().iterator();
+        while (iter.hasNext()) {
+            ((JMSConnectionFactory) iter.next()).stop();
+        }
+        if (workerPool != null) {
+            workerPool.shutdown();
+        }
+    }
+
+    /**
+     * Returns EPRs for the given service and IP. (Picks up precomputed EPR)
+     *
+     * @param serviceName service name
+     * @param ip          ignored
+     * @return the EPR for the service
+     * @throws AxisFault not used
+     */
+    public EndpointReference[] getEPRsForService(String serviceName, String 
ip) throws AxisFault {
+        //Strip out the operation name
+        if (serviceName.indexOf('/') != -1) {
+            serviceName = serviceName.substring(0, serviceName.indexOf('/'));
+        }
+        return new EndpointReference[]{
+                new EndpointReference((String) 
serviceNameToEprMap.get(serviceName))};
+    }
+
+    /**
+     * Returns the EPR for the given service and IP. (Picks up precomputed EPR)
+     *
+     * @param serviceName service name
+     * @param ip          ignored
+     * @return the EPR for the service
+     * @throws AxisFault not used
+     */
+    public EndpointReference getEPRForService(String serviceName, String ip) 
throws AxisFault {
+        return getEPRsForService(serviceName, ip)[0];
+    }
+
+    /**
+     * Starts listening for messages on this service
+     *
+     * @param service the AxisService just deployed
+     */
+    private void startListeningForService(AxisService service) {
+        processService(service);
+        JMSConnectionFactory cf = getConnectionFactory(service);
+        if (cf == null) {
+            String msg = "Service " + service.getName() + " does not specify" +
+                         "a JMS connection factory or refers to an invalid 
factory." +
+                         "This service is being marked as faulty and will not 
be " +
+                         "available over the JMS transport";
+            log.warn(msg);
+            JMSUtils.markServiceAsFaulty(
+                    service.getName(), msg, service.getAxisConfiguration());
+            return;
+        }
+
+        String destination = JMSUtils.getDestination(service);
+        try {
+            cf.listenOnDestination(destination);
+            log.info("Started listening on destination : " + destination +
+                     " for service " + service.getName());
+
+        } catch (JMSException e) {
+            handleException(
+                    "Could not listen on JMS for service " + 
service.getName(), e);
+            JMSUtils.markServiceAsFaulty(
+                    service.getName(), e.getMessage(), 
service.getAxisConfiguration());
+        }
+    }
+
+    /**
+     * Stops listening for messages for the service undeployed
+     *
+     * @param service the AxisService just undeployed
+     */
+    private void stopListeningForService(AxisService service) {
+
+        JMSConnectionFactory cf = getConnectionFactory(service);
+        if (cf == null) {
+            String msg = "Service " + service.getName() + " does not specify" +
+                         "a JMS connection factory or refers to an invalid 
factory." +
+                         "This service is being marked as faulty and will not 
be " +
+                         "available over the JMS transport";
+            log.warn(msg);
+            JMSUtils.markServiceAsFaulty(
+                    service.getName(), msg, service.getAxisConfiguration());
+            return;
+        }
+
+        // remove from the serviceNameToEprMap
+        serviceNameToEprMap.remove(service.getName());
+
+        String destination = JMSUtils.getDestination(service);
+        try {
+            cf.removeDestination(destination);
+        } catch (JMSException e) {
+            handleException(
+                    "Error while terminating listening on JMS destination : " 
+ destination, e);
+        }
+    }
+
+    private void handleException(String msg, Exception e) {
+        log.error(msg, e);
+        throw new AxisJMSException(msg, e);
+    }
+
+    /**
+     * An AxisObserver which will start listening for newly deployed services,
+     * and stop listening when services are undeployed.
+     */
+    class JMSAxisObserver implements AxisObserver {
+
+        // The initilization code will go here
+        public void init(AxisConfiguration axisConfig) {
+        }
+
+        public void serviceUpdate(AxisEvent event, AxisService service) {
+
+            if (JMSUtils.isJMSService(service)) {
+                switch (event.getEventType()) {
+                    case AxisEvent.SERVICE_DEPLOY :
+                        startListeningForService(service);
+                        break;
+                    case AxisEvent.SERVICE_REMOVE :
+                        stopListeningForService(service);
+                        break;
+                    case AxisEvent.SERVICE_START  :
+                        startListeningForService(service);
+                        break;
+                    case AxisEvent.SERVICE_STOP   :
+                        stopListeningForService(service);
+                        break;
+                }
+            }
+        }
+
+        public void moduleUpdate(AxisEvent event, AxisModule module) {
+        }
+
+        //--------------------------------------------------------
+        public void addParameter(Parameter param) throws AxisFault {
+        }
+
+        public void removeParameter(Parameter param) throws AxisFault {
+        }
+
+        public void deserializeParameters(OMElement parameterElement) throws 
AxisFault {
+        }
+
+        public Parameter getParameter(String name) {
+            return null;
+        }
+
+        public ArrayList getParameters() {
+            return null;
+        }
+
+        public boolean isParameterLocked(String parameterName) {
+            return false;
+        }
+
+        public void serviceGroupUpdate(AxisEvent event, AxisServiceGroup 
serviceGroup) {
+        }
+    }
+
+    public ConfigurationContext getConfigurationContext() {
+        return this.configCtx;
+    }
+
+
+    public SessionContext getSessionContext(MessageContext messageContext) {
+        return null;
+    }
+
+    public void destroy() {
+        this.configCtx = null;
+    }
+}

_______________________________________________
Mashup-dev mailing list
[email protected]
http://mailman.wso2.org/cgi-bin/mailman/listinfo/mashup-dev

Reply via email to