Author: ruwan
Date: Tue Oct  9 03:23:29 2007
New Revision: 583093

URL: http://svn.apache.org/viewvc?rev=583093&view=rev
Log:
Adding a thread pool to synapse and using that in the injectAsync case

Added:
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadFactory.java
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadPool.java
Modified:
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java
    
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java

Modified: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java?rev=583093&r1=583092&r2=583093&view=diff
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java
 Tue Oct  9 03:23:29 2007
@@ -23,7 +23,8 @@
 import org.apache.synapse.mediators.base.SequenceMediator;
 import org.apache.synapse.endpoints.utils.EndpointDefinition;
 import org.apache.synapse.statistics.StatisticsCollector;
-import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
+
+import java.util.concurrent.ExecutorService;
 
 /**
  * The SynapseEnvironment allows access into the the host SOAP engine. It 
allows
@@ -84,10 +85,10 @@
     public void setStatisticsCollector(StatisticsCollector 
statisticsCollector);
 
     /**
-     * This is used by anyone who needs access to a ThreadPool. It offers the 
ability to
-     * start work. See the backport concurrent documentation
+     * This is used by anyone who needs access to a SynapseThreadPool.
+     * It offers the ability to start work.
+     * 
      * @return Returns the ExecutorService
      */
-    // public ExecutorService getExecutorService();
-    // not used yet
+     public ExecutorService getExecutorService();
 }

Modified: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java?rev=583093&r1=583092&r2=583093&view=diff
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java
 Tue Oct  9 03:23:29 2007
@@ -32,9 +32,13 @@
 import org.apache.synapse.core.SynapseEnvironment;
 import org.apache.synapse.endpoints.utils.EndpointDefinition;
 import org.apache.synapse.mediators.base.SequenceMediator;
+import org.apache.synapse.mediators.MediatorWorker;
 import org.apache.synapse.statistics.StatisticsCollector;
 import org.apache.synapse.statistics.StatisticsUtils;
 import org.apache.synapse.util.UUIDGenerator;
+import org.apache.synapse.util.concurrent.SynapseThreadPool;
+
+import java.util.concurrent.ExecutorService;
 
 /**
  * This is the Axis2 implementation of the SynapseEnvironment
@@ -45,15 +49,18 @@
 
     private SynapseConfiguration synapseConfig;
     private ConfigurationContext configContext;
+    private ExecutorService executorService;
 
     /** The StatisticsCollector object */
     private StatisticsCollector statisticsCollector;
 
-    public Axis2SynapseEnvironment() {}
+    public Axis2SynapseEnvironment() {
+        this.executorService = new SynapseThreadPool();
+    }
 
     public Axis2SynapseEnvironment(ConfigurationContext cfgCtx,
         SynapseConfiguration synapseConfig) {
-
+        this();
         this.configContext = cfgCtx;
         this.synapseConfig = synapseConfig;
     }
@@ -134,8 +141,7 @@
             StatisticsUtils.processSequenceStatistics(synCtx);
         }
 
-        ((Axis2MessageContext) synCtx).getAxis2MessageContext()
-            .getConfigurationContext().getThreadPool().execute(new 
SynapseWorker(seq, synCtx));
+        executorService.execute(new MediatorWorker(seq, synCtx));
 
     }
 
@@ -218,46 +224,13 @@
     }
 
     /**
-     * This inner class will be used as the executer for the injectAsync 
method for the
-     * sequence mediation
+     * This will give the access to the synapse thread pool for the
+     * advanced mediation tasks.
+     *
+     * @return an ExecutorService to execute the tasks in a new thread from 
the pool
      */
-    private class SynapseWorker implements Runnable {
-
-        /** Mediator to be executed */
-        private Mediator seq = null;
-
-        /** MessageContext to be mediated using the mediator */
-        private MessageContext synCtx = null;
-
-        /**
-         * Constructor of the SynapseWorker which sets the sequence and the 
message context
-         *
-         * @param seq    - Sequence Mediator to be set
-         * @param synCtx - Synapse MessageContext to be set
-         */
-        public SynapseWorker(Mediator seq, MessageContext synCtx) {
-            this.seq = seq;
-            this.synCtx = synCtx;
-        }
-
-        /**
-         * Constructor od the SynapseWorker which sets the provided message 
context and the
-         * main sequence as the sequence for mediation
-         *
-         * @param synCtx - Synapse MessageContext to be set
-         */
-        public SynapseWorker(MessageContext synCtx) {
-            this.synCtx = synCtx;
-            seq = synCtx.getMainSequence();
-        }
-
-        /**
-         * Execution method of the thread. This will just call the mediation 
of the specified
-         * Synapse MessageContext using the specified Sequence Mediator
-         */
-        public void run() {
-            seq.mediate(synCtx);
-        }
+    public ExecutorService getExecutorService() {
+        return executorService;
     }
 
 }

Added: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java?rev=583093&view=auto
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
 (added)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
 Tue Oct  9 03:23:29 2007
@@ -0,0 +1,66 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.mediators;
+
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+
+/**
+ * Runnable that mediates a MessageContext through a Mediator; it is the task
+ * submitted to the executor by the injectAsync path for sequence mediation.
+ */
+public class MediatorWorker implements Runnable {
+
+    /** Mediator to be executed */
+    private Mediator seq = null;
+
+    /** MessageContext to be mediated using the mediator */
+    private MessageContext synCtx = null;
+
+    /**
+     * Creates a worker for the given mediator and message context.
+     *
+     * @param seq    - Sequence Mediator to be set
+     * @param synCtx - Synapse MessageContext to be set
+     */
+    public MediatorWorker(Mediator seq, MessageContext synCtx) {
+        this.seq = seq;
+        this.synCtx = synCtx;
+    }
+
+    /**
+     * Creates a worker for the given message context, using the main sequence
+     * returned by synCtx.getMainSequence() as the mediator.
+     *
+     * @param synCtx - Synapse MessageContext to be set
+     */
+    public MediatorWorker(MessageContext synCtx) {
+        this.synCtx = synCtx;
+        seq = synCtx.getMainSequence();
+    }
+
+    /**
+     * Runs the mediation by invoking seq.mediate(synCtx) on the calling thread.
+     * Nothing is caught here, so any exception from mediate() propagates.
+     */
+    public void run() {
+        seq.mediate(synCtx);
+    }
+}

Added: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadFactory.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadFactory.java?rev=583093&view=auto
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadFactory.java
 (added)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadFactory.java
 Tue Oct  9 03:23:29 2007
@@ -0,0 +1,72 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.util.concurrent;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Thread factory for the Synapse worker threads that are accessible through
+ * the SynapseEnvironment as pooled threads.
+ */
+public class SynapseThreadFactory implements ThreadFactory {
+
+    /** Holds the ThreadGroup under which this factory creates threads */
+    final ThreadGroup group;
+
+    /** Counter supplying the number appended to each new thread name; starts at 1 */
+    final AtomicInteger count;
+
+    /** Prefix of the thread name; '-' and the thread number are appended to it */
+    final String namePrefix;
+
+    /**
+     * Constructor for the ThreadFactory to create new threads
+     *
+     * @param group      - thread group in which this factory creates its threads
+     * @param namePrefix - name prefix of the threads created by this factory
+     */
+    public SynapseThreadFactory(final ThreadGroup group, final String 
namePrefix) {
+        super();
+        this.count = new AtomicInteger(1);
+        this.group = group;
+        this.namePrefix = namePrefix;
+    }
+
+    /**
+     * Creates a new thread in the factory's group, named with the name prefix,
+     * a '-' and the next thread number. The thread is created as a non-daemon
+     * thread with normal priority and is not started here.
+     *
+     * @param runnable - Runnable class to run by the created thread
+     * @return a new, not yet started Thread that will run the given runnable
+     */
+    public Thread newThread(final Runnable runnable) {
+        StringBuffer buffer = new StringBuffer();
+        buffer.append(this.namePrefix);
+        buffer.append('-');
+        buffer.append(this.count.getAndIncrement());
+        Thread t = new Thread(group, runnable, buffer.toString(), 0);
+        t.setDaemon(false);
+        t.setPriority(Thread.NORM_PRIORITY);
+        return t;
+    }
+
+}

Added: 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadPool.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadPool.java?rev=583093&view=auto
==============================================================================
--- 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadPool.java
 (added)
+++ 
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadPool.java
 Tue Oct  9 03:23:29 2007
@@ -0,0 +1,67 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.util.concurrent;
+
+import java.util.concurrent.*;
+
+/**
+ * Thread pool executor that the Synapse environment exposes as its ExecutorService.
+ */
+public class SynapseThreadPool extends ThreadPoolExecutor {
+
+    // default values
+    private static final int SYNAPSE_CORE_THREADS  = 20;
+    private static final int SYNAPSE_MAX_THREADS   = 100; // NOTE(review): likely never reached with the unbounded default queue - confirm
+    private static final int SYNAPSE_KEEP_ALIVE     = 5; // passed with TimeUnit.SECONDS by the default constructor
+    private static final int BLOCKING_QUEUE_LENGTH = -1; // not referenced anywhere in this class
+    private static final String SYNAPSE_THREAD_GROUP = "synapse-thread-group";
+    private static final String SYNAPSE_THREAD_ID_PREFIX = "SynapseWorker";
+
+    // property keys (not referenced anywhere in this class)
+    private static final String SYN_THREAD_CORE     = "syn_t_core";
+    private static final String SYN_THREAD_MAX      = "syn_t_max";
+    private static final String SYN_THREAD_ALIVE    = "syn_alive_sec";
+    private static final String SYN_THREAD_QLEN     = "syn_qlen";
+
+    /**
+     * Creates the Synapse thread pool; threads come from a SynapseThreadFactory in a new ThreadGroup.
+     *
+     * @param corePoolSize    - number of threads to keep in the pool, even if idle
+     * @param maximumPoolSize - the maximum number of threads to allow in the pool
+     * @param keepAliveTime   - maximum time that excess idle threads will wait
+     *  for new tasks before terminating.
+     * @param unit            - the time unit for the keepAliveTime argument.
+     * @param workQueue       - the queue holding tasks before they are executed.
+     */
+    public SynapseThreadPool(int corePoolSize, int maximumPoolSize, long 
keepAliveTime,
+        TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+            new SynapseThreadFactory(
+                new ThreadGroup(SYNAPSE_THREAD_GROUP), 
SYNAPSE_THREAD_ID_PREFIX));
+    }
+
+    /**
+     * Default constructor: default sizes, keep-alive in seconds, and an unbounded LinkedBlockingQueue.
+     */
+    public SynapseThreadPool() {
+        this(SYNAPSE_CORE_THREADS, SYNAPSE_MAX_THREADS, SYNAPSE_KEEP_ALIVE,
+            TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to