http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
deleted file mode 100644
index f549d0f..0000000
--- 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor.workermanager;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.storm.Config;
-import org.apache.storm.ProcessSimulator;
-import org.apache.storm.container.cgroup.CgroupManager;
-import org.apache.storm.daemon.supervisor.SupervisorUtils;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.localizer.Localizer;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Time;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-
-public class DefaultWorkerManager implements IWorkerManager {
-
-    private static Logger LOG = 
LoggerFactory.getLogger(DefaultWorkerManager.class);
-
-    private Map conf;
-    private CgroupManager resourceIsolationManager;
-    private boolean runWorkerAsUser;
-
-    @Override
-    public void prepareWorker(Map conf, Localizer localizer) {
-        this.conf = conf;
-        if 
(Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), 
false)) {
-            try {
-                this.resourceIsolationManager = Utils.newInstance((String) 
conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
-                this.resourceIsolationManager.prepare(conf);
-                LOG.info("Using resource isolation plugin {} {}", 
conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
-            } catch (IOException e) {
-                throw Utils.wrapInRuntime(e);
-            }
-        } else {
-            this.resourceIsolationManager = null;
-        }
-        this.runWorkerAsUser = 
Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
-    }
-
-    @Override
-    public void launchWorker(String supervisorId, String assignmentId, String 
stormId, Long port, String workerId, WorkerResources resources,
-            Utils.ExitCodeCallable workerExitCallback) {
-        try {
-
-            String stormHome = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
-            String stormOptions = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
-            String stormConfFile = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
-            String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId);
-
-            String stormLogDir = ConfigUtils.getLogDir();
-            String stormLogConfDir = (String) 
(conf.get(Config.STORM_LOG4J2_CONF_DIR));
-
-            String stormLog4j2ConfDir;
-            if (StringUtils.isNotBlank(stormLogConfDir)) {
-                if (Utils.isAbsolutePath(stormLogConfDir)) {
-                    stormLog4j2ConfDir = stormLogConfDir;
-                } else {
-                    stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR 
+ stormLogConfDir;
-                }
-            } else {
-                stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + 
"log4j2";
-            }
-
-            String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, 
stormId);
-
-            String jlp = jlp(stormRoot, conf);
-
-            String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
-
-            StormTopology stormTopology = 
ConfigUtils.readSupervisorTopology(conf, stormId);
-
-            List<String> dependencyLocations = new ArrayList<>();
-            if (stormTopology.get_dependency_jars() != null) {
-                for (String dependency : stormTopology.get_dependency_jars()) {
-                    dependencyLocations.add(new File(stormRoot, 
dependency).getAbsolutePath());
-                }
-            }
-
-            if (stormTopology.get_dependency_artifacts() != null) {
-                for (String dependency : 
stormTopology.get_dependency_artifacts()) {
-                    dependencyLocations.add(new File(stormRoot, 
dependency).getAbsolutePath());
-                }
-            }
-
-            Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
-
-            String workerClassPath = getWorkerClassPath(stormJar, stormConf, 
dependencyLocations);
-
-            Object topGcOptsObject = 
stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS);
-            List<String> topGcOpts = new ArrayList<>();
-            if (topGcOptsObject instanceof String) {
-                topGcOpts.add((String) topGcOptsObject);
-            } else if (topGcOptsObject instanceof List) {
-                topGcOpts.addAll((List<String>) topGcOptsObject);
-            }
-
-            int memOnheap = 0;
-            if (resources.get_mem_on_heap() > 0) {
-                memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
-            } else {
-                // set the default heap memory size for supervisor-test
-                memOnheap = 
Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
-            }
-
-            int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
-
-            int cpu = (int) Math.ceil(resources.get_cpu());
-
-            List<String> gcOpts = null;
-
-            if (topGcOpts.size() > 0) {
-                gcOpts = substituteChildopts(topGcOpts, workerId, stormId, 
port, memOnheap);
-            } else {
-                gcOpts = 
substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, 
port, memOnheap);
-            }
-
-            Object topoWorkerLogwriterObject = 
stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS);
-            List<String> topoWorkerLogwriterChildopts = new ArrayList<>();
-            if (topoWorkerLogwriterObject instanceof String) {
-                topoWorkerLogwriterChildopts.add((String) 
topoWorkerLogwriterObject);
-            } else if (topoWorkerLogwriterObject instanceof List) {
-                topoWorkerLogwriterChildopts.addAll((List<String>) 
topoWorkerLogwriterObject);
-            }
-
-            String user = (String) 
stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
-
-            String logfileName = "worker.log";
-
-            String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf);
-
-            String loggingSensitivity = (String) 
stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY);
-            if (loggingSensitivity == null) {
-                loggingSensitivity = "S3";
-            }
-
-            List<String> workerChildopts = 
substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, 
memOnheap);
-
-            List<String> topWorkerChildopts = 
substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, 
stormId, port, memOnheap);
-
-            List<String> workerProfilerChildopts = null;
-            if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), 
false)) {
-                workerProfilerChildopts = 
substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, 
stormId, port, memOnheap);
-            } else {
-                workerProfilerChildopts = new ArrayList<>();
-            }
-
-            Map<String, String> topEnvironment = new HashMap<String, String>();
-            Map<String, String> environment = (Map<String, String>) 
stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
-            if (environment != null) {
-                topEnvironment.putAll(environment);
-            }
-            topEnvironment.put("LD_LIBRARY_PATH", jlp);
-
-            String log4jConfigurationFile = null;
-            if (System.getProperty("os.name").startsWith("Windows") && 
!stormLog4j2ConfDir.startsWith("file:")) {
-                log4jConfigurationFile = "file:///" + stormLog4j2ConfDir;
-            } else {
-                log4jConfigurationFile = stormLog4j2ConfDir;
-            }
-            log4jConfigurationFile = log4jConfigurationFile + 
Utils.FILE_PATH_SEPARATOR + "worker.xml";
-
-            List<String> commandList = new ArrayList<>();
-            commandList.add(SupervisorUtils.javaCmd("java"));
-            commandList.add("-cp");
-            commandList.add(workerClassPath);
-            commandList.addAll(topoWorkerLogwriterChildopts);
-            commandList.add("-Dlogfile.name=" + logfileName);
-            commandList.add("-Dstorm.home=" + stormHome);
-            commandList.add("-Dworkers.artifacts=" + workersArtifacets);
-            commandList.add("-Dstorm.id=" + stormId);
-            commandList.add("-Dworker.id=" + workerId);
-            commandList.add("-Dworker.port=" + port);
-            commandList.add("-Dstorm.log.dir=" + stormLogDir);
-            commandList.add("-Dlog4j.configurationFile=" + 
log4jConfigurationFile);
-            
commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
-            commandList.add("org.apache.storm.LogWriter");
-
-            commandList.add(SupervisorUtils.javaCmd("java"));
-            commandList.add("-server");
-            commandList.addAll(workerChildopts);
-            commandList.addAll(topWorkerChildopts);
-            commandList.addAll(gcOpts);
-            commandList.addAll(workerProfilerChildopts);
-            commandList.add("-Djava.library.path=" + jlp);
-            commandList.add("-Dlogfile.name=" + logfileName);
-            commandList.add("-Dstorm.home=" + stormHome);
-            commandList.add("-Dworkers.artifacts=" + workersArtifacets);
-            commandList.add("-Dstorm.conf.file=" + stormConfFile);
-            commandList.add("-Dstorm.options=" + stormOptions);
-            commandList.add("-Dstorm.log.dir=" + stormLogDir);
-            commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
-            commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
-            commandList.add("-Dlog4j.configurationFile=" + 
log4jConfigurationFile);
-            
commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
-            commandList.add("-Dstorm.id=" + stormId);
-            commandList.add("-Dworker.id=" + workerId);
-            commandList.add("-Dworker.port=" + port);
-            commandList.add("-cp");
-            commandList.add(workerClassPath);
-            commandList.add("org.apache.storm.daemon.worker");
-            commandList.add(stormId);
-            commandList.add(assignmentId);
-            commandList.add(String.valueOf(port));
-            commandList.add(workerId);
-
-            // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil 
(conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB))))
-            if (resourceIsolationManager != null) {
-                int cGroupMem = (int) (Math.ceil((double) 
conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)));
-                int memoryValue = memoffheap + memOnheap + cGroupMem;
-                int cpuValue = cpu;
-                Map<String, Number> map = new HashMap<>();
-                map.put("cpu", cpuValue);
-                map.put("memory", memoryValue);
-                resourceIsolationManager.reserveResourcesForWorker(workerId, 
map);
-                commandList = 
resourceIsolationManager.getLaunchCommand(workerId, commandList);
-            }
-
-            LOG.info("Launching worker with command: {}. ", 
Utils.shellCmd(commandList));
-
-            String logPrefix = "Worker Process " + workerId;
-            String workerDir = ConfigUtils.workerRoot(conf, workerId);
-
-            if (runWorkerAsUser) {
-                List<String> args = new ArrayList<>();
-                args.add("worker");
-                args.add(workerDir);
-                args.add(Utils.writeScript(workerDir, commandList, 
topEnvironment));
-                List<String> commandPrefix = null;
-                if (resourceIsolationManager != null)
-                    commandPrefix = 
resourceIsolationManager.getLaunchCommandPrefix(workerId);
-                SupervisorUtils.processLauncher(conf, user, commandPrefix, 
args, null, logPrefix, workerExitCallback, new File(workerDir));
-            } else {
-                Utils.launchProcess(commandList, topEnvironment, logPrefix, 
workerExitCallback, new File(workerDir));
-            }
-        } catch (IOException e) {
-            throw Utils.wrapInRuntime(e);
-        }
-    }
-
-    @Override
-    public void shutdownWorker(String supervisorId, String workerId, 
Map<String, String> workerThreadPids) {
-        try {
-            LOG.info("Shutting down {}:{}", supervisorId, workerId);
-            Collection<String> pids = 
Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId));
-            Integer shutdownSleepSecs = 
Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
-            String user = ConfigUtils.getWorkerUser(conf, workerId);
-            String threadPid = workerThreadPids.get(workerId);
-            if (StringUtils.isNotBlank(threadPid)) {
-                ProcessSimulator.killProcess(threadPid);
-            }
-
-            for (String pid : pids) {
-                if (runWorkerAsUser) {
-                    List<String> commands = new ArrayList<>();
-                    commands.add("signal");
-                    commands.add(pid);
-                    commands.add("15");
-                    String logPrefix = "kill -15 " + pid;
-                    SupervisorUtils.processLauncherAndWait(conf, user, 
commands, null, logPrefix);
-                } else {
-                    Utils.killProcessWithSigTerm(pid);
-                }
-            }
-
-            if (pids.size() > 0) {
-                LOG.info("Sleep {} seconds for execution of cleanup threads on 
worker.", shutdownSleepSecs);
-                Time.sleepSecs(shutdownSleepSecs);
-            }
-
-            for (String pid : pids) {
-                if (runWorkerAsUser) {
-                    List<String> commands = new ArrayList<>();
-                    commands.add("signal");
-                    commands.add(pid);
-                    commands.add("9");
-                    String logPrefix = "kill -9 " + pid;
-                    SupervisorUtils.processLauncherAndWait(conf, user, 
commands, null, logPrefix);
-                } else {
-                    Utils.forceKillProcess(pid);
-                }
-                String path = ConfigUtils.workerPidPath(conf, workerId, pid);
-                if (runWorkerAsUser) {
-                    SupervisorUtils.rmrAsUser(conf, workerId, path);
-                } else {
-                    try {
-                        LOG.debug("Removing path {}", path);
-                        new File(path).delete();
-                    } catch (Exception e) {
-                        // on windows, the supervisor may still holds the lock 
on the worker directory
-                        // ignore
-                    }
-                }
-            }
-            LOG.info("Shut down {}:{}", supervisorId, workerId);
-        } catch (Exception e) {
-            throw Utils.wrapInRuntime(e);
-        }
-    }
-
-    @Override
-    public boolean cleanupWorker(String workerId) {
-        try {
-            //clean up for resource isolation if enabled
-            if (resourceIsolationManager != null) {
-                resourceIsolationManager.releaseResourcesForWorker(workerId);
-            }
-            //Always make sure to clean up everything else before worker 
directory
-            //is removed since that is what is going to trigger the retry for 
cleanup
-            String workerRoot = ConfigUtils.workerRoot(conf, workerId);
-            if (Utils.checkFileExists(workerRoot)) {
-                if (runWorkerAsUser) {
-                    SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
-                } else {
-                    Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, 
workerId));
-                    Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, 
workerId));
-                    Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, 
workerId));
-                    Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId));
-                }
-                ConfigUtils.removeWorkerUserWSE(conf, workerId);
-            }
-            return true;
-        } catch (IOException e) {
-            LOG.warn("Failed to cleanup worker {}. Will retry later", 
workerId, e);
-        } catch (RuntimeException e) {
-            LOG.warn("Failed to cleanup worker {}. Will retry later", 
workerId, e);
-        }
-        return false;
-    }
-
-    protected String jlp(String stormRoot, Map conf) {
-        String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + 
ConfigUtils.RESOURCES_SUBDIR;
-        String os = System.getProperty("os.name").replaceAll("\\s+", "_");
-        String arch = System.getProperty("os.arch");
-        String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + 
os + "-" + arch;
-        String ret = archResourceRoot + Utils.CLASS_PATH_SEPARATOR + 
resourceRoot + Utils.CLASS_PATH_SEPARATOR + conf.get(Config.JAVA_LIBRARY_PATH);
-        return ret;
-    }
-
-    protected String getWorkerClassPath(String stormJar, Map stormConf, 
List<String> dependencyLocations) {
-        List<String> topoClasspath = new ArrayList<>();
-        Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH);
-
-        // Will be populated only if 
STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED is set on Nimbus.
-        // Allowed for extreme debugging.
-        Object topologyClasspathFirst = 
stormConf.get(Config.TOPOLOGY_CLASSPATH_BEGINNING);
-        List<String> firstClasspathList = new ArrayList<>();
-        if(topologyClasspathFirst instanceof List) {
-            firstClasspathList.addAll((List<String>)topologyClasspathFirst);
-        } else if (topologyClasspathFirst instanceof String) {
-            firstClasspathList.add((String) topologyClasspathFirst);
-        }
-        LOG.debug("Topology Classpath Prefix: {}", firstClasspathList);
-
-        if (object instanceof List) {
-            topoClasspath.addAll((List<String>) object);
-        } else if (object instanceof String) {
-            topoClasspath.add((String) object);
-        }
-        LOG.debug("Topology specific classpath is {}", object);
-
-        String classPath = Utils.addToClasspath(firstClasspathList, 
Arrays.asList(Utils.workerClasspath()));
-        String classAddPath = Utils.addToClasspath(classPath, 
Arrays.asList(stormJar));
-        String classDepAddedPath = Utils.addToClasspath(classAddPath, 
dependencyLocations);
-        return Utils.addToClasspath(classDepAddedPath, topoClasspath);
-    }
-
-    private static String substituteChildOptsInternal(String string,  String 
workerId, String stormId, Long port, int memOnheap) {
-        if (StringUtils.isNotBlank(string)){
-            string = string.replace("%ID%", String.valueOf(port));
-            string = string.replace("%WORKER-ID%", workerId);
-            string = string.replace("%TOPOLOGY-ID%", stormId);
-            string = string.replace("%WORKER-PORT%", String.valueOf(port));
-            string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
-        }
-        return string;
-    }
-
-    /**
-     * "Generates runtime childopts by replacing keys with topology-id, 
worker-id, port, mem-onheap"
-     *
-     * @param value
-     * @param workerId
-     * @param stormId
-     * @param port
-     * @param memOnheap
-     */
-    public List<String> substituteChildopts(Object value, String workerId, 
String stormId, Long port, int memOnheap) {
-        List<String> rets = new ArrayList<>();
-        if (value instanceof String) {
-            String string = substituteChildOptsInternal((String) value,  
workerId, stormId, port, memOnheap);
-            if (StringUtils.isNotBlank(string)){
-                String[] strings = string.split("\\s+");
-                rets.addAll(Arrays.asList(strings));
-            }
-        } else if (value instanceof List) {
-            List<Object> objects = (List<Object>) value;
-            for (Object object : objects) {
-                String str = substituteChildOptsInternal((String) object,  
workerId, stormId, port, memOnheap);
-                if (StringUtils.isNotBlank(str)){
-                    rets.add(str);
-                }
-            }
-        }
-        return rets;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
deleted file mode 100644
index e62b9d8..0000000
--- 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor.workermanager;
-
-import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.localizer.Localizer;
-import org.apache.storm.utils.Utils;
-
-import java.util.List;
-import java.util.Map;
-
-public interface IWorkerManager {
-    void prepareWorker(Map conf, Localizer localizer);
-
-    void launchWorker(String supervisorId, String assignmentId, String 
stormId, Long port, String workerId, WorkerResources resources,
-                               Utils.ExitCodeCallable workerExitCallback);
-    void shutdownWorker(String supervisorId, String workerId, Map<String, 
String> workerThreadPids);
-
-    boolean cleanupWorker(String workerId);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java 
b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
new file mode 100644
index 0000000..7887281
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This is a wrapper around the Localizer class that provides the desired
+ * async interface to Slot.
+ */
+public class AsyncLocalizer implements ILocalizer, Shutdownable {
+    /**
+     * A future that has already completed.
+     */
+    private static class AllDoneFuture implements Future<Void> {
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            return true;
+        }
+
+        @Override
+        public Void get() {
+            return null;
+        }
+
+        @Override
+        public Void get(long timeout, TimeUnit unit) {
+            return null;
+        }
+
+    }
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncLocalizer.class);
+
+    private final Localizer _localizer;
+    private final ExecutorService _execService;
+    private final boolean _isLocalMode;
+    private final Map<String, Object> _conf;
+    private final Map<String, LocalDownloadedResource> _basicPending;
+    private final Map<String, LocalDownloadedResource> _blobPending;
+    private final AdvancedFSOps _fsOps;
+
+    private class DownloadBaseBlobsDistributed implements Callable<Void> {
+        protected final String _topologyId;
+        protected final File _stormRoot;
+        
+        public DownloadBaseBlobsDistributed(String topologyId) throws 
IOException {
+            _topologyId = topologyId;
+            _stormRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf, 
_topologyId));
+        }
+        
+        protected void downloadBaseBlobs(File tmproot) throws Exception {
+            String stormJarKey = ConfigUtils.masterStormJarKey(_topologyId);
+            String stormCodeKey = ConfigUtils.masterStormCodeKey(_topologyId);
+            String stormConfKey = ConfigUtils.masterStormConfKey(_topologyId);
+            String jarPath = 
ConfigUtils.supervisorStormJarPath(tmproot.getAbsolutePath());
+            String codePath = 
ConfigUtils.supervisorStormCodePath(tmproot.getAbsolutePath());
+            String confPath = 
ConfigUtils.supervisorStormConfPath(tmproot.getAbsolutePath());
+            _fsOps.forceMkdir(tmproot);
+            _fsOps.restrictDirectoryPermissions(tmproot);
+            ClientBlobStore blobStore = 
Utils.getClientBlobStoreForSupervisor(_conf);
+            try {
+                Utils.downloadResourcesAsSupervisor(stormJarKey, jarPath, 
blobStore);
+                Utils.downloadResourcesAsSupervisor(stormCodeKey, codePath, 
blobStore);
+                Utils.downloadResourcesAsSupervisor(stormConfKey, confPath, 
blobStore);
+            } finally {
+                blobStore.shutdown();
+            }
+            Utils.extractDirFromJar(jarPath, ConfigUtils.RESOURCES_SUBDIR, 
tmproot);
+        }
+        
+        @Override
+        public Void call() throws Exception {
+            try {
+                if (_fsOps.fileExists(_stormRoot)) {
+                    if (!_fsOps.supportsAtomicDirectoryMove()) {
+                        LOG.warn("{} may have partially downloaded blobs, 
recovering", _topologyId);
+                        _fsOps.deleteIfExists(_stormRoot);
+                    } else {
+                        LOG.warn("{} already downloaded blobs, skipping", 
_topologyId);
+                        return null;
+                    }
+                }
+                boolean deleteAll = true;
+                String tmproot = ConfigUtils.supervisorTmpDir(_conf) + 
Utils.FILE_PATH_SEPARATOR + Utils.uuid();
+                File tr = new File(tmproot);
+                try {
+                    downloadBaseBlobs(tr);
+                    _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
+                    
_fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, 
_topologyId), _stormRoot);
+                    deleteAll = false;
+                } finally {
+                    if (deleteAll) {
+                        LOG.warn("Failed to download basic resources for 
topology-id {}", _topologyId);
+                        _fsOps.deleteIfExists(tr);
+                        _fsOps.deleteIfExists(_stormRoot);
+                    }
+                }
+                return null;
+            } catch (Exception e) {
+                LOG.warn("Caught Exception While Downloading (rethrowing)... 
", e);
+                throw e;
+            }
+        }
+    }
+    
+    private class DownloadBaseBlobsLocal extends DownloadBaseBlobsDistributed {
+
+        public DownloadBaseBlobsLocal(String topologyId) throws IOException {
+            super(topologyId);
+        }
+        
+        @Override
+        protected void downloadBaseBlobs(File tmproot) throws Exception {
+            _fsOps.forceMkdir(tmproot);
+            String stormCodeKey = ConfigUtils.masterStormCodeKey(_topologyId);
+            String stormConfKey = ConfigUtils.masterStormConfKey(_topologyId);
+            File codePath = new 
File(ConfigUtils.supervisorStormCodePath(tmproot.getAbsolutePath()));
+            File confPath = new 
File(ConfigUtils.supervisorStormConfPath(tmproot.getAbsolutePath()));
+            BlobStore blobStore = Utils.getNimbusBlobStore(_conf, null);
+            try {
+                try (OutputStream codeOutStream = 
_fsOps.getOutputStream(codePath)){
+                    blobStore.readBlobTo(stormCodeKey, codeOutStream, null);
+                }
+                try (OutputStream confOutStream = 
_fsOps.getOutputStream(confPath)) {
+                    blobStore.readBlobTo(stormConfKey, confOutStream, null);
+                }
+            } finally {
+                blobStore.shutdown();
+            }
+
+            ClassLoader classloader = 
Thread.currentThread().getContextClassLoader();
+            String resourcesJar = AsyncLocalizer.resourcesJar();
+            URL url = classloader.getResource(ConfigUtils.RESOURCES_SUBDIR);
+
+            String targetDir = tmproot + Utils.FILE_PATH_SEPARATOR + 
ConfigUtils.RESOURCES_SUBDIR;
+
+            if (resourcesJar != null) {
+                LOG.info("Extracting resources from jar at {} to {}", 
resourcesJar, targetDir);
+                Utils.extractDirFromJar(resourcesJar, 
ConfigUtils.RESOURCES_SUBDIR, _stormRoot);
+            } else if (url != null) {
+                LOG.info("Copying resources at {} to {} ", url.toString(), 
targetDir);
+                if ("jar".equals(url.getProtocol())) {
+                    JarURLConnection urlConnection = (JarURLConnection) 
url.openConnection();
+                    
Utils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), 
ConfigUtils.RESOURCES_SUBDIR, _stormRoot);
+                } else {
+                    _fsOps.copyDirectory(new File(url.getFile()), new 
File(targetDir));
+                }
+            }
+        }
+    }
+    
+    private class DownloadBlobs implements Callable<Void> {
+        private final String _topologyId;
+
+        public DownloadBlobs(String topologyId) {
+            _topologyId = topologyId;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            try {
+                String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, 
_topologyId);
+                Map<String, Object> stormConf = 
ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
+
+                @SuppressWarnings("unchecked")
+                Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+                String user = (String) 
stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+                String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
+
+                List<LocalResource> localResourceList = new ArrayList<>();
+                if (blobstoreMap != null) {
+                    List<LocalResource> tmp = 
SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+                    if (tmp != null) {
+                        localResourceList.addAll(tmp);
+                    }
+                }
+
+                StormTopology stormCode = 
ConfigUtils.readSupervisorTopology(_conf, _topologyId, _fsOps);
+                List<String> dependencies = new ArrayList<>();
+                if (stormCode.is_set_dependency_jars()) {
+                    dependencies.addAll(stormCode.get_dependency_jars());
+                }
+                if (stormCode.is_set_dependency_artifacts()) {
+                    dependencies.addAll(stormCode.get_dependency_artifacts());
+                }
+                for (String dependency : dependencies) {
+                    localResourceList.add(new LocalResource(dependency, 
false));
+                }
+
+                if (!localResourceList.isEmpty()) {
+                    File userDir = _localizer.getLocalUserFileCacheDir(user);
+                    if (!_fsOps.fileExists(userDir)) {
+                        _fsOps.forceMkdir(userDir);
+                    }
+                    List<LocalizedResource> localizedResources = 
_localizer.getBlobs(localResourceList, user, topoName, userDir);
+                    _fsOps.setupBlobPermissions(userDir, user);
+                    for (LocalizedResource localizedResource : 
localizedResources) {
+                        String keyName = localizedResource.getKey();
+                        //The sym link we are pointing to
+                        File rsrcFilePath = new 
File(localizedResource.getCurrentSymlinkPath());
+
+                        String symlinkName = null;
+                        if (blobstoreMap != null) {
+                            Map<String, Object> blobInfo = 
blobstoreMap.get(keyName);
+                            if (blobInfo != null && 
blobInfo.containsKey("localname")) {
+                                symlinkName = (String) 
blobInfo.get("localname");
+                            } else {
+                                symlinkName = keyName;
+                            }
+                        } else {
+                            // all things are from dependencies
+                            symlinkName = keyName;
+                        }
+                        _fsOps.createSymlink(new File(stormroot, symlinkName), 
rsrcFilePath);
+                    }
+                }
+
+                return null;
+            } catch (Exception e) {
+                LOG.warn("Caught Exception While Downloading (rethrowing)... 
", e);
+                throw e;
+            }
+        }
+    }
+    
+    //Visible for testing
+    AsyncLocalizer(Map<String, Object> conf, Localizer localizer, 
AdvancedFSOps ops) {
+        _conf = conf;
+        _isLocalMode = ConfigUtils.isLocalMode(conf);
+        _localizer = localizer;
+        _execService = Executors.newFixedThreadPool(1,  
+                new ThreadFactoryBuilder()
+                .setNameFormat("Async Localizer")
+                .build());
+        _basicPending = new HashMap<>();
+        _blobPending = new HashMap<>();
+        _fsOps = ops;
+    }
+    
+    public AsyncLocalizer(Map<String, Object> conf, Localizer localizer) {
+        this(conf, localizer, AdvancedFSOps.make(conf));
+    }
+
+    @Override
+    public synchronized Future<Void> requestDownloadBaseTopologyBlobs(final 
LocalAssignment assignment, final int port) throws IOException {
+        final String topologyId = assignment.get_topology_id();
+        LocalDownloadedResource localResource = _basicPending.get(topologyId);
+        if (localResource == null) {
+            Callable<Void> c;
+            if (_isLocalMode) {
+                c = new DownloadBaseBlobsLocal(topologyId);
+            } else {
+                c = new DownloadBaseBlobsDistributed(topologyId);
+            }
+            localResource = new 
LocalDownloadedResource(_execService.submit(c));
+            _basicPending.put(topologyId, localResource);
+        }
+        Future<Void> ret = localResource.reserve(port, assignment);
+        LOG.debug("Reserved basic {} {}", topologyId, localResource);
+        return ret;
+    }
+
+    private static String resourcesJar() throws IOException {
+        String path = Utils.currentClasspath();
+        if (path == null) {
+            return null;
+        }
+        
+        for (String jpath : path.split(File.pathSeparator)) {
+            if (jpath.endsWith(".jar")) {
+                if (Utils.zipDoesContainDir(jpath, 
ConfigUtils.RESOURCES_SUBDIR)) {
+                    return jpath;
+                }
+            }
+        }
+        return null;
+    }
+    
+    @Override
+    public synchronized void recoverRunningTopology(LocalAssignment 
assignment, int port) {
+        final String topologyId = assignment.get_topology_id();
+        LocalDownloadedResource localResource = _basicPending.get(topologyId);
+        if (localResource == null) {
+            localResource = new LocalDownloadedResource(new AllDoneFuture());
+            _basicPending.put(topologyId, localResource);
+        }
+        localResource.reserve(port, assignment);
+        LOG.debug("Recovered basic {} {}", topologyId, localResource);
+        
+        localResource = _blobPending.get(topologyId);
+        if (localResource == null) {
+            localResource = new LocalDownloadedResource(new AllDoneFuture());
+            _blobPending.put(topologyId, localResource);
+        }
+        localResource.reserve(port, assignment);
+        LOG.debug("Recovered blobs {} {}", topologyId, localResource);
+    }
+    
+    @Override
+    public synchronized Future<Void> 
requestDownloadTopologyBlobs(LocalAssignment assignment, int port) {
+        final String topologyId = assignment.get_topology_id();
+        LocalDownloadedResource localResource = _blobPending.get(topologyId);
+        if (localResource == null) {
+            Callable<Void> c = new DownloadBlobs(topologyId);
+            localResource = new 
LocalDownloadedResource(_execService.submit(c));
+            _blobPending.put(topologyId, localResource);
+        }
+        Future<Void> ret = localResource.reserve(port, assignment);
+        LOG.debug("Reserved blobs {} {}", topologyId, localResource);
+        return ret;
+    }
+
+    @Override
+    public synchronized void releaseSlotFor(LocalAssignment assignment, int 
port) throws IOException {
+        final String topologyId = assignment.get_topology_id();
+        LOG.debug("Releasing slot for {} {}", topologyId, port);
+        LocalDownloadedResource localResource = _blobPending.get(topologyId);
+        if (localResource == null || !localResource.release(port, assignment)) 
{
+            LOG.warn("Released blob reference {} {} for something that we 
didn't have {}", topologyId, port, localResource);
+        } else if (localResource.isDone()){
+            LOG.info("Released blob reference {} {} Cleaning up BLOB 
references...", topologyId, port);
+            _blobPending.remove(topologyId);
+            Map<String, Object> topoConf = 
ConfigUtils.readSupervisorStormConf(_conf, topologyId);
+            @SuppressWarnings("unchecked")
+            Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+            if (blobstoreMap != null) {
+                String user = (String) 
topoConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+                String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
+                
+                for (Map.Entry<String, Map<String, Object>> entry : 
blobstoreMap.entrySet()) {
+                    String key = entry.getKey();
+                    Map<String, Object> blobInfo = entry.getValue();
+                    try {
+                        _localizer.removeBlobReference(key, user, topoName, 
SupervisorUtils.shouldUncompressBlob(blobInfo));
+                    } catch (Exception e) {
+                        throw new IOException(e);
+                    }
+                }
+            }
+        } else {
+            LOG.debug("Released blob reference {} {} still waiting on {}", 
topologyId, port, localResource);
+        }
+        
+        localResource = _basicPending.get(topologyId);
+        if (localResource == null || !localResource.release(port, assignment)) 
{
+            LOG.warn("Released basic reference {} {} for something that we 
didn't have {}", topologyId, port, localResource);
+        } else if (localResource.isDone()){
+            LOG.info("Released blob reference {} {} Cleaning up basic 
files...", topologyId, port);
+            _basicPending.remove(topologyId);
+            String path = ConfigUtils.supervisorStormDistRoot(_conf, 
topologyId);
+            _fsOps.deleteIfExists(new File(path), null, "rmr "+topologyId);
+        } else {
+            LOG.debug("Released basic reference {} {} still waiting on {}", 
topologyId, port, localResource);
+        }
+    }
+
+    @Override
+    public synchronized void cleanupUnusedTopologies() throws IOException {
+        File distRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf));
+        LOG.info("Cleaning up unused topologies in {}", distRoot);
+        File[] children = distRoot.listFiles();
+        if (children != null) {
+            for (File topoDir : children) {
+                String topoId = URLDecoder.decode(topoDir.getName(), "UTF-8");
+                if (_basicPending.get(topoId) == null && 
_blobPending.get(topoId) == null) {
+                    _fsOps.deleteIfExists(topoDir, null, "rmr " + topoId);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        _execService.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java 
b/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java
new file mode 100644
index 0000000..7105095
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.io.IOException;
+import java.util.concurrent.Future;
+
+import org.apache.storm.generated.LocalAssignment;
+
+/**
+ * Download blobs from the blob store and keep them up to date.
+ */
+public interface ILocalizer {
+
+    /**
+     * Recover a running topology by incrementing references for what it has 
already downloaded.
+     * @param assignment the assignment the resources are for
+     * @param port the port the topology is running in.
+     */
+    void recoverRunningTopology(LocalAssignment assignemnt, int port);
+    
+    /**
+     * Download storm.jar, storm.conf, and storm.ser for this topology if not 
done so already,
+     * and inc a reference count on them.
+     * @param assignment the assignment the resources are for
+     * @param port the port the topology is running on
+     * @return a future to let you know when they are done.
+     * @throws IOException on error 
+     */
+    Future<Void> requestDownloadBaseTopologyBlobs(LocalAssignment assignment, 
int port) throws IOException;
+
+    /**
+     * Download the blobs for this topology (reading in list in from the 
config)
+     * and inc reference count for these blobs.
+     * PRECONDITION: requestDownloadBaseTopologyBlobs has completed 
downloading.
+     * @param assignment the assignment the resources are for
+     * @param port the port the topology is running on
+     * @return a future to let you know when they are done.
+     */
+    Future<Void> requestDownloadTopologyBlobs(LocalAssignment assignment, int 
port);
+    
+    /**
+     * dec reference count on all blobs associated with this topology.
+     * @param assignment the assignment the resources are for
+     * @param port the port the topology is running on
+     * @throws IOException on any error
+     */
+    void releaseSlotFor(LocalAssignment assignment, int port) throws 
IOException;
+    
+    /**
+     * Clean up any topologies that are not in use right now.
+     * @throws IOException on any error.
+     */
+    void cleanupUnusedTopologies() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java 
b/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java
new file mode 100644
index 0000000..9e91a93
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.storm.generated.LocalAssignment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LocalDownloadedResource {
+    private static final Logger LOG = 
LoggerFactory.getLogger(LocalDownloadedResource.class);
+    private static class NoCancelFuture<T> implements Future<T> {
+        private final Future<T> _wrapped;
+        
+        public NoCancelFuture(Future<T> wrapped) {
+            _wrapped = wrapped;
+        }
+        
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            //cancel not currently supported
+            return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            return _wrapped.isDone();
+        }
+
+        @Override
+        public T get() throws InterruptedException, ExecutionException {
+            return _wrapped.get();
+        }
+
+        @Override
+        public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
+            return _wrapped.get(timeout, unit);
+        }
+    }
+    private static class PortNAssignment {
+        private final int _port;
+        private final LocalAssignment _assignment;
+        
+        public PortNAssignment(int port, LocalAssignment assignment) {
+            _port = port;
+            _assignment = assignment;
+        }
+        
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof PortNAssignment)) {
+                return false;
+            }
+            PortNAssignment pna = (PortNAssignment) other;
+            return pna._port == _port && _assignment.equals(pna._assignment); 
+        }
+        
+        @Override
+        public int hashCode() {
+            return (17 * _port) + _assignment.hashCode();
+        }
+        
+        @Override
+        public String toString() {
+            return "{"+ _port + " " + _assignment +"}";
+        }
+    }
+    private final Future<Void> _pending;
+    private final Set<PortNAssignment> _references;
+    private boolean _isDone;
+    
+    
+    public LocalDownloadedResource(Future<Void> pending) {
+        _pending = new NoCancelFuture<>(pending);
+        _references = new HashSet<>();
+        _isDone = false;
+    }
+
+    /**
+     * Reserve the resources
+     * @param port the port this is for
+     * @param la the assignment this is for
+     * @return a future that can be used to track it being downloaded.
+     */
+    public synchronized Future<Void> reserve(int port, LocalAssignment la) {
+        PortNAssignment pna = new PortNAssignment(port, la);
+        if (!_references.add(pna)) {
+            LOG.warn("Resources {} already reserved {} for this topology", 
pna, _references);
+        }
+        return _pending;
+    }
+    
+    /**
+     * Release a port from the reference count, and update isDone if all is 
done.
+     * @param port the port to release
+     * @param la the assignment to release
+     * @return true if the port was being counted else false
+     */
+    public synchronized boolean release(int port, LocalAssignment la) {
+        PortNAssignment pna = new PortNAssignment(port, la);
+        boolean ret = _references.remove(pna);
+        if (ret && _references.isEmpty()) {
+            _isDone = true;
+        }
+        return ret;
+    }
+    
+    /**
+     * Is this has been cleaned up completely.
+     * @return true if it is done else false
+     */
+    public synchronized boolean isDone() {
+        return _isDone;
+    }
+
+    @Override
+    public String toString() {
+        return _references.toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
 
b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
index 380e777..9f42b47 100644
--- 
a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
+++ 
b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
@@ -93,7 +93,7 @@ public class LocalizedResourceRetentionSet {
           i.remove();
         } else {
           // since it failed to delete add it back so it gets retried
-          set.addResource(resource.getKey(), resource, 
resource.isUncompressed());
+          set.add(resource.getKey(), resource, resource.isUncompressed());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java 
b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java
index b5f00c3..62d5b2f 100644
--- a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java
+++ b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java
@@ -57,7 +57,7 @@ public class LocalizedResourceSet {
     return _localrsrcFiles.get(name);
   }
 
-  public void updateResource(String resourceName, LocalizedResource 
updatedResource,
+  public void putIfAbsent(String resourceName, LocalizedResource 
updatedResource,
                             boolean uncompress) {
     if (uncompress) {
       _localrsrcArchives.putIfAbsent(resourceName, updatedResource);
@@ -66,7 +66,7 @@ public class LocalizedResourceSet {
     }
   }
 
-  public void addResource(String resourceName, LocalizedResource newResource, 
boolean uncompress) {
+  public void add(String resourceName, LocalizedResource newResource, boolean 
uncompress) {
     if (uncompress) {
       _localrsrcArchives.put(resourceName, newResource);
     } else {
@@ -76,9 +76,9 @@ public class LocalizedResourceSet {
 
   public boolean exists(String resourceName, boolean uncompress) {
     if (uncompress) {
-      return (_localrsrcArchives.get(resourceName) != null);
+      return _localrsrcArchives.containsKey(resourceName);
     }
-    return (_localrsrcFiles.get(resourceName) != null);
+    return _localrsrcFiles.containsKey(resourceName);
   }
 
   public boolean remove(LocalizedResource resource) {

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java 
b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
index b91cecb..0135397 100644
--- a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
+++ b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
@@ -63,20 +63,6 @@ import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
  */
 public class Localizer {
   public static final Logger LOG = LoggerFactory.getLogger(Localizer.class);
-
-  private Map _conf;
-  private int _threadPoolSize;
-  // thread pool for initial download
-  private ExecutorService _execService;
-  // thread pool for updates
-  private ExecutorService _updateExecService;
-  private int _blobDownloadRetries;
-
-  // track resources - user to resourceSet
-  private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new
-      ConcurrentHashMap<String, LocalizedResourceSet>();
-
-  private String _localBaseDir;
   public static final String USERCACHE = "usercache";
   public static final String FILECACHE = "filecache";
 
@@ -85,13 +71,29 @@ public class Localizer {
   public static final String ARCHIVESDIR = "archives";
 
   private static final String TO_UNCOMPRESS = "_tmp_";
+  
+  
+  
+  private final Map<String, Object> _conf;
+  private final int _threadPoolSize;
+  // thread pool for initial download
+  private final ExecutorService _execService;
+  // thread pool for updates
+  private final ExecutorService _updateExecService;
+  private final int _blobDownloadRetries;
+
+  // track resources - user to resourceSet
+  private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new
+      ConcurrentHashMap<String, LocalizedResourceSet>();
+
+  private final String _localBaseDir;
 
   // cleanup
   private long _cacheTargetSize;
   private long _cacheCleanupPeriod;
   private ScheduledExecutorService _cacheCleanupService;
 
-  public Localizer(Map conf, String baseDir) {
+  public Localizer(Map<String, Object> conf, String baseDir) {
     _conf = conf;
     _localBaseDir = baseDir;
     // default cache size 10GB, converted to Bytes
@@ -189,7 +191,7 @@ public class Localizer {
         LOG.debug("local file is: {} path is: {}", rsrc.getPath(), path);
         LocalizedResource lrsrc = new LocalizedResource(new 
File(path).getName(), path,
             uncompress);
-        lrsrcSet.addResource(lrsrc.getKey(), lrsrc, uncompress);
+        lrsrcSet.add(lrsrc.getKey(), lrsrc, uncompress);
       }
     }
   }
@@ -369,7 +371,7 @@ public class Localizer {
           if (newlrsrcSet == null) {
             newlrsrcSet = newSet;
           }
-          newlrsrcSet.updateResource(lrsrc.getKey(), lrsrc, 
lrsrc.isUncompressed());
+          newlrsrcSet.putIfAbsent(lrsrc.getKey(), lrsrc, 
lrsrc.isUncompressed());
           results.add(lrsrc);
         }
         catch (ExecutionException e) {
@@ -451,7 +453,7 @@ public class Localizer {
       for (Future<LocalizedResource> futureRsrc: futures) {
         LocalizedResource lrsrc = futureRsrc.get();
         lrsrc.addReference(topo);
-        lrsrcSet.addResource(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
+        lrsrcSet.add(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
         results.add(lrsrc);
       }
     } catch (ExecutionException e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java 
b/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java
index 7a54d96..4a76012 100644
--- a/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java
@@ -26,6 +26,7 @@ import org.apache.storm.multilang.BoltMsg;
 import org.apache.storm.multilang.ShellMsg;
 import org.apache.storm.topology.ReportedFailedException;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ShellBoltMessageQueue;
 import org.apache.storm.utils.ShellProcess;
 import clojure.lang.RT;
@@ -90,6 +91,7 @@ public class ShellBolt implements IBolt {
     private ScheduledExecutorService heartBeatExecutorService;
     private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
     private AtomicBoolean sendHeartbeatFlag = new AtomicBoolean(false);
+    private boolean _isLocalMode = false;
 
     public ShellBolt(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
@@ -106,6 +108,9 @@ public class ShellBolt implements IBolt {
 
     public void prepare(Map stormConf, TopologyContext context,
                         final OutputCollector collector) {
+        if (ConfigUtils.isLocalMode(stormConf)) {
+            _isLocalMode = true;
+        }
         Object maxPending = 
stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING);
         if (maxPending != null) {
             this._pendingWrites = new 
ShellBoltMessageQueue(((Number)maxPending).intValue());
@@ -298,7 +303,7 @@ public class ShellBolt implements IBolt {
                 processInfo);
         LOG.error(message, exception);
         _collector.reportError(exception);
-        if (_running || (exception instanceof Error)) { //don't exit if not 
running, unless it is an Error
+        if (!_isLocalMode && (_running || (exception instanceof Error))) { 
//don't exit if not running, unless it is an Error
             System.exit(11);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java 
b/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
index 2f06102..02872d0 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
@@ -17,17 +17,18 @@
  */
 package org.apache.storm.testing;
 
-import org.apache.storm.topology.OutputFieldsDeclarer;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.InprocMessaging;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
 
 
 public class FeederSpout extends BaseRichSpout {
@@ -51,7 +52,15 @@ public class FeederSpout extends BaseRichSpout {
 
     public void feed(List<Object> tuple, Object msgId) {
         InprocMessaging.sendMessage(_id, new Values(tuple, msgId));
-    }    
+    }
+    
+    public void feedNoWait(List<Object> tuple, Object msgId) {
+        InprocMessaging.sendMessageNoWait(_id, new Values(tuple, msgId));
+    }
+    
+    public void waitForReader() {
+        InprocMessaging.waitForReader(_id);
+    }
     
     public void open(Map conf, TopologyContext context, SpoutOutputCollector 
collector) {
         _collector = collector;
@@ -63,17 +72,11 @@ public class FeederSpout extends BaseRichSpout {
 
     public void nextTuple() {
         List<Object> toEmit = (List<Object>) InprocMessaging.pollMessage(_id);
-        if(toEmit!=null) {
+        if (toEmit!=null) {
             List<Object> tuple = (List<Object>) toEmit.get(0);
             Object msgId = toEmit.get(1);
             
             _collector.emit(tuple, msgId);
-        } else {
-            try {
-                Thread.sleep(1);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
         }
     }
 
@@ -97,4 +100,4 @@ public class FeederSpout extends BaseRichSpout {
     public Map<String, Object> getComponentConfiguration() {
         return new HashMap<String, Object>();
     }    
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/trident/util/TridentUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/util/TridentUtils.java 
b/storm-core/src/jvm/org/apache/storm/trident/util/TridentUtils.java
index 8272b3c..f5d317e 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/util/TridentUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/util/TridentUtils.java
@@ -111,7 +111,7 @@ public class TridentUtils {
         return Utils.thriftSerialize(t);
     }
 
-    public static <T> T thriftDeserialize(Class c, byte[] b) {
+    public static <T> T thriftDeserialize(Class<T> c, byte[] b) {
         return Utils.thriftDeserialize(c,b);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java 
b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index a244f6a..e2be8a7 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -20,20 +20,16 @@ package org.apache.storm.utils;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.storm.Config;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.validation.ConfigValidation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
 import java.lang.reflect.Field;
 import java.net.URLEncoder;
 import java.util.ArrayList;
@@ -116,7 +112,7 @@ public class ConfigUtils {
         return mode;
     }
 
-    public static boolean isLocalMode(Map conf) {
+    public static boolean isLocalMode(Map<String, Object> conf) {
         String mode = (String) conf.get(Config.STORM_CLUSTER_MODE);
         if (mode != null) {
             if ("local".equals(mode)) {
@@ -125,8 +121,9 @@ public class ConfigUtils {
             if ("distributed".equals(mode)) {
                 return false;
             }
+            throw new IllegalArgumentException("Illegal cluster mode in conf: 
" + mode);
         }
-        throw new IllegalArgumentException("Illegal cluster mode in conf: " + 
mode);
+        return true;
     }
 
     public static int samplingRate(Map conf) {
@@ -161,12 +158,12 @@ public class ConfigUtils {
     }
 
     // we use this "weird" wrapper pattern temporarily for mocking in clojure 
test
-    public static Map readStormConfig() {
+    public static Map<String, Object> readStormConfig() {
         return _instance.readStormConfigImpl();
     }
 
-    public Map readStormConfigImpl() {
-        Map conf = Utils.readStormConfig();
+    public Map<String, Object> readStormConfigImpl() {
+        Map<String, Object> conf = Utils.readStormConfig();
         ConfigValidation.validateFields(conf);
         return conf;
     }
@@ -246,14 +243,14 @@ public class ConfigUtils {
         return ret + FILE_SEPARATOR + "stormdist";
     }
 
-    public static Map readSupervisorStormConfGivenPath(Map conf, String 
stormConfPath) throws IOException {
-        Map ret = new HashMap(conf);
+    public static Map<String, Object> 
readSupervisorStormConfGivenPath(Map<String, Object> conf, String 
stormConfPath) throws IOException {
+        Map<String, Object> ret = new HashMap<>(conf);
         
ret.putAll(Utils.fromCompressedJsonConf(FileUtils.readFileToByteArray(new 
File(stormConfPath))));
         return ret;
     }
 
-    public static StormTopology readSupervisorStormCodeGivenPath(String 
stormCodePath) throws IOException {
-        return Utils.deserialize(FileUtils.readFileToByteArray(new 
File(stormCodePath)), StormTopology.class);
+    public static StormTopology readSupervisorStormCodeGivenPath(String 
stormCodePath, AdvancedFSOps ops) throws IOException {
+        return Utils.deserialize(ops.slurp(new File(stormCodePath)), 
StormTopology.class);
     }
 
     public static String masterStormJarPath(String stormRoot) {
@@ -353,25 +350,24 @@ public class ConfigUtils {
     }
 
     // we use this "weird" wrapper pattern temporarily for mocking in clojure 
test
-    public static Map readSupervisorStormConf(Map conf, String stormId) throws 
IOException {
+    public static Map<String, Object> readSupervisorStormConf(Map<String, 
Object> conf, String stormId) throws IOException {
         return _instance.readSupervisorStormConfImpl(conf, stormId);
     }
 
-    public Map readSupervisorStormConfImpl(Map conf, String stormId) throws 
IOException {
+    public Map<String, Object> readSupervisorStormConfImpl(Map<String, Object> 
conf, String stormId) throws IOException {
         String stormRoot = supervisorStormDistRoot(conf, stormId);
         String confPath = supervisorStormConfPath(stormRoot);
         return readSupervisorStormConfGivenPath(conf, confPath);
     }
 
-    // we use this "weird" wrapper pattern temporarily for mocking in clojure 
test
-    public static StormTopology readSupervisorTopology(Map conf, String 
stormId) throws IOException {
-        return _instance.readSupervisorTopologyImpl(conf, stormId);
+    public static StormTopology readSupervisorTopology(Map conf, String 
stormId, AdvancedFSOps ops) throws IOException {
+        return _instance.readSupervisorTopologyImpl(conf, stormId, ops);
     }
-
-    public StormTopology readSupervisorTopologyImpl(Map conf, String stormId) 
throws IOException {
+  
+    public StormTopology readSupervisorTopologyImpl(Map conf, String stormId, 
AdvancedFSOps ops) throws IOException {
         String stormRoot = supervisorStormDistRoot(conf, stormId);
         String topologyPath = supervisorStormCodePath(stormRoot);
-        return readSupervisorStormCodeGivenPath(topologyPath);
+        return readSupervisorStormCodeGivenPath(topologyPath, ops);
     }
 
     public static String workerUserRoot(Map conf) {
@@ -382,27 +378,6 @@ public class ConfigUtils {
         return (workerUserRoot(conf) + FILE_SEPARATOR + workerId);
     }
 
-    public static String getWorkerUser(Map conf, String workerId) {
-        LOG.info("GET worker-user for {}", workerId);
-        File file = new File(workerUserFile(conf, workerId));
-
-        try (InputStream in = new FileInputStream(file);
-             Reader reader = new InputStreamReader(in);
-             BufferedReader br = new BufferedReader(reader);) {
-            StringBuilder sb = new StringBuilder();
-            int r;
-            while ((r = br.read()) != -1) {
-                char ch = (char) r;
-                sb.append(ch);
-            }
-            String ret = sb.toString().trim();
-            return ret;
-        } catch (IOException e) {
-            LOG.error("Failed to get worker user for {}.", workerId);
-            return null;
-        }
-    }
-
     public static String getIdFromBlobKey(String key) {
         if (key == null) return null;
         final String STORM_JAR_SUFFIX = "-stormjar.jar";
@@ -421,27 +396,6 @@ public class ConfigUtils {
     }
 
     // we use this "weird" wrapper pattern temporarily for mocking in clojure 
test
-    public static void setWorkerUserWSE(Map conf, String workerId, String 
user) throws IOException {
-        _instance.setWorkerUserWSEImpl(conf, workerId, user);
-    }
-
-    public void setWorkerUserWSEImpl(Map conf, String workerId, String user) 
throws IOException {
-        LOG.info("SET worker-user {} {}", workerId, user);
-        File file = new File(workerUserFile(conf, workerId));
-        file.getParentFile().mkdirs();
-
-        try (FileWriter fw = new FileWriter(file);
-             BufferedWriter writer = new BufferedWriter(fw);) {
-            writer.write(user);
-        }
-    }
-
-    public static void removeWorkerUserWSE(Map conf, String workerId) {
-        LOG.info("REMOVE worker-user {}", workerId);
-        new File(workerUserFile(conf, workerId)).delete();
-    }
-
-    // we use this "weird" wrapper pattern temporarily for mocking in clojure 
test
     public static String workerArtifactsRoot(Map conf) {
         return _instance.workerArtifactsRootImpl(conf);
     }
@@ -512,6 +466,10 @@ public class ConfigUtils {
     public static String workerPidPath(Map conf, String id, String pid) {
         return (workerPidsRoot(conf, id) + FILE_SEPARATOR + pid);
     }
+    
+    public static String workerPidPath(Map<String, Object> conf, String id, 
long pid) {
+        return workerPidPath(conf, id, String.valueOf(pid));
+    }
 
     public static String workerHeartbeatsRoot(Map conf, String id) {
         return (workerRoot(conf, id) + FILE_SEPARATOR + "heartbeats");

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/utils/InprocMessaging.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/InprocMessaging.java 
b/storm-core/src/jvm/org/apache/storm/utils/InprocMessaging.java
index 51250f4..8583e0d 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/InprocMessaging.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/InprocMessaging.java
@@ -19,41 +19,82 @@ package org.apache.storm.utils;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class InprocMessaging {
-    private static Map<Integer, LinkedBlockingQueue<Object>> _queues = new 
HashMap<>();
-    private static final Object _lock = new Object();
+    private static Map<Integer, LinkedBlockingQueue<Object>> _queues = new 
HashMap<Integer, LinkedBlockingQueue<Object>>();
+    private static ConcurrentMap<Integer, AtomicBoolean> _hasReader = new 
ConcurrentHashMap<>();
     private static int port = 1;
+    private static final Logger LOG = 
LoggerFactory.getLogger(InprocMessaging.class);
     
-    public static int acquireNewPort() {
-        int ret;
-        synchronized(_lock) {
-            ret = port;
-            port++;
-        }
+    public synchronized static int acquireNewPort() {
+        int ret = port;
+        port++;
         return ret;
     }
     
     public static void sendMessage(int port, Object msg) {
+        waitForReader(port);
+        getQueue(port).add(msg);
+    }
+    
+    public static void sendMessageNoWait(int port, Object msg) {
         getQueue(port).add(msg);
     }
     
     public static Object takeMessage(int port) throws InterruptedException {
+        readerArrived(port);
         return getQueue(port).take();
     }
 
     public static Object pollMessage(int port) {
+        readerArrived(port);
         return  getQueue(port).poll();
-    }    
+    }
+    
+    private static AtomicBoolean getHasReader(int port) {
+        AtomicBoolean ab = _hasReader.get(port);
+        if (ab == null) {
+            _hasReader.putIfAbsent(port, new AtomicBoolean(false));
+            ab = _hasReader.get(port);
+        }
+        return ab;
+    }
     
-    private static LinkedBlockingQueue<Object> getQueue(int port) {
-        synchronized(_lock) {
-            if(!_queues.containsKey(port)) {
-              _queues.put(port, new LinkedBlockingQueue<>());
+    public static void waitForReader(int port) {
+        AtomicBoolean ab = getHasReader(port);
+        long start = Time.currentTimeMillis();
+        while (!ab.get()) {
+            if (Time.isSimulating()) {
+                Time.advanceTime(100);
+            }
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+                //Ignored
             }
-            return _queues.get(port);
+            if (Time.currentTimeMillis() - start > 20000) {
+                LOG.error("DONE WAITING FOR READER AFTER {} ms", 
Time.currentTimeMillis() - start);
+                break;
+            }
+        }
+    }
+    
+    private static void readerArrived(int port) {
+        getHasReader(port).set(true);
+    }
+    
+    private synchronized static LinkedBlockingQueue<Object> getQueue(int port) 
{
+        if(!_queues.containsKey(port)) {
+            _queues.put(port, new LinkedBlockingQueue<Object>());
         }
+        return _queues.get(port);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java 
b/storm-core/src/jvm/org/apache/storm/utils/Time.java
index 1b36070..9b656ee 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Time.java
@@ -29,6 +29,7 @@ public class Time {
     public static final Logger LOG = LoggerFactory.getLogger(Time.class);
     
     private static AtomicBoolean simulating = new AtomicBoolean(false);
+    private static AtomicLong autoAdvanceOnSleep = new AtomicLong(0);
     //TODO: should probably use weak references here or something
     private static volatile Map<Thread, AtomicLong> threadSleepTimes;
     private static final Object sleepTimesLock = new Object();
@@ -43,10 +44,18 @@ public class Time {
         }
     }
     
+    public static void startSimulatingAutoAdvanceOnSleep(long ms) {
+        synchronized(sleepTimesLock) {
+            startSimulating();
+            autoAdvanceOnSleep.set(ms);
+        }
+    }
+    
     public static void stopSimulating() {
         synchronized(sleepTimesLock) {
-            simulating.set(false);             
-            threadSleepTimes = null;  
+            simulating.set(false);    
+            autoAdvanceOnSleep.set(0);
+            threadSleepTimes = null;
         }
     }
     
@@ -71,6 +80,10 @@ public class Time {
                             throw new InterruptedException();
                         }
                     }
+                    long autoAdvance = autoAdvanceOnSleep.get();
+                    if (autoAdvance > 0) {
+                        advanceTime(autoAdvance);
+                    }
                     Thread.sleep(10);
                 }
             } finally {
@@ -126,9 +139,10 @@ public class Time {
     }
     
     public static void advanceTime(long ms) {
-        if(!simulating.get()) throw new IllegalStateException("Cannot simulate 
time unless in simulation mode");
-        if(ms < 0) throw new IllegalArgumentException("advanceTime only 
accepts positive time as an argument");
-        simulatedCurrTimeMs.set(simulatedCurrTimeMs.get() + ms);
+        if (!simulating.get()) throw new IllegalStateException("Cannot 
simulate time unless in simulation mode");
+        if (ms < 0) throw new IllegalArgumentException("advanceTime only 
accepts positive time as an argument");
+        long newTime = simulatedCurrTimeMs.addAndGet(ms);
+        LOG.debug("Advanced simulated time to {}", newTime);
     }
     
     public static boolean isThreadWaiting(Thread t) {
@@ -138,5 +152,5 @@ public class Time {
             time = threadSleepTimes.get(t);
         }
         return !t.isAlive() || time!=null && currentTimeMillis() < 
time.longValue();
-    }    
+    }
 }

Reply via email to