ContextualWorkManager has been changed over to present a ScheduledExecutorService for applications to use, rather than a WorkManager.
Starting up the Admin service now works. AdminService is not yet fully implemented. Project: http://git-wip-us.apache.org/repos/asf/river-container/repo Commit: http://git-wip-us.apache.org/repos/asf/river-container/commit/e6828db5 Tree: http://git-wip-us.apache.org/repos/asf/river-container/tree/e6828db5 Diff: http://git-wip-us.apache.org/repos/asf/river-container/diff/e6828db5 Branch: refs/heads/master Commit: e6828db5b36e56d6eef5c40f1b9cd3e79f2eb93f Parents: 317dac0 Author: Greg Trasuk <gtra...@apache.org> Authored: Mon Jan 27 01:50:17 2014 -0500 Committer: Greg Trasuk <gtra...@apache.org> Committed: Mon Jan 27 01:50:17 2014 -0500 ---------------------------------------------------------------------- .../river/container/admin/impl/AdminImpl.java | 19 +- .../org/apache/river/container/Bootstrap.java | 4 + .../org/apache/river/container/Strings.java | 2 + .../deployer/StarterServiceDeployer.java | 42 ++-- .../river/container/work/BasicExecutor.java | 242 +++++++++++++++++++ .../container/work/ContextualWorkManager.java | 25 +- .../river/container/work/WorkingContext.java | 4 +- .../src/site/markdown/WorkManager.md | 60 +++++ river-container-core/src/site/markdown/index.md | 5 +- .../work/ContextualWorkManagerTest.java | 16 +- 10 files changed, 376 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/admin-svc/admin-svc-impl/src/main/java/org/apache/river/container/admin/impl/AdminImpl.java ---------------------------------------------------------------------- diff --git a/admin-svc/admin-svc-impl/src/main/java/org/apache/river/container/admin/impl/AdminImpl.java b/admin-svc/admin-svc-impl/src/main/java/org/apache/river/container/admin/impl/AdminImpl.java index ca0de09..b137e94 100644 --- a/admin-svc/admin-svc-impl/src/main/java/org/apache/river/container/admin/impl/AdminImpl.java +++ b/admin-svc/admin-svc-impl/src/main/java/org/apache/river/container/admin/impl/AdminImpl.java @@ -20,9 +20,8 @@ package org.apache.river.container.admin.impl; import com.sun.jini.config.Config; import com.sun.jini.start.LifeCycle; import java.io.IOException; -import java.net.SocketPermission; import java.rmi.server.ExportException; -import java.security.AccessController; +import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; import net.jini.config.Configuration; @@ -60,16 +59,13 @@ public class AdminImpl implements ServiceIDListener, AdminRemote { JoinManager joinManager = null; DiscoveryManagement discoveryManager = null; Entry[] attributes = null; - + ScheduledExecutorService executor=null; + public AdminImpl(String args[], final LifeCycle lc) throws ConfigurationException, ExportException, IOException { config = ConfigurationProvider.getInstance(args); // Get the exporter and create our proxy. exporter = (Exporter) Config.getNonNullEntry(config, COMPONENT_ID, "exporter", Exporter.class); - log.fine("\n"); - org.apache.river.container.Utils.logClassLoaderHierarchy(log, this.getClass()); - org.apache.river.container.Utils.logClassLoaderHierarchy(log, config.getClass()); - log.fine("\n"); Utils.logGrantsToClass(log, Level.FINE, this.getClass()); try { myProxy = (Admin) exporter.export(this); @@ -83,6 +79,15 @@ public class AdminImpl implements ServiceIDListener, AdminRemote { // We don't have to do anything with it - just creating it starts the join process. joinManager = new JoinManager(myProxy, attributes, this, discoveryManager, null, config); log.info("Started the admin service"); + + /* For local clients, we don't want to be dependent on the Jini infrastructure being setup + correctly. For this reason, we stash a copy of the proxy's MarshalledObject in the local + file system. + */ + synchronized(this) { + executor=(ScheduledExecutorService) Config.getNonNullEntry(config, COMPONENT_ID, "$executor", ScheduledExecutorService.class); + } + } ServiceID sid = null; http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/main/java/org/apache/river/container/Bootstrap.java ---------------------------------------------------------------------- diff --git a/river-container-core/src/main/java/org/apache/river/container/Bootstrap.java b/river-container-core/src/main/java/org/apache/river/container/Bootstrap.java index 0943d4c..a7860a2 100644 --- a/river-container-core/src/main/java/org/apache/river/container/Bootstrap.java +++ b/river-container-core/src/main/java/org/apache/river/container/Bootstrap.java @@ -173,6 +173,10 @@ public class Bootstrap { Method initCompleteMethod = context.getClass().getMethod(Strings.INIT_COMPLETE, new Class[0]); Thread.currentThread().setContextClassLoader(containerClassLoader); putByNameMethod.invoke(context, Strings.CLASS_LOADERS, (Object) classLoaders); + + /* Store a link to the context in the context. */ + putByNameMethod.invoke(context, Strings.CONTEXT, context); + /* Process the core configuration */ http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/main/java/org/apache/river/container/Strings.java ---------------------------------------------------------------------- diff --git a/river-container-core/src/main/java/org/apache/river/container/Strings.java b/river-container-core/src/main/java/org/apache/river/container/Strings.java index 4bbc2be..76510b3 100644 --- a/river-container-core/src/main/java/org/apache/river/container/Strings.java +++ b/river-container-core/src/main/java/org/apache/river/container/Strings.java @@ -32,6 +32,7 @@ public class Strings { CORE_CONFIG_XML="core-config.xml", CONTAINER_CLASS_LOADER="containerClassLoader", CONTAINER_JMX_DOMAIN="org.apache.river.container", + CONTEXT="context", CONTEXT_CLASS = "org.apache.river.container.Context", DASH = "-", DEFAULT = "default", @@ -45,6 +46,7 @@ public class Strings { DOT_PROPERTIES=".properties", DOT_SSAR=".ssar", EMPTY = "", + EXECUTOR_NAME="scheduledExecutorService", GET_ADMIN="getAdmin", FILE_UTILITY="fileUtility", INIT_COMPLETE="initComplete", http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/main/java/org/apache/river/container/deployer/StarterServiceDeployer.java ---------------------------------------------------------------------- diff --git a/river-container-core/src/main/java/org/apache/river/container/deployer/StarterServiceDeployer.java b/river-container-core/src/main/java/org/apache/river/container/deployer/StarterServiceDeployer.java index 54578dc..500bf90 100644 --- a/river-container-core/src/main/java/org/apache/river/container/deployer/StarterServiceDeployer.java +++ b/river-container-core/src/main/java/org/apache/river/container/deployer/StarterServiceDeployer.java @@ -137,9 +137,9 @@ public class StarterServiceDeployer implements StarterServiceDeployerMXBean { String parentLoaderName = configNode.search( new Class[]{ASTconfig.class, ASTclassloader.class, ASTparent.class}).get(0).jjtGetChild(0).toString(); log.log(Level.FINE, MessageNames.SERVICE_PARENT_CLASSLOADER_IS, parentLoaderName); - boolean isAppPriority=false; - if (!configNode.search( new Class[]{ ASTconfig.class, ASTclassloader.class, ASTappPriority.class}).isEmpty()) { - isAppPriority=true; + boolean isAppPriority = false; + if (!configNode.search(new Class[]{ASTconfig.class, ASTclassloader.class, ASTappPriority.class}).isEmpty()) { + isAppPriority = true; } ClassLoader parentLoader = (ClassLoader) context.get(parentLoaderName); VirtualFileSystemClassLoader cl @@ -248,7 +248,7 @@ public class StarterServiceDeployer implements StarterServiceDeployerMXBean { } else { throw new UnsupportedOperationException(); } - env.getWorkingContext().getWorkManager().queueTask(env.getClassLoader(), task); + env.getWorkingContext().getScheduledExecutorService().submit(task); } public Properties readStartProperties(FileObject serviceRoot) throws FileSystemException, LocalizedRuntimeException, IOException { @@ -267,26 +267,29 @@ public class StarterServiceDeployer implements StarterServiceDeployerMXBean { return startProps; } - public void setupLiaisonConfiguration(FileObject serviceArchive, FileObject serviceRoot, VirtualFileSystemClassLoader cl) throws ConfigurationException { + public void setupLiaisonConfiguration(ApplicationEnvironment env) throws ConfigurationException { /* Setup the liaison configuration. */ ClassLoader originalContextCl = Thread.currentThread().getContextClassLoader(); try { - Thread.currentThread().setContextClassLoader(cl); + Thread.currentThread().setContextClassLoader(env.getClassLoader()); File workingDir = null; - if (serviceArchive != null) { - workingDir = new File(serviceArchive.getURL().toURI()); + if (env.getServiceArchive() != null) { + /* TODO: Is this right? Shouldn't the working directory be created + by the file manager under the 'work' dir? + */ + workingDir = new File(env.getServiceArchive().getURL().toURI()); } else { - workingDir = new File(serviceRoot.getURL().toURI()); + workingDir = new File(env.getServiceArchive().getURL().toURI()); } - grantPermissions(cl, + grantPermissions(env.getClassLoader(), new Permission[]{new FilePermission(workingDir.getAbsolutePath(), Strings.READ)}); Utils.logClassLoaderHierarchy(log, Level.FINE, this.getClass()); String configName = VirtualFileSystemConfiguration.class.getName(); - invokeStatic(cl, configName, + invokeStatic(env.getClassLoader(), configName, Strings.SET_WORKING_DIRECTORY, workingDir); /* @@ -299,7 +302,7 @@ public class StarterServiceDeployer implements StarterServiceDeployerMXBean { String contextVarName = cfgEntryNode.jjtGetChild(1).toString(); Object contextValue = context.get(contextVarName); if (contextValue != null) { - invokeStatic(cl, configName, + invokeStatic(env.getClassLoader(), configName, Strings.PUT_SPECIAL_ENTRY, new Class[]{String.class, Object.class}, Strings.DOLLAR + varName, contextValue); @@ -308,6 +311,14 @@ public class StarterServiceDeployer implements StarterServiceDeployerMXBean { new Object[]{getConfig(), varName, contextVarName}); } } + /* Install the Executor. */ + invokeStatic(env.getClassLoader(), configName, + Strings.PUT_SPECIAL_ENTRY, + new Class[]{String.class, Object.class + }, + Strings.DOLLAR + Strings.EXECUTOR_NAME, env.getWorkingContext().getScheduledExecutorService() + ); + } catch (Exception ex) { log.log(Level.WARNING, MessageNames.EXCEPTION_THROWN, Utils.stackTrace(ex)); throw new ConfigurationException(ex, @@ -385,12 +396,13 @@ public class StarterServiceDeployer implements StarterServiceDeployerMXBean { */ Permission[] perms = createPermissionsInClassloader(cl); grantPermissions(cl, perms); - setupLiaisonConfiguration(env.getServiceArchive(), env.getServiceRoot(), cl); - + /* * Create a working context (work manager). */ - env.setWorkingContext(contextualWorkManager.createContext(env.getServiceName())); + env.setWorkingContext(contextualWorkManager.createContext(env.getServiceName(), env.getClassLoader())); + + setupLiaisonConfiguration(env); } void launchService(ApplicationEnvironment env, String[] serviceArgs) throws FileSystemException, IOException, ClassNotFoundException { http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/main/java/org/apache/river/container/work/BasicExecutor.java ---------------------------------------------------------------------- diff --git a/river-container-core/src/main/java/org/apache/river/container/work/BasicExecutor.java b/river-container-core/src/main/java/org/apache/river/container/work/BasicExecutor.java new file mode 100644 index 0000000..c3a996e --- /dev/null +++ b/river-container-core/src/main/java/org/apache/river/container/work/BasicExecutor.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + + */ +package org.apache.river.container.work; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.river.container.Init; +import org.apache.river.container.MessageNames; +import org.apache.river.container.Shutdown; +import org.apache.river.container.Strings; + +/** + * + * A Basic implementation of WorkManager that runs the work threads through a + * ThreadPoolExecutor. + * + * @author trasukg + */ +public class BasicExecutor implements ScheduledExecutorService { + + private static final Logger log = Logger.getLogger(BasicExecutor.class.getName(), MessageNames.BUNDLE_NAME); + ExecutorService executor = null; + ScheduledExecutorService scheduledExecutor=null; + private MyThreadFactory threadFactory = null; + private String name = Strings.UNNAMED; + private ClassLoader contextLoader; + + public BasicExecutor(ClassLoader contextLoader) { + this(contextLoader, Strings.UNNAMED); + } + + public BasicExecutor(ClassLoader contextLoader, String name) { + this.contextLoader=contextLoader; + this.name = name; + threadFactory = new MyThreadFactory(); + executor = Executors.newCachedThreadPool(threadFactory); + scheduledExecutor= + Executors.newSingleThreadScheduledExecutor(threadFactory); + } + + synchronized int getActiveCount() { + return threadFactory.threadGroup.activeCount(); + } + + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + return scheduledExecutor.schedule(new TaskRunnable(command, classLoaderToUse()), delay, unit); + } + + private class TaskRunnable implements Runnable { + + Runnable task = null; + ClassLoader contextClassLoader = null; + ClassLoader originalClassLoader = null; + + TaskRunnable(Runnable task, ClassLoader contextClassLoader) { + this.task = task; + this.contextClassLoader = contextClassLoader; + } + + @Override + public void run() { + originalClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(contextClassLoader); + try { + task.run(); + } finally { + Thread.currentThread().setContextClassLoader(originalClassLoader); + } + } + } + + private class TaskCallable<T> implements Callable<T> { + + Callable<T> task = null; + ClassLoader contextClassLoader = null; + ClassLoader originalClassLoader = null; + + TaskCallable(Callable<T> task, ClassLoader contextClassLoader) { + this.task = task; + this.contextClassLoader = contextClassLoader; + } + + @Override + public T call() throws Exception { + originalClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(contextClassLoader); + try { + return task.call(); + } finally { + Thread.currentThread().setContextClassLoader(originalClassLoader); + } + } + } + + @Init + public void init() { + log.info(MessageNames.BASIC_WORK_MANAGER_INITIALIZED); + } + + @Shutdown + public void shutdown() { + executor.shutdownNow(); + scheduledExecutor.shutdownNow(); + } + + private class MyThreadFactory implements ThreadFactory { + + private ThreadGroup threadGroup = new ThreadGroup(name); + private int index = 0; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(threadGroup, r); + t.setName(name + Strings.DASH + index++); + log.log(Level.FINE, MessageNames.CREATED_THREAD, + new Object[]{t.getName(), t.getThreadGroup().getName()}); + return t; + } + } + + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + return scheduledExecutor.schedule(new TaskCallable(callable, classLoaderToUse()), delay, unit); + } + + private ClassLoader classLoaderToUse() { + ClassLoader classLoaderToUse = + contextLoader != null ? contextLoader : Thread.currentThread().getContextClassLoader(); + return classLoaderToUse; + } + + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return scheduledExecutor.scheduleAtFixedRate(new TaskRunnable(command, classLoaderToUse()), initialDelay, period, unit); + } + + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return scheduledExecutor.scheduleWithFixedDelay(new TaskRunnable(command, classLoaderToUse()), initialDelay, delay, unit); + } + + public List<Runnable> shutdownNow() { + List<Runnable> neverCommenced=new ArrayList<Runnable>(); + neverCommenced.addAll(scheduledExecutor.shutdownNow()); + neverCommenced.addAll(executor.shutdownNow()); + return neverCommenced; + } + + public boolean isShutdown() { + return scheduledExecutor.isShutdown() & executor.isShutdown(); + } + + public boolean isTerminated() { + return scheduledExecutor.isTerminated() & executor.isTerminated() & getActiveCount()==0; + } + + /** + * Await termination. Note that this implementation doesn't make any guarantees + * about accuracy of the termination wait time, but it will be bounded at 2*timeout. + * @param timeout + * @param unit + * @return + * @throws InterruptedException + */ + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return executor.awaitTermination(timeout, unit) & scheduledExecutor.awaitTermination(timeout, unit); + } + + public <T> Future<T> submit(Callable<T> task) { + return executor.submit(new TaskCallable(task, classLoaderToUse())); + } + + public <T> Future<T> submit(Runnable task, T result) { + return executor.submit(new TaskRunnable(task, classLoaderToUse()), result); + } + + public Future<?> submit(Runnable task) { + return executor.submit(new TaskRunnable(task, classLoaderToUse())); + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { + List<Callable<T>> wrappedTasks = constructListOfWrappedTasks(tasks); + return executor.invokeAll(wrappedTasks); + } + + private <T> List<Callable<T>> constructListOfWrappedTasks(Collection<? extends Callable<T>> tasks) { + /* Construct a list of wrapped tasks. */ + List<Callable<T>> wrappedTasks=new ArrayList<Callable<T>>(tasks.size()); + for (Callable<T> task: tasks) { + wrappedTasks.add(new TaskCallable(task, classLoaderToUse())); + } + return wrappedTasks; + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { + List<Callable<T>> wrappedTasks = constructListOfWrappedTasks(tasks); + return executor.invokeAll(wrappedTasks, timeout, unit); + } + + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { + List<Callable<T>> wrappedTasks = constructListOfWrappedTasks(tasks); + return executor.invokeAny(wrappedTasks); + } + + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + List<Callable<T>> wrappedTasks = constructListOfWrappedTasks(tasks); + return executor.invokeAny(wrappedTasks, timeout, unit); + } + + public void execute(Runnable command) { + executor.execute(new TaskRunnable(command, classLoaderToUse())); + } + + +} http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/main/java/org/apache/river/container/work/ContextualWorkManager.java ---------------------------------------------------------------------- diff --git a/river-container-core/src/main/java/org/apache/river/container/work/ContextualWorkManager.java b/river-container-core/src/main/java/org/apache/river/container/work/ContextualWorkManager.java index 8b6e742..e23307b 100644 --- a/river-container-core/src/main/java/org/apache/river/container/work/ContextualWorkManager.java +++ b/river-container-core/src/main/java/org/apache/river/container/work/ContextualWorkManager.java @@ -19,6 +19,7 @@ package org.apache.river.container.work; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ScheduledExecutorService; import org.apache.river.container.Strings; /** @@ -29,44 +30,46 @@ public class ContextualWorkManager { List<Context> contexts=new ArrayList<Context>(); - public WorkingContext createContext(String name) { - Context context=new Context(name); + public WorkingContext createContext(String name, ClassLoader contextLoader) { + Context context=new Context(name, contextLoader); contexts.add(context); return context; } private class Context implements WorkingContext { String name=Strings.UNNAMED; - + ClassLoader contextLoader; + public String getName() { return name; } - public Context(String name) { + public Context(String name, ClassLoader contextLoader) { this.name=name; - workManager=new BasicWorkManager(name); + this.contextLoader=contextLoader; + executor=new BasicExecutor(contextLoader, name); } - BasicWorkManager workManager=null; + BasicExecutor executor=null; @Override - public WorkManager getWorkManager() { - return workManager; + public ScheduledExecutorService getScheduledExecutorService() { + return executor; } @Override public int getActiveThreadCount() { - return workManager.getActiveCount(); + return executor.getActiveCount(); } @Override public void shutdown() { - workManager.shutdown(); + executor.shutdownNow(); } @Override public void interrupt() { - workManager.interrupt(); + executor.shutdownNow(); } } http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/main/java/org/apache/river/container/work/WorkingContext.java ---------------------------------------------------------------------- diff --git a/river-container-core/src/main/java/org/apache/river/container/work/WorkingContext.java b/river-container-core/src/main/java/org/apache/river/container/work/WorkingContext.java index 56b2a0a..1f8c6e4 100644 --- a/river-container-core/src/main/java/org/apache/river/container/work/WorkingContext.java +++ b/river-container-core/src/main/java/org/apache/river/container/work/WorkingContext.java @@ -17,6 +17,8 @@ */ package org.apache.river.container.work; +import java.util.concurrent.ScheduledExecutorService; + /** @author trasukg @@ -27,7 +29,7 @@ public interface WorkingContext { context. @return The WorkManager instance. */ - WorkManager getWorkManager(); + ScheduledExecutorService getScheduledExecutorService(); /** Answer how many threads are currently active in this context. http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/site/markdown/WorkManager.md ---------------------------------------------------------------------- diff --git a/river-container-core/src/site/markdown/WorkManager.md b/river-container-core/src/site/markdown/WorkManager.md new file mode 100644 index 0000000..5ea2bf5 --- /dev/null +++ b/river-container-core/src/site/markdown/WorkManager.md @@ -0,0 +1,60 @@ +Work Manager and Work Management in the Container +================================================= + +- In general, containers should be able to control the thread usage and scheduling of +work inside the container. Otherwise it's possible for an application to hijack +execution or prevent a different application, or possibly the container itself, +from executing properly. +- So, this desire means that applications should be discouraged or disallowed from +creating their own threads. In turn, that restriction means that the container must +offer some way to schedule work that should happen on another thread, and possibly at +some time in the future, or even repeatedly. + +How it is on 20140125 +--------------------- + +- The container includes an attempt at this, embodied in `org.apache.river.container.work` + - There is an interface 'WorkManager' that contains a `queue(...)` method that + drops the task into a task queue. The queue method also allows the user to + specify a classloader for the task to run in. The thread pool is expected to set + this classloader as the context classloader before executing the task. + - There is a 'BasicWorkManager' implementation that uses a ThreadPoolExecutor to + implement 'WorkManager' + - There is a 'ContextualWorkManager' implementation that allows jobs to be grouped + together and cancelled en-masse, for instance when an application needs to be + shut down. + +Problems +-------- + +- We need to provide a way for well-written applications to schedule multi-threaded +work. We probably shouldn't introduce container-specific API, especially since there +is a perfectly good API for work management in `javax.concurrent`. +- As written now, the API doesn't prevent a single thread pool, but that isn't implemented +yet. The working contexts each have their own thread pool. +- There is no facility for an application to have any internal prioritization. + +Design Goals +------------ + +- Provide an API to applications that allows them to fire off background work, +scheduled executions, and repetitive tasks. + - Essentially, one or more ScheduledExecutorService objects should be provided + for the application. +- Ideally, there should be one thread pool that is managed by the container +- The executor objects provided to the applications should be isolated from each +other, cancellable en-masse (for application shutdown) and should preserve the +context classloader. +- The number of threads in the thread pool should be configurable. +- Ideally, the thread pool policy should be configurable (i.e. fixed threads, +max threads, min threads, etc). +- Current users of WorkManager interface should be migrated to the new API. +- Number of threads in use, etc should be visible through a management interface. +- Applications should be able to provide prioritization on the tasks + - Perhaps by also implementing Comparable in the task that implements Runnable. + - Runnables that come "first" are run first. + + + + + http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/site/markdown/index.md ---------------------------------------------------------------------- diff --git a/river-container-core/src/site/markdown/index.md b/river-container-core/src/site/markdown/index.md index ed271e1..d86c88a 100644 --- a/river-container-core/src/site/markdown/index.md +++ b/river-container-core/src/site/markdown/index.md @@ -58,6 +58,9 @@ classes. - [State Machine](StateMachine.html) Information on the annotation-based state machine implementation used in various places in the container. - [Surrogate Deployment](SurrogateDeployment.html) Deployer used to host -Jini Surrogates. +Jini Surrogates. +- [Work Manager Considerations](WorkManager.html) Design thoughts on work/thread +management in the +container. - [To Do](Todo.html) To-do list for the development. http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/test/java/org/apache/river/container/work/ContextualWorkManagerTest.java ---------------------------------------------------------------------- diff --git a/river-container-core/src/test/java/org/apache/river/container/work/ContextualWorkManagerTest.java b/river-container-core/src/test/java/org/apache/river/container/work/ContextualWorkManagerTest.java index 3c751d8..5bdab85 100644 --- a/river-container-core/src/test/java/org/apache/river/container/work/ContextualWorkManagerTest.java +++ b/river-container-core/src/test/java/org/apache/river/container/work/ContextualWorkManagerTest.java @@ -32,30 +32,30 @@ import org.junit.Test; public class ContextualWorkManagerTest { ContextualWorkManager UUT = new ContextualWorkManager(); - WorkingContext context = UUT.createContext("Test-ctx"); + WorkingContext context = UUT.createContext("Test-ctx", Thread.currentThread().getContextClassLoader()); @Test public void testContextCreation() { assertNotNull("context", context); - assertNotNull("context.workManager", context.getWorkManager()); + assertNotNull("context.scheduledExecutorService", context.getScheduledExecutorService()); } @Test - public void testThreadCount() { + public void testRunAndExit() { WorkerRunnable wt = new WorkerRunnable(); - context.getWorkManager().queueTask(null, wt); + context.getScheduledExecutorService().submit(wt); long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < 2000 & context.getActiveThreadCount() < 1) { Thread.yield(); } assertEquals("thread count", 1, context.getActiveThreadCount()); - wt.proceed = true; + } @Test public void testChildThreadGroup() throws Exception { WorkerRunnable wt = new WorkerRunnable(); - context.getWorkManager().queueTask(null, wt); + context.getScheduledExecutorService().submit(wt); long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < 2000 & context.getActiveThreadCount() < 1) { Thread.yield(); @@ -71,7 +71,7 @@ public class ContextualWorkManagerTest { @Test public void testThreadCountWithChildren() throws Exception { WorkerRunnable wt = new WorkerRunnable(2); - context.getWorkManager().queueTask(null, wt); + context.getScheduledExecutorService().submit(wt); long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < 2000 & context.getActiveThreadCount() < 1) { Thread.yield(); @@ -91,7 +91,7 @@ public class ContextualWorkManagerTest { String threadGroupName = Strings.UNKNOWN; List<WorkerRunnable> children = new ArrayList<WorkerRunnable>(); String id = "--"; - boolean proceed = false; + volatile boolean proceed = false; int nChildren = 0; public WorkerRunnable() {