Author: ruwan
Date: Tue Oct 9 03:23:29 2007
New Revision: 583093
URL: http://svn.apache.org/viewvc?rev=583093&view=rev
Log:
Adding a thread pool to synapse and using that in the injectAsync case
Added:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadFactory.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadPool.java
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java?rev=583093&r1=583092&r2=583093&view=diff
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java
(original)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java
Tue Oct 9 03:23:29 2007
@@ -23,7 +23,8 @@
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.endpoints.utils.EndpointDefinition;
import org.apache.synapse.statistics.StatisticsCollector;
-import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
+
+import java.util.concurrent.ExecutorService;
/**
* The SynapseEnvironment allows access into the the host SOAP engine. It
allows
@@ -84,10 +85,10 @@
public void setStatisticsCollector(StatisticsCollector
statisticsCollector);
/**
- * This is used by anyone who needs access to a ThreadPool. It offers the
ability to
- * start work. See the backport concurrent documentation
+ * This is used by anyone who needs access to a SynapseThreadPool.
+ * It offers the ability to start work.
+ *
* @return Returns the ExecutorService
*/
- // public ExecutorService getExecutorService();
- // not used yet
+ public ExecutorService getExecutorService();
}
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java?rev=583093&r1=583092&r2=583093&view=diff
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java
(original)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java
Tue Oct 9 03:23:29 2007
@@ -32,9 +32,13 @@
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.endpoints.utils.EndpointDefinition;
import org.apache.synapse.mediators.base.SequenceMediator;
+import org.apache.synapse.mediators.MediatorWorker;
import org.apache.synapse.statistics.StatisticsCollector;
import org.apache.synapse.statistics.StatisticsUtils;
import org.apache.synapse.util.UUIDGenerator;
+import org.apache.synapse.util.concurrent.SynapseThreadPool;
+
+import java.util.concurrent.ExecutorService;
/**
* This is the Axis2 implementation of the SynapseEnvironment
@@ -45,15 +49,18 @@
private SynapseConfiguration synapseConfig;
private ConfigurationContext configContext;
+ private ExecutorService executorService;
/** The StatisticsCollector object */
private StatisticsCollector statisticsCollector;
- public Axis2SynapseEnvironment() {}
+ public Axis2SynapseEnvironment() {
+ this.executorService = new SynapseThreadPool();
+ }
public Axis2SynapseEnvironment(ConfigurationContext cfgCtx,
SynapseConfiguration synapseConfig) {
-
+ this();
this.configContext = cfgCtx;
this.synapseConfig = synapseConfig;
}
@@ -134,8 +141,7 @@
StatisticsUtils.processSequenceStatistics(synCtx);
}
- ((Axis2MessageContext) synCtx).getAxis2MessageContext()
- .getConfigurationContext().getThreadPool().execute(new
SynapseWorker(seq, synCtx));
+ executorService.execute(new MediatorWorker(seq, synCtx));
}
@@ -218,46 +224,13 @@
}
/**
- * This inner class will be used as the executer for the injectAsync
method for the
- * sequence mediation
+ * This gives access to the Synapse thread pool for
+ * advanced mediation tasks.
+ *
+ * @return an ExecutorService to execute the tasks in a new thread from
the pool
*/
- private class SynapseWorker implements Runnable {
-
- /** Mediator to be executed */
- private Mediator seq = null;
-
- /** MessageContext to be mediated using the mediator */
- private MessageContext synCtx = null;
-
- /**
- * Constructor of the SynapseWorker which sets the sequence and the
message context
- *
- * @param seq - Sequence Mediator to be set
- * @param synCtx - Synapse MessageContext to be set
- */
- public SynapseWorker(Mediator seq, MessageContext synCtx) {
- this.seq = seq;
- this.synCtx = synCtx;
- }
-
- /**
- * Constructor od the SynapseWorker which sets the provided message
context and the
- * main sequence as the sequence for mediation
- *
- * @param synCtx - Synapse MessageContext to be set
- */
- public SynapseWorker(MessageContext synCtx) {
- this.synCtx = synCtx;
- seq = synCtx.getMainSequence();
- }
-
- /**
- * Execution method of the thread. This will just call the mediation
of the specified
- * Synapse MessageContext using the specified Sequence Mediator
- */
- public void run() {
- seq.mediate(synCtx);
- }
+ public ExecutorService getExecutorService() {
+ return executorService;
}
}
Added:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java?rev=583093&view=auto
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
(added)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
Tue Oct 9 03:23:29 2007
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.mediators;
+
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+
+/**
+ * This class will be used as the executor for the injectAsync method for the
+ * sequence mediation
+ */
+public class MediatorWorker implements Runnable {
+
+ /** Mediator to be executed */
+ private Mediator seq = null;
+
+ /** MessageContext to be mediated using the mediator */
+ private MessageContext synCtx = null;
+
+ /**
+ * Constructor of the MediatorWorker which sets the sequence and the
message context
+ *
+ * @param seq - Sequence Mediator to be set
+ * @param synCtx - Synapse MessageContext to be set
+ */
+ public MediatorWorker(Mediator seq, MessageContext synCtx) {
+ this.seq = seq;
+ this.synCtx = synCtx;
+ }
+
+ /**
+ * Constructor of the MediatorWorker which sets the provided message
context and the
+ * main sequence as the sequence for mediation
+ *
+ * @param synCtx - Synapse MessageContext to be set
+ */
+ public MediatorWorker(MessageContext synCtx) {
+ this.synCtx = synCtx;
+ seq = synCtx.getMainSequence();
+ }
+
+ /**
+ * Execution method of the thread. This will just call the mediation of
the specified
+ * Synapse MessageContext using the specified Sequence Mediator
+ */
+ public void run() {
+ seq.mediate(synCtx);
+ }
+}
Added:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadFactory.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadFactory.java?rev=583093&view=auto
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadFactory.java
(added)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadFactory.java
Tue Oct 9 03:23:29 2007
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.util.concurrent;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * This is the thread factory for Synapse threads which are accessible through
the
+ * SynapseEnvironment as pooled threads
+ */
+public class SynapseThreadFactory implements ThreadFactory {
+
+ /** Holds the ThreadGroup under which this factory creates threads */
+ final ThreadGroup group;
+
+ /** Holds the AtomicInteger class instance for the factory */
+ final AtomicInteger count;
+
+ /** Prefix for the thread id; the thread number is appended to it to construct
the id */
+ final String namePrefix;
+
+ /**
+ * Constructor for the ThreadFactory to create new threads
+ *
+ * @param group - all the threads are created under this group by
this factory
+ * @param namePrefix - name prefix of the threads created by this factory
+ */
+ public SynapseThreadFactory(final ThreadGroup group, final String
namePrefix) {
+ super();
+ this.count = new AtomicInteger(1);
+ this.group = group;
+ this.namePrefix = namePrefix;
+ }
+
+ /**
+ * This method is the implementation of the newThread method and will
+ * create new threads under the group and with the nameprefix followed by
the
+ * thread number as the id
+ *
+ * @param runnable - Runnable class to run by the created thread
+ * @return a Thread executing the given runnable
+ */
+ public Thread newThread(final Runnable runnable) {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(this.namePrefix);
+ buffer.append('-');
+ buffer.append(this.count.getAndIncrement());
+ Thread t = new Thread(group, runnable, buffer.toString(), 0);
+ t.setDaemon(false);
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+
+}
Added:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadPool.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadPool.java?rev=583093&view=auto
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadPool.java
(added)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/concurrent/SynapseThreadPool.java
Tue Oct 9 03:23:29 2007
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.util.concurrent;
+
+import java.util.concurrent.*;
+
+/**
+ * This is the executor service that will be returned by the SynapseEnvironment
+ */
+public class SynapseThreadPool extends ThreadPoolExecutor {
+
+ // default values
+ private static final int SYNAPSE_CORE_THREADS = 20;
+ private static final int SYNAPSE_MAX_THREADS = 100;
+ private static final int SYNAPSE_KEEP_ALIVE = 5;
+ private static final int BLOCKING_QUEUE_LENGTH = -1;
+ private static final String SYNAPSE_THREAD_GROUP = "synapse-thread-group";
+ private static final String SYNAPSE_THREAD_ID_PREFIX = "SynapseWorker";
+
+ // property keys
+ private static final String SYN_THREAD_CORE = "syn_t_core";
+ private static final String SYN_THREAD_MAX = "syn_t_max";
+ private static final String SYN_THREAD_ALIVE = "syn_alive_sec";
+ private static final String SYN_THREAD_QLEN = "syn_qlen";
+
+ /**
+ * Constructor for the Synapse thread pool
+ *
+ * @param corePoolSize - number of threads to keep in the pool, even if
they are idle
+ * @param maximumPoolSize - the maximum number of threads to allow in the
pool
+ * @param keepAliveTime - this is the maximum time that excess idle
threads will wait
+ * for new tasks before terminating.
+ * @param unit - the time unit for the keepAliveTime argument.
+ * @param workQueue - the queue to use for holding tasks before they
are executed.
+ */
+ public SynapseThreadPool(int corePoolSize, int maximumPoolSize, long
keepAliveTime,
+ TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ new SynapseThreadFactory(
+ new ThreadGroup(SYNAPSE_THREAD_GROUP),
SYNAPSE_THREAD_ID_PREFIX));
+ }
+
+ /**
+ * Default constructor for the thread pool, using the default value for every
setting
+ */
+ public SynapseThreadPool() {
+ this(SYNAPSE_CORE_THREADS, SYNAPSE_MAX_THREADS, SYNAPSE_KEEP_ALIVE,
+ TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]