http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
deleted file mode 100644
index 53436ae..0000000
--- 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.storm.Config;
-import org.apache.storm.container.cgroup.CgroupManager;
-import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
-import org.apache.storm.generated.ExecutorInfo;
-import org.apache.storm.generated.LSWorkerHeartbeat;
-import org.apache.storm.generated.LocalAssignment;
-import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.LocalState;
-import org.apache.storm.utils.Time;
-import org.apache.storm.utils.Utils;
-import org.eclipse.jetty.util.ConcurrentHashSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.*;
-
-/**
- * 1. workers to kill are those in allocated that are dead or disallowed
- * 2. kill the ones that should be dead - read pids, kill -9 and individually
- *    remove file - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception
- *    and log)
- * 3. remove any downloaded code that's no longer assigned to this supervisor
- * 4. of the rest, figure out what assignments aren't yet satisfied
- * 5. generate new worker ids, write new "approved workers" to LS
- * 6. create local dir for worker id
- * 7. launch new workers (give worker-id, port, and supervisor-id)
- * 8. wait for workers launch
- */
-public class SyncProcessEvent implements Runnable {
-
-    // Never reassigned, so declared final like the logger of SyncSupervisorEvent.
-    private static final Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
-
-    // Both fields are assigned in init(), which can run after the no-arg constructor,
-    // so they cannot be final.
-    private LocalState localState;
-    private SupervisorData supervisorData;
-    /** Executor entry that is removed from a heartbeat's executors before they are compared with an assignment. */
-    public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1);
-
-    /**
-     * Exit callback passed to the worker manager when a distributed worker is launched.
-     * On exit it logs the exit code and records the worker id in the supervisor's
-     * dead-worker set.
-     */
-    private class ProcessExitCallback implements Utils.ExitCodeCallable {
-        private final String workerId;
-        private final String logPrefix;
-
-        public ProcessExitCallback(String logPrefix, String workerId) {
-            this.workerId = workerId;
-            this.logPrefix = logPrefix;
-        }
-
-        /** No-argument variant of the callback; does nothing and returns {@code null}. */
-        @Override
-        public Object call() throws Exception {
-            return null;
-        }
-
-        /**
-         * Logs the exit code and marks the worker as dead.
-         *
-         * @param exitCode exit code of the worker process
-         * @return always {@code null}
-         */
-        @Override
-        public Object call(int exitCode) {
-            LOG.info("{} exited with code: {}", logPrefix, exitCode);
-            supervisorData.getDeadWorkers().add(workerId);
-            return null;
-        }
-    }
-
-    /**
-     * Creates an event that is not yet bound to a supervisor. {@link #run()} reads the
-     * fields set by {@link #init(SupervisorData)}, so call that first.
-     */
-    public SyncProcessEvent() {
-    }
-
-    /**
-     * Creates an event bound to the given supervisor data.
-     *
-     * @param supervisorData shared supervisor state
-     */
-    public SyncProcessEvent(SupervisorData supervisorData) {
-        init(supervisorData);
-    }
-
-    /**
-     * Binds this event to the given supervisor data and caches its local state.
-     *
-     * @param supervisorData shared supervisor state
-     */
-    public void init(SupervisorData supervisorData) {
-        this.supervisorData = supervisorData;
-        this.localState = this.supervisorData.getLocalState();
-    }
-
-    @Override
-    public void run() { // one sync pass: kill non-VALID workers, remove unassigned topology code, launch workers for uncovered ports
-        LOG.debug("Syncing processes");
-        try {
-            Map conf = supervisorData.getConf();
-            Map<Integer, LocalAssignment> assignedExecutors = 
localState.getLocalAssignmentsMap();
-
-            if (assignedExecutors == null) {
-                assignedExecutors = new HashMap<>();
-            }
-
-            Set<String> assignedStormIds = new HashSet<>(); // topology ids that have an assignment on this supervisor
-            for (Map.Entry<Integer, LocalAssignment> entry : 
assignedExecutors.entrySet()) {
-                assignedStormIds.add(entry.getValue().get_topology_id());
-            }
-
-            int now = Time.currentTimeSecs();
-
-            Map<String, StateHeartbeat> localWorkerStats = 
getLocalWorkerStats(supervisorData, assignedExecutors, now);
-
-            Set<String> keeperWorkerIds = new HashSet<>(); // workers in state VALID are kept
-            Set<Integer> keepPorts = new HashSet<>(); // ports occupied by the kept workers
-            for (Map.Entry<String, StateHeartbeat> entry : 
localWorkerStats.entrySet()) {
-                StateHeartbeat stateHeartbeat = entry.getValue();
-                if (stateHeartbeat.getState() == State.VALID) {
-                    keeperWorkerIds.add(entry.getKey());
-                    keepPorts.add(stateHeartbeat.getHeartbeat().get_port());
-                }
-            }
-            Map<Integer, LocalAssignment> reassignExecutors = 
getReassignExecutors(assignedExecutors, keepPorts);
-            Map<Integer, String> newWorkerIds = new HashMap<>(); // a fresh worker id for every port that needs a worker
-            for (Integer port : reassignExecutors.keySet()) {
-                newWorkerIds.put(port, Utils.uuid());
-            }
-            Set<String> allDownloadedTopologyIds = 
SupervisorUtils.readDownLoadedStormIds(conf);
-
-            LOG.debug("Assigned executors: {}", assignedExecutors);
-            LOG.debug("Allocated: {}", localWorkerStats);
-
-            for (Map.Entry<String, StateHeartbeat> entry : 
localWorkerStats.entrySet()) {
-                StateHeartbeat stateHeartbeat = entry.getValue();
-                if (stateHeartbeat.getState() != State.VALID) { // every worker that is not VALID is shut down and cleaned up
-                    LOG.info("Shutting down and clearing state for id {}, 
Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now,
-                            stateHeartbeat.getState(), 
stateHeartbeat.getHeartbeat());
-                    killWorker(supervisorData, 
supervisorData.getWorkerManager(), entry.getKey());
-                }
-            }
-
-            // remove any downloaded code that's no longer assigned or active
-            for (String downloadedTopologyId : allDownloadedTopologyIds) {
-                if (!assignedStormIds.contains(downloadedTopologyId)) {
-                    LOG.info("Removing code for storm id {}.", 
downloadedTopologyId);
-                    SupervisorUtils.rmTopoFiles(conf, downloadedTopologyId, 
supervisorData.getLocalizer(), true);
-                }
-            }
-
-            // start new workers
-            Map<String, Integer> newWorkerPortToIds = 
startNewWorkers(newWorkerIds, reassignExecutors);
-
-            Map<String, Integer> allWorkerPortToIds = new HashMap<>(); // worker id -> port for kept plus newly launched workers
-            Map<String, Integer> approvedWorkers = 
localState.getApprovedWorkers();
-            for (String keeper : keeperWorkerIds) {
-                allWorkerPortToIds.put(keeper, approvedWorkers.get(keeper));
-            }
-            allWorkerPortToIds.putAll(newWorkerPortToIds);
-            localState.setApprovedWorkers(allWorkerPortToIds); // persisted before waiting for the new workers to heartbeat
-            waitForWorkersLaunch(conf, newWorkerPortToIds.keySet());
-
-        } catch (Exception e) {
-            LOG.error("Failed Sync Process", e);
-            throw Utils.wrapInRuntime(e); // the failure is logged and then propagated to the caller
-        }
-
-    }
-
-    /**
-     * Waits until every given worker has a heartbeat in its local state, or until
-     * {@code Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS} seconds have passed since this call
-     * started (one shared deadline for all workers, polled every 500 ms). Workers that
-     * still have no heartbeat afterwards are only logged.
-     *
-     * @param conf supervisor configuration
-     * @param workerIds ids of the workers to wait for
-     * @throws Exception propagated from reading the worker state or from sleeping
-     */
-    protected void waitForWorkersLaunch(Map conf, Set<String> workerIds) throws Exception {
-        int startTime = Time.currentTimeSecs();
-        // Go through Number rather than casting to int directly: the direct cast throws a
-        // ClassCastException when the configured value is a Long or another numeric type.
-        int timeOut = ((Number) conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS)).intValue();
-        for (String workerId : workerIds) {
-            // Named workerState so it does not shadow the supervisor-level localState field.
-            LocalState workerState = ConfigUtils.workerState(conf, workerId);
-            while (workerState.getWorkerHeartBeat() == null
-                    && (Time.currentTimeSecs() - startTime) <= timeOut) {
-                LOG.info("{} still hasn't started", workerId);
-                Time.sleep(500);
-            }
-            if (workerState.getWorkerHeartBeat() == null) {
-                LOG.info("Worker {} failed to start", workerId);
-            }
-        }
-    }
-
-    /**
-     * Returns the assignments that are not covered by a kept worker: a copy of
-     * {@code assignExecutors} without the ports listed in {@code keepPorts}.
-     * The input map is not modified.
-     *
-     * @param assignExecutors current local assignments keyed by port
-     * @param keepPorts ports to leave out of the result
-     * @return new map holding the remaining assignments
-     */
-    protected Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) {
-        Map<Integer, LocalAssignment> remaining = new HashMap<>(assignExecutors);
-        remaining.keySet().removeAll(keepPorts);
-        return remaining;
-    }
-    
-    /**
-     * Returns map from worker id to worker heartbeat. if the heartbeat is 
nil, then the worker is dead
-     * 
-     * @param assignedExecutors
-     * @return
-     * @throws Exception
-     */
-    public Map<String, StateHeartbeat> getLocalWorkerStats(SupervisorData 
supervisorData, Map<Integer, LocalAssignment> assignedExecutors, int now) 
throws Exception {
-        Map<String, StateHeartbeat> workerIdHbstate = new HashMap<>();
-        Map conf = supervisorData.getConf();
-        LocalState localState = supervisorData.getLocalState();
-        Map<String, LSWorkerHeartbeat> idToHeartbeat = 
SupervisorUtils.readWorkerHeartbeats(conf);
-        Map<String, Integer> approvedWorkers = localState.getApprovedWorkers();
-        Set<String> approvedIds = new HashSet<>();
-        if (approvedWorkers != null) {
-            approvedIds.addAll(approvedWorkers.keySet());
-        }
-        for (Map.Entry<String, LSWorkerHeartbeat> entry : 
idToHeartbeat.entrySet()) {
-            String workerId = entry.getKey();
-            LSWorkerHeartbeat whb = entry.getValue();
-            State state;
-            if (whb == null) {
-                state = State.NOT_STARTED;
-            } else if (!approvedIds.contains(workerId) || 
!matchesAssignment(whb, assignedExecutors)) {
-                state = State.DISALLOWED;
-            } else if (supervisorData.getDeadWorkers().contains(workerId)) {
-                LOG.info("Worker Process {} has died", workerId);
-                state = State.TIMED_OUT;
-            } else if (SupervisorUtils.isWorkerHbTimedOut(now, whb, conf)) {
-                state = State.TIMED_OUT;
-            } else {
-                state = State.VALID;
-            }
-            LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor 
time-secs {}", workerId, state, whb, now);
-            workerIdHbstate.put(workerId, new StateHeartbeat(state, whb));
-        }
-        return workerIdHbstate;
-    }
-
-    /**
-     * Checks whether a worker heartbeat still matches what is assigned to its port:
-     * the port must be assigned to the heartbeat's topology, and the heartbeat's executors
-     * (without {@link #SYSTEM_EXECUTOR_INFO}) must be the assigned executors.
-     *
-     * @param whb heartbeat reported by the worker
-     * @param assignedExecutors current local assignments keyed by port
-     * @return {@code true} if topology and executors match the assignment for the heartbeat's port
-     */
-    protected boolean matchesAssignment(LSWorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedExecutors) {
-        LocalAssignment localAssignment = assignedExecutors.get(whb.get_port());
-        if (localAssignment == null || !localAssignment.get_topology_id().equals(whb.get_topology_id())) {
-            return false;
-        }
-        List<ExecutorInfo> heartbeatExecutors = new ArrayList<>(whb.get_executors());
-        // drop the system executor before comparing
-        heartbeatExecutors.remove(SYSTEM_EXECUTOR_INFO);
-        List<ExecutorInfo> assigned = localAssignment.get_executors();
-
-        if (assigned.size() != heartbeatExecutors.size()) {
-            return false;
-        }
-        // Compare against the heartbeat's executors. The earlier loop tested the assigned
-        // list against itself, which is always true, so any executor list of equal size
-        // was accepted as a match.
-        return heartbeatExecutors.containsAll(assigned);
-    }
-
-    /**
-     * launch a worker in local mode.
-     * Currently a placeholder: the body is empty, so calling it starts nothing.
-     */
-    protected void launchLocalWorker(SupervisorData supervisorData, String 
stormId, Long port, String workerId, WorkerResources resources) throws 
IOException {
-        // port this function after porting worker to java
-    }
-
-    /**
-     * Prepares a worker's files (log metadata, worker user, artifacts and blobstore
-     * symlinks) and then asks the worker manager to launch it with an exit callback
-     * that records the worker as dead when the process exits.
-     *
-     * @param deadWorkers dead-worker set from which {@code workerId} is removed before launch; may be {@code null}
-     * @throws IOException propagated from the file preparation steps or the launch
-     */
-    protected void launchDistributedWorker(IWorkerManager workerManager, Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId,
-                                           WorkerResources resources, ConcurrentHashSet deadWorkers) throws IOException {
-        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
-        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
-        writeLogMetadata(stormConf, user, workerId, stormId, port, conf);
-        ConfigUtils.setWorkerUserWSE(conf, workerId, user);
-        createArtifactsLink(conf, stormId, port, workerId);
-
-        // clear any dead-worker entry for this id before the launch
-        if (deadWorkers != null) {
-            deadWorkers.remove(workerId);
-        }
-        createBlobstoreLinks(conf, stormId, workerId);
-        workerManager.launchWorker(supervisorId, assignmentId, stormId, port, workerId, resources,
-                new ProcessExitCallback("Worker Process " + workerId, workerId));
-    }
-
-    /**
-     * Launches a worker for every entry of {@code reassignExecutors} whose topology files
-     * are present, creating the worker's pids, tmp and heartbeat directories first.
-     * Entries whose topology files are missing are logged and skipped.
-     *
-     * @param newWorkerIds worker id to use for each port
-     * @param reassignExecutors assignments, keyed by port, that need a worker
-     * @return map from worker id to port for the entries that were not skipped
-     * @throws IOException propagated from directory creation or the launch
-     */
-    protected Map<String, Integer> startNewWorkers(Map<Integer, String> newWorkerIds, Map<Integer, LocalAssignment> reassignExecutors) throws IOException {
-        Map<String, Integer> launched = new HashMap<>();
-        Map conf = supervisorData.getConf();
-        String supervisorId = supervisorData.getSupervisorId();
-        String clusterMode = ConfigUtils.clusterMode(conf);
-
-        for (Map.Entry<Integer, LocalAssignment> entry : reassignExecutors.entrySet()) {
-            Integer port = entry.getKey();
-            LocalAssignment assignment = entry.getValue();
-            String workerId = newWorkerIds.get(port);
-            String stormId = assignment.get_topology_id();
-            WorkerResources resources = assignment.get_resources();
-
-            // the required topology files must exist before a worker can be launched
-            if (!SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) {
-                LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", assignment,
-                        supervisorData.getSupervisorId(), port, workerId);
-                continue;
-            }
-
-            String pidsPath = ConfigUtils.workerPidsRoot(conf, workerId);
-            String hbPath = ConfigUtils.workerHeartbeatsRoot(conf, workerId);
-
-            LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port,
-                    workerId);
-
-            FileUtils.forceMkdir(new File(pidsPath));
-            FileUtils.forceMkdir(new File(ConfigUtils.workerTmpRoot(conf, workerId)));
-            FileUtils.forceMkdir(new File(hbPath));
-
-            if (clusterMode.endsWith("distributed")) {
-                launchDistributedWorker(supervisorData.getWorkerManager(), conf, supervisorId, supervisorData.getAssignmentId(), stormId,
-                        port.longValue(), workerId, resources, supervisorData.getDeadWorkers());
-            } else if (clusterMode.endsWith("local")) {
-                launchLocalWorker(supervisorData, stormId, port.longValue(), workerId, resources);
-            }
-            launched.put(workerId, port);
-        }
-        return launched;
-    }
-
-    /**
-     * Builds the log metadata for a worker (submitting user, worker id, and the groups and
-     * users collected from the topology configuration) and writes it through
-     * {@link #writeLogMetadataToYamlFile(String, Long, Map, Map)}.
-     *
-     * @param stormconf topology configuration the groups and users are read from
-     * @param user user that submitted the topology
-     * @param workerId id of the worker the metadata belongs to
-     * @param stormId topology id
-     * @param port worker port
-     * @param conf supervisor configuration
-     * @throws IOException if the metadata file cannot be written
-     */
-    public void writeLogMetadata(Map stormconf, String user, String workerId, String stormId, Long port, Map conf) throws IOException {
-        Map<Object, Object> data = new HashMap<>();
-        data.put(Config.TOPOLOGY_SUBMITTER_USER, user);
-        data.put("worker-id", workerId);
-
-        Set<String> logsGroups = new HashSet<>();
-        addConfiguredNames(stormconf, Config.LOGS_GROUPS, logsGroups);
-        addConfiguredNames(stormconf, Config.TOPOLOGY_GROUPS, logsGroups);
-        data.put(Config.LOGS_GROUPS, logsGroups.toArray());
-
-        Set<String> logsUsers = new HashSet<>();
-        addConfiguredNames(stormconf, Config.LOGS_USERS, logsUsers);
-        addConfiguredNames(stormconf, Config.TOPOLOGY_USERS, logsUsers);
-        data.put(Config.LOGS_USERS, logsUsers.toArray());
-
-        writeLogMetadataToYamlFile(stormId, port, data, conf);
-    }
-
-    /** Adds every element of the list stored under {@code key} to {@code target}; an absent or null entry is skipped. */
-    @SuppressWarnings("unchecked")
-    private static void addConfiguredNames(Map conf, Object key, Set<String> target) {
-        List<String> names = (List<String>) conf.get(key);
-        if (names != null) {
-            target.addAll(names);
-        }
-    }
-
-    /**
-     * Writes the log metadata of a worker as YAML to the file returned by
-     * {@code ConfigUtils.getLogMetaDataFile}, creating the parent directory first when it
-     * does not exist. When workers run as the submitting user, the new directory is also
-     * passed to {@code SupervisorUtils.setupStormCodeDir}, because it needs special
-     * permissions or it is insecure.
-     *
-     * @param stormId topology id
-     * @param port worker port
-     * @param data metadata to dump
-     * @param conf supervisor configuration
-     * @throws IOException if the directory cannot be created or the file cannot be written
-     */
-    public void writeLogMetadataToYamlFile(String stormId, Long port, Map data, Map conf) throws IOException {
-        File file = ConfigUtils.getLogMetaDataFile(conf, stormId, port.intValue());
-        File parentDir = file.getParentFile();
-
-        if (!Utils.checkFileExists(file.getParent())) {
-            // forceMkdir raises an IOException when the directory cannot be created,
-            // whereas the result of mkdirs() was previously ignored.
-            FileUtils.forceMkdir(parentDir);
-            if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
-                SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), parentDir.getCanonicalPath());
-            }
-        }
-        // try-with-resources closes the writer on every path and keeps a dump failure
-        // as the primary exception if close() fails as well.
-        try (FileWriter writer = new FileWriter(file)) {
-            new Yaml().dump(data, writer);
-        }
-    }
-
-    /**
-     * Creates the "artifacts" symlink from the worker directory to the worker's port
-     * directory under the topology's artifacts root. Nothing is done when the worker
-     * directory does not exist.
-     *
-     * @param conf supervisor configuration
-     * @param stormId topology id
-     * @param port worker port, used as the link target name
-     * @param workerId id of the worker
-     * @throws IOException propagated from the symlink creation
-     */
-    protected void createArtifactsLink(Map conf, String stormId, Long port, String workerId) throws IOException {
-        String workerDir = ConfigUtils.workerRoot(conf, workerId);
-        String topoDir = ConfigUtils.workerArtifactsRoot(conf, stormId);
-        if (!Utils.checkFileExists(workerDir)) {
-            return;
-        }
-        LOG.debug("Creating symlinks for worker-id: {} storm-id: {} to its port artifacts directory", workerId, stormId);
-        Utils.createSymlink(workerDir, topoDir, "artifacts", String.valueOf(port));
-    }
-
-    /**
-     * Creates symlinks in the worker root for the resources sub-directory and for every
-     * blob listed in the topology's blobstore map. A blob is linked under its
-     * "localname" when the blob info contains that key, otherwise under its blob key.
-     *
-     * @param conf supervisor configuration
-     * @param stormId topology id
-     * @param workerId id of the worker
-     * @throws IOException propagated from reading the topology conf or creating a symlink
-     */
-    protected void createBlobstoreLinks(Map conf, String stormId, String workerId) throws IOException {
-        String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
-        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
-        String workerRoot = ConfigUtils.workerRoot(conf, workerId);
-        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-
-        List<String> blobFileNames = new ArrayList<>();
-        if (blobstoreMap != null) {
-            for (Map.Entry<String, Map<String, Object>> blob : blobstoreMap.entrySet()) {
-                String blobKey = blob.getKey();
-                Map<String, Object> blobInfo = blob.getValue();
-                boolean hasLocalName = blobInfo != null && blobInfo.containsKey("localname");
-                blobFileNames.add(hasLocalName ? (String) blobInfo.get("localname") : blobKey);
-            }
-        }
-
-        // only used for the log line below: resources directory first, then the blobs
-        List<String> resourceFileNames = new ArrayList<>();
-        resourceFileNames.add(ConfigUtils.RESOURCES_SUBDIR);
-        resourceFileNames.addAll(blobFileNames);
-        LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", workerId, stormId, resourceFileNames.size(), resourceFileNames);
-
-        Utils.createSymlink(workerRoot, stormRoot, ConfigUtils.RESOURCES_SUBDIR);
-        for (String blobFileName : blobFileNames) {
-            Utils.createSymlink(workerRoot, stormRoot, blobFileName, blobFileName);
-        }
-    }
-
-    /**
-     * Shuts a worker down through the worker manager and cleans it up. The worker id is
-     * removed from the dead-worker set only when the cleanup reports success.
-     *
-     * @param supervisorData source of the supervisor id, worker thread pids and dead-worker set
-     * @param workerManager manager used to shut down and clean up the worker
-     * @param workerId id of the worker to kill
-     * @throws IOException propagated from the worker manager
-     * @throws InterruptedException propagated from the worker manager
-     */
-    public void killWorker(SupervisorData supervisorData, IWorkerManager workerManager, String workerId) throws IOException, InterruptedException {
-        workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids());
-        if (workerManager.cleanupWorker(workerId)) {
-            supervisorData.getDeadWorkers().remove(workerId);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
deleted file mode 100644
index ce467ea..0000000
--- 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
+++ /dev/null
@@ -1,612 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.storm.Config;
-import org.apache.storm.blobstore.BlobStore;
-import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.cluster.IStateStorage;
-import org.apache.storm.cluster.IStormClusterState;
-import org.apache.storm.event.EventManager;
-import org.apache.storm.generated.*;
-import org.apache.storm.localizer.LocalResource;
-import org.apache.storm.localizer.LocalizedResource;
-import org.apache.storm.localizer.Localizer;
-import org.apache.storm.utils.*;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.JarURLConnection;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class SyncSupervisorEvent implements Runnable {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(SyncSupervisorEvent.class);
-
-    private EventManager syncSupEventManager;
-    private EventManager syncProcessManager;
-    private IStormClusterState stormClusterState;
-    private LocalState localState;
-    private SyncProcessEvent syncProcesses;
-    private SupervisorData supervisorData;
-
-    /**
-     * Creates the supervisor sync event.
-     *
-     * @param supervisorData shared supervisor state; the cluster state and local state are taken from it
-     * @param syncProcesses process sync event that is queued at the end of each run
-     * @param syncSupEventManager event manager used for the assignment-change callback
-     * @param syncProcessManager event manager the process sync event is added to
-     */
-    public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager,
-            EventManager syncProcessManager) {
-
-        this.supervisorData = supervisorData;
-        this.stormClusterState = supervisorData.getStormClusterState();
-        this.localState = supervisorData.getLocalState();
-        this.syncProcesses = syncProcesses;
-        this.syncProcessManager = syncProcessManager;
-        this.syncSupEventManager = syncSupEventManager;
-    }
-
-    @Override
-    public void run() { // one supervisor sync pass: read assignments, download missing code, persist the new assignment, queue a process sync
-        try {
-            Map conf = supervisorData.getConf();
-            Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); // NOTE(review): presumably re-queues this event on syncSupEventManager when cluster state changes - confirm
-            List<String> stormIds = 
stormClusterState.assignments(syncCallback);
-            Map<String, Map<String, Object>> assignmentsSnapshot =
-                    getAssignmentsSnapshot(stormClusterState, stormIds, 
supervisorData.getAssignmentVersions().get(), syncCallback);
-            Map<String, List<ProfileRequest>> stormIdToProfilerActions = 
getProfileActions(stormClusterState, stormIds);
-
-            Set<String> allDownloadedTopologyIds = 
SupervisorUtils.readDownLoadedStormIds(conf);
-            Map<String, String> stormcodeMap = 
readStormCodeLocations(assignmentsSnapshot);
-            Map<Integer, LocalAssignment> existingAssignment = 
localState.getLocalAssignmentsMap();
-            if (existingAssignment == null) {
-                existingAssignment = new HashMap<>();
-            }
-
-            Map<Integer, LocalAssignment> allAssignment =
-                    readAssignments(assignmentsSnapshot, existingAssignment, 
supervisorData.getAssignmentId(), supervisorData.getSyncRetry());
-
-            Map<Integer, LocalAssignment> newAssignment = new HashMap<>(); // assignments on ports the ISupervisor confirms
-            Set<String> assignedStormIds = new HashSet<>(); // topology ids of the confirmed assignments
-
-            for (Map.Entry<Integer, LocalAssignment> entry : 
allAssignment.entrySet()) {
-                if 
(supervisorData.getiSupervisor().confirmAssigned(entry.getKey())) {
-                    newAssignment.put(entry.getKey(), entry.getValue());
-                    assignedStormIds.add(entry.getValue().get_topology_id());
-                }
-            }
-
-            Set<String> crashedStormIds = verifyDownloadedFiles(conf, 
supervisorData.getLocalizer(), assignedStormIds, allDownloadedTopologyIds);
-            Set<String> downloadedStormIds = new HashSet<>();
-            downloadedStormIds.addAll(allDownloadedTopologyIds);
-            downloadedStormIds.removeAll(crashedStormIds); // topologies with missing files count as not downloaded
-
-            LOG.debug("Synchronizing supervisor");
-            LOG.debug("Storm code map: {}", stormcodeMap);
-            LOG.debug("All assignment: {}", allAssignment);
-            LOG.debug("New assignment: {}", newAssignment);
-            LOG.debug("Assigned Storm Ids {}", assignedStormIds);
-            LOG.debug("All Downloaded Ids {}", allDownloadedTopologyIds);
-            LOG.debug("Checked Downloaded Ids {}", crashedStormIds);
-            LOG.debug("Downloaded Ids {}", downloadedStormIds);
-            LOG.debug("Storm Ids Profiler Actions {}", 
stormIdToProfilerActions);
-
-            // download code first
-            // This might take awhile
-            // - should this be done separately from usual monitoring?
-            // should we only download when topology is assigned to this 
supervisor?
-            for (Map.Entry<String, String> entry : stormcodeMap.entrySet()) {
-                String stormId = entry.getKey();
-                if (!downloadedStormIds.contains(stormId) && 
assignedStormIds.contains(stormId)) {
-                    LOG.info("Downloading code for storm id {}.", stormId);
-                    try {
-                        downloadStormCode(conf, stormId, entry.getValue(), 
supervisorData.getLocalizer());
-                    } catch (Exception e) {
-                        if 
(Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
-                            LOG.warn("Nimbus leader was not available.", e);
-                        } else if 
(Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
-                            LOG.warn("There was a connection problem with 
nimbus.", e);
-                        } else {
-                            throw e;
-                        }
-                    } // NOTE(review): the "Finished downloading" message below is also logged when one of the nimbus errors above was tolerated
-                    LOG.info("Finished downloading code for storm id {}", 
stormId);
-                }
-            }
-
-            LOG.debug("Writing new assignment {}", newAssignment);
-
-            Set<Integer> killWorkers = new HashSet<>();
-            killWorkers.addAll(existingAssignment.keySet());
-            killWorkers.removeAll(newAssignment.keySet()); // ports that were assigned before but are not in the new assignment
-            for (Integer port : killWorkers) {
-                supervisorData.getiSupervisor().killedWorker(port);
-            }
-
-            supervisorData.getiSupervisor().assigned(newAssignment.keySet());
-            localState.setLocalAssignmentsMap(newAssignment);
-            supervisorData.setAssignmentVersions(assignmentsSnapshot);
-            
supervisorData.setStormIdToProfilerActions(stormIdToProfilerActions);
-
-            Map<Long, LocalAssignment> convertNewAssignment = new HashMap<>(); // same assignments, keyed by the port as a Long
-            for (Map.Entry<Integer, LocalAssignment> entry : 
newAssignment.entrySet()) {
-                convertNewAssignment.put(entry.getKey().longValue(), 
entry.getValue());
-            }
-            supervisorData.setCurrAssignment(convertNewAssignment);
-
-            syncProcessManager.add(syncProcesses); // queue the process sync event
-        } catch (Exception e) {
-            LOG.error("Failed to Sync Supervisor", e);
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    /**
-     * Builds the assignment snapshot for the given topologies. For each topology the
-     * version in cluster state is compared with the locally recorded version: when they
-     * are equal the recorded entry is reused, otherwise the assignment is read again
-     * together with its version. Topologies for which no version is returned are left out.
-     *
-     * @param stormClusterState cluster state to read versions and assignments from
-     * @param stormIds ids of the topologies to look at
-     * @param localAssignmentVersion previously recorded snapshot, keyed by topology id
-     * @param callback callback passed on to every cluster state read
-     * @return new snapshot keyed by topology id
-     * @throws Exception propagated from the cluster state reads
-     */
-    protected Map<String, Map<String, Object>> getAssignmentsSnapshot(IStormClusterState stormClusterState, List<String> stormIds,
-            Map<String, Map<String, Object>> localAssignmentVersion, Runnable callback) throws Exception {
-        Map<String, Map<String, Object>> updateAssignmentVersion = new HashMap<>();
-        for (String stormId : stormIds) {
-            Integer version = stormClusterState.assignmentVersion(stormId, callback);
-            Map<String, Object> recorded = localAssignmentVersion.get(stormId);
-            Integer recordedVersion = -1;
-            if (recorded != null) {
-                recordedVersion = (Integer) recorded.get(IStateStorage.VERSION);
-            }
-            if (version == null) {
-                // no version available: the topology is left out of the snapshot
-                continue;
-            }
-            // equals() instead of ==: both sides are boxed Integers, and == compares object
-            // identity, which only happens to work for values in the small Integer cache.
-            if (version.equals(recordedVersion)) {
-                updateAssignmentVersion.put(stormId, recorded);
-            } else {
-                Map<String, Object> assignmentVersion = (Map<String, Object>) stormClusterState.assignmentInfoWithVersion(stormId, callback);
-                updateAssignmentVersion.put(stormId, assignmentVersion);
-            }
-        }
-        return updateAssignmentVersion;
-    }
-
-    /**
-     * Reads the profile requests of every given topology from cluster state.
-     *
-     * @param stormClusterState cluster state to read from
-     * @param stormIds ids of the topologies to look at
-     * @return map from topology id to the value returned for it by the cluster state
-     * @throws Exception propagated from the cluster state reads
-     */
-    protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) throws Exception {
-        Map<String, List<ProfileRequest>> actionsByStormId = new HashMap<>();
-        for (String stormId : stormIds) {
-            actionsByStormId.put(stormId, stormClusterState.getTopologyProfileRequests(stormId));
-        }
-        return actionsByStormId;
-    }
-
-    /**
-     * Extracts the master code directory of every topology in the snapshot. Snapshot
-     * entries without assignment data are skipped.
-     *
-     * @param assignmentsSnapshot snapshot keyed by topology id
-     * @return map from topology id to its master code directory
-     */
-    protected Map<String, String> readStormCodeLocations(Map<String, Map<String, Object>> assignmentsSnapshot) {
-        Map<String, String> codeDirByStormId = new HashMap<>();
-        for (Map.Entry<String, Map<String, Object>> snapshotEntry : assignmentsSnapshot.entrySet()) {
-            Assignment assignment = (Assignment) snapshotEntry.getValue().get(IStateStorage.DATA);
-            if (assignment == null) {
-                continue;
-            }
-            codeDirByStormId.put(snapshotEntry.getKey(), assignment.get_master_code_dir());
-        }
-        return codeDirByStormId;
-    }
-
-    /**
-     * Check for the files exists to avoid supervisor crashing Also makes sure 
there is no necessity for locking"
-     * 
-     * @param conf
-     * @param localizer
-     * @param assignedStormIds
-     * @param allDownloadedTopologyIds
-     * @return
-     */
-    protected Set<String> verifyDownloadedFiles(Map conf, Localizer localizer, 
Set<String> assignedStormIds, Set<String> allDownloadedTopologyIds)
-            throws IOException {
-        Set<String> srashStormIds = new HashSet<>();
-        for (String stormId : allDownloadedTopologyIds) {
-            if (assignedStormIds.contains(stormId)) {
-                if (!SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) {
-                    LOG.debug("Files not present in topology directory");
-                    SupervisorUtils.rmTopoFiles(conf, stormId, localizer, 
false);
-                    srashStormIds.add(stormId);
-                }
-            }
-        }
-        return srashStormIds;
-    }
-
-    /**
-     * download code ; two cluster mode: local and distributed
-     *
-     * @param conf
-     * @param stormId
-     * @param masterCodeDir
-     * @throws IOException
-     */
-    private void downloadStormCode(Map conf, String stormId, String 
masterCodeDir, Localizer localizer) throws Exception {
-        String clusterMode = ConfigUtils.clusterMode(conf);
-
-        if (clusterMode.endsWith("distributed")) {
-            downloadDistributeStormCode(conf, stormId, masterCodeDir, 
localizer);
-        } else if (clusterMode.endsWith("local")) {
-            downloadLocalStormCode(conf, stormId, masterCodeDir, localizer);
-        }
-    }
-
-    private void downloadLocalStormCode(Map conf, String stormId, String 
masterCodeDir, Localizer localizer) throws Exception {
-
-        String tmproot = ConfigUtils.supervisorTmpDir(conf) + 
Utils.FILE_PATH_SEPARATOR + Utils.uuid();
-        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
-        BlobStore blobStore = Utils.getNimbusBlobStore(conf, masterCodeDir, 
null);
-        FileOutputStream codeOutStream = null;
-        FileOutputStream confOutStream = null;
-        try {
-            FileUtils.forceMkdir(new File(tmproot));
-            String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
-            String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
-            String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
-            String confPath = ConfigUtils.supervisorStormConfPath(tmproot);
-            codeOutStream = new FileOutputStream(codePath);
-            blobStore.readBlobTo(stormCodeKey, codeOutStream, null);
-            confOutStream = new FileOutputStream(confPath);
-            blobStore.readBlobTo(stormConfKey, confOutStream, null);
-        } finally {
-            if (codeOutStream != null)
-                codeOutStream.close();
-            if (confOutStream != null)
-                codeOutStream.close();
-            blobStore.shutdown();
-        }
-        FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
-        SupervisorUtils.setupStormCodeDir(conf, 
ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
-        ClassLoader classloader = 
Thread.currentThread().getContextClassLoader();
-
-        String resourcesJar = resourcesJar();
-
-        URL url = classloader.getResource(ConfigUtils.RESOURCES_SUBDIR);
-
-        String targetDir = stormroot + Utils.FILE_PATH_SEPARATOR + 
ConfigUtils.RESOURCES_SUBDIR;
-
-        if (resourcesJar != null) {
-            LOG.info("Extracting resources from jar at {} to {}", 
resourcesJar, targetDir);
-            Utils.extractDirFromJar(resourcesJar, 
ConfigUtils.RESOURCES_SUBDIR, stormroot);
-        } else if (url != null) {
-
-            LOG.info("Copying resources at {} to {} ", url.toString(), 
targetDir);
-            if (url.getProtocol() == "jar") {
-                JarURLConnection urlConnection = (JarURLConnection) 
url.openConnection();
-                
Utils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), 
ConfigUtils.RESOURCES_SUBDIR, stormroot);
-            } else {
-                FileUtils.copyDirectory(new File(url.getFile()), (new 
File(targetDir)));
-            }
-        }
-    }
-
-    /**
-     * Downloading to permanent location is atomic
-     * 
-     * @param conf
-     * @param stormId
-     * @param masterCodeDir
-     * @param localizer
-     * @throws Exception
-     */
-    private void downloadDistributeStormCode(Map conf, String stormId, String 
masterCodeDir, Localizer localizer) throws Exception {
-
-        String tmproot = ConfigUtils.supervisorTmpDir(conf) + 
Utils.FILE_PATH_SEPARATOR + Utils.uuid();
-        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
-        ClientBlobStore blobStore = 
Utils.getClientBlobStoreForSupervisor(conf);
-        FileUtils.forceMkdir(new File(tmproot));
-        if (Utils.isOnWindows()) {
-            if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
-                throw new RuntimeException("ERROR: Windows doesn't implement 
setting the correct permissions");
-            }
-        } else {
-            Utils.restrictPermissions(tmproot);
-        }
-        String stormJarKey = ConfigUtils.masterStormJarKey(stormId);
-        String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
-        String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
-        String jarPath = ConfigUtils.supervisorStormJarPath(tmproot);
-        String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
-        String confPath = ConfigUtils.supervisorStormConfPath(tmproot);
-        Utils.downloadResourcesAsSupervisor(stormJarKey, jarPath, blobStore);
-        Utils.downloadResourcesAsSupervisor(stormCodeKey, codePath, blobStore);
-        Utils.downloadResourcesAsSupervisor(stormConfKey, confPath, blobStore);
-        blobStore.shutdown();
-        Utils.extractDirFromJar(jarPath, ConfigUtils.RESOURCES_SUBDIR, 
tmproot);
-        downloadBlobsForTopology(conf, confPath, localizer, tmproot);
-        downloadDependenciesForTopology(conf, confPath, codePath, localizer, 
tmproot);
-        if (areBlobsInBlobStoreMapDownloaded(confPath, tmproot) &&
-                areDependencyFilesDownloaded(codePath, tmproot)) {
-            LOG.info("Successfully downloaded blob resources for storm-id {}", 
stormId);
-            if (Utils.isOnWindows()) {
-                // Files/move with non-empty directory doesn't work well on 
Windows
-                FileUtils.moveDirectory(new File(tmproot), new 
File(stormroot));
-            } else {
-                FileUtils.forceMkdir(new File(stormroot));
-                Files.move(new File(tmproot).toPath(), new 
File(stormroot).toPath(), StandardCopyOption.ATOMIC_MOVE);
-            }
-        } else {
-            LOG.info("Failed to download blob resources for storm-id ", 
stormId);
-            Utils.forceDelete(tmproot);
-        }
-    }
-
-    /**
-     * Assert if all blobs are downloaded for the given topology
-     *
-     * @param targetDir
-     * @param blobFileNames
-     * @return
-     */
-    protected boolean areBlobsDownloaded(String targetDir, List<String> 
blobFileNames) throws IOException {
-        for (String string : blobFileNames) {
-            if (!Utils.checkFileExists(targetDir, string)) {
-                LOG.info("Fail to find downloaded file: dir {} filename {}", 
targetDir, string);
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * Assert if all blobs in blobstore map are downloaded for the given 
topology
-     *
-     * @param stormconfPath
-     * @param targetDir
-     * @return
-     */
-    protected boolean areBlobsInBlobStoreMapDownloaded(String stormconfPath, 
String targetDir) throws IOException {
-        Map stormConf = 
Utils.fromCompressedJsonConf(FileUtils.readFileToByteArray(new 
File(stormconfPath)));
-        Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-        List<String> blobFileNames = new ArrayList<>();
-        if (blobstoreMap != null) {
-            for (Map.Entry<String, Map<String, Object>> entry : 
blobstoreMap.entrySet()) {
-                String key = entry.getKey();
-                Map<String, Object> blobInfo = entry.getValue();
-                String ret = null;
-                if (blobInfo != null && blobInfo.containsKey("localname")) {
-                    ret = (String) blobInfo.get("localname");
-                } else {
-                    ret = key;
-                }
-                blobFileNames.add(ret);
-            }
-        }
-
-        return areBlobsDownloaded(targetDir, blobFileNames);
-    }
-
-    /**
-     * Assert if all dependencies blobs are downloaded for the given topology
-     * 
-     * @param stormcodePath
-     * @param targetDir
-     * @return
-     */
-    protected boolean areDependencyFilesDownloaded(String stormcodePath, 
String targetDir) throws IOException {
-        StormTopology stormCode = 
ConfigUtils.readSupervisorStormCodeGivenPath(stormcodePath);
-        List<String> blobFileNames = new ArrayList<>();
-        blobFileNames.addAll(stormCode.get_dependency_jars());
-        blobFileNames.addAll(stormCode.get_dependency_artifacts());
-
-        return areBlobsDownloaded(targetDir, blobFileNames);
-    }
-
-    // FIXME: refactor to downloadBlobsForTopology and 
downloadDependenciesForTopology to extract common code like 1.x of 
supervisor.clj
-
-    /**
-     * Download all blobs listed in the topology configuration for a given 
topology.
-     * 
-     * @param conf
-     * @param stormconfPath
-     * @param localizer
-     * @param tmpRoot
-     */
-    protected void downloadBlobsForTopology(Map conf, String stormconfPath, 
Localizer localizer, String tmpRoot) throws IOException {
-        Map stormConf = ConfigUtils.readSupervisorStormConfGivenPath(conf, 
stormconfPath);
-        Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
-        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
-        File userDir = localizer.getLocalUserFileCacheDir(user);
-        List<LocalResource> localResourceList = 
SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
-        if (localResourceList.size() > 0) {
-            if (!userDir.exists()) {
-                FileUtils.forceMkdir(userDir);
-            }
-            try {
-                List<LocalizedResource> localizedResources = 
localizer.getBlobs(localResourceList, user, topoName, userDir);
-                setupBlobPermission(conf, user, userDir.toString());
-                for (LocalizedResource localizedResource : localizedResources) 
{
-                    File rsrcFilePath = new 
File(localizedResource.getFilePath());
-                    String keyName = rsrcFilePath.getName();
-                    String blobSymlinkTargetName = new 
File(localizedResource.getCurrentSymlinkPath()).getName();
-
-                    String symlinkName = null;
-                    if (blobstoreMap != null) {
-                        Map<String, Object> blobInfo = 
blobstoreMap.get(keyName);
-                        if (blobInfo != null && 
blobInfo.containsKey("localname")) {
-                            symlinkName = (String) blobInfo.get("localname");
-                        } else {
-                            symlinkName = keyName;
-                        }
-                    }
-                    Utils.createSymlink(tmpRoot, rsrcFilePath.getParent(), 
symlinkName, blobSymlinkTargetName);
-                }
-            } catch (AuthorizationException authExp) {
-                LOG.error("AuthorizationException error {}", authExp);
-            } catch (KeyNotFoundException knf) {
-                LOG.error("KeyNotFoundException error {}", knf);
-            }
-        }
-    }
-
-    /**
-     * Download all dependencies blobs listed in the topology configuration 
for a given topology.
-     *
-     * @param conf
-     * @param stormconfPath
-     * @param stormcodePath
-     * @param localizer
-     * @param tmpRoot
-     */
-    protected void downloadDependenciesForTopology(Map conf, String 
stormconfPath, String stormcodePath, Localizer localizer, String tmpRoot) 
throws IOException {
-        Map stormConf = ConfigUtils.readSupervisorStormConfGivenPath(conf, 
stormconfPath);
-        StormTopology stormCode = 
ConfigUtils.readSupervisorStormCodeGivenPath(stormcodePath);
-
-        List<String> dependencies = new ArrayList<>();
-        dependencies.addAll(stormCode.get_dependency_jars());
-        dependencies.addAll(stormCode.get_dependency_artifacts());
-
-        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
-        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
-        File userDir = localizer.getLocalUserFileCacheDir(user);
-
-        List<LocalResource> localResourceList = new ArrayList<>();
-        for (String dependency : dependencies) {
-            localResourceList.add(new LocalResource(dependency, false));
-        }
-
-        if (localResourceList.size() > 0) {
-            if (!userDir.exists()) {
-                FileUtils.forceMkdir(userDir);
-            }
-            try {
-                List<LocalizedResource> localizedResources = 
localizer.getBlobs(localResourceList, user, topoName, userDir);
-                setupBlobPermission(conf, user, userDir.toString());
-                for (LocalizedResource localizedResource : localizedResources) 
{
-                    File rsrcFilePath = new 
File(localizedResource.getFilePath());
-                    String keyName = rsrcFilePath.getName();
-                    String blobSymlinkTargetName = new 
File(localizedResource.getCurrentSymlinkPath()).getName();
-
-                    String symlinkName = keyName;
-                    Utils.createSymlink(tmpRoot, rsrcFilePath.getParent(), 
symlinkName, blobSymlinkTargetName);
-                }
-            } catch (AuthorizationException authExp) {
-                LOG.error("AuthorizationException error {}", authExp);
-            } catch (KeyNotFoundException knf) {
-                LOG.error("KeyNotFoundException error {}", knf);
-            }
-        }
-    }
-
-    protected void setupBlobPermission(Map conf, String user, String path) 
throws IOException {
-        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), 
false)) {
-            String logPrefix = "setup blob permissions for " + path;
-            SupervisorUtils.processLauncherAndWait(conf, user, 
Arrays.asList("blob", path), null, logPrefix);
-        }
-
-    }
-
-    private String resourcesJar() throws IOException {
-
-        String path = Utils.currentClasspath();
-        if (path == null) {
-            return null;
-        }
-        String[] paths = path.split(File.pathSeparator);
-        List<String> jarPaths = new ArrayList<String>();
-        for (String s : paths) {
-            if (s.endsWith(".jar")) {
-                jarPaths.add(s);
-            }
-        }
-
-        List<String> rtn = new ArrayList<String>();
-        int size = jarPaths.size();
-        for (int i = 0; i < size; i++) {
-            if (Utils.zipDoesContainDir(jarPaths.get(i), 
ConfigUtils.RESOURCES_SUBDIR)) {
-                rtn.add(jarPaths.get(i));
-            }
-        }
-        if (rtn.size() == 0)
-            return null;
-
-        return rtn.get(0);
-    }
-
-    protected Map<Integer, LocalAssignment> readAssignments(Map<String, 
Map<String, Object>> assignmentsSnapshot,
-            Map<Integer, LocalAssignment> existingAssignment, String 
assignmentId, AtomicInteger retries) {
-        try {
-            Map<Integer, LocalAssignment> portLA = new HashMap<Integer, 
LocalAssignment>();
-            for (Map.Entry<String, Map<String, Object>> assignEntry : 
assignmentsSnapshot.entrySet()) {
-                String stormId = assignEntry.getKey();
-                Assignment assignment = (Assignment) 
assignEntry.getValue().get(IStateStorage.DATA);
-
-                Map<Integer, LocalAssignment> portTasks = 
readMyExecutors(stormId, assignmentId, assignment);
-
-                for (Map.Entry<Integer, LocalAssignment> entry : 
portTasks.entrySet()) {
-
-                    Integer port = entry.getKey();
-
-                    LocalAssignment la = entry.getValue();
-
-                    if (!portLA.containsKey(port)) {
-                        portLA.put(port, la);
-                    } else {
-                        throw new RuntimeException("Should not have multiple 
topologys assigned to one port");
-                    }
-                }
-            }
-            retries.set(0);
-            return portLA;
-        } catch (RuntimeException e) {
-            if (retries.get() > 2) {
-                throw e;
-            } else {
-                retries.addAndGet(1);
-            }
-            LOG.warn("{} : retrying {} of 3", e.getMessage(), retries.get());
-            return existingAssignment;
-        }
-    }
-
-    protected Map<Integer, LocalAssignment> readMyExecutors(String stormId, 
String assignmentId, Assignment assignment) {
-        Map<Integer, LocalAssignment> portTasks = new HashMap<>();
-        Map<Long, WorkerResources> slotsResources = new HashMap<>();
-        Map<NodeInfo, WorkerResources> nodeInfoWorkerResourcesMap = 
assignment.get_worker_resources();
-        if (nodeInfoWorkerResourcesMap != null) {
-            for (Map.Entry<NodeInfo, WorkerResources> entry : 
nodeInfoWorkerResourcesMap.entrySet()) {
-                if (entry.getKey().get_node().equals(assignmentId)) {
-                    Set<Long> ports = entry.getKey().get_port();
-                    for (Long port : ports) {
-                        slotsResources.put(port, entry.getValue());
-                    }
-                }
-            }
-        }
-        Map<List<Long>, NodeInfo> executorNodePort = 
assignment.get_executor_node_port();
-        if (executorNodePort != null) {
-            for (Map.Entry<List<Long>, NodeInfo> entry : 
executorNodePort.entrySet()) {
-                if (entry.getValue().get_node().equals(assignmentId)) {
-                    for (Long port : entry.getValue().get_port()) {
-                        LocalAssignment localAssignment = 
portTasks.get(port.intValue());
-                        if (localAssignment == null) {
-                            List<ExecutorInfo> executors = new 
ArrayList<ExecutorInfo>();
-                            localAssignment = new LocalAssignment(stormId, 
executors);
-                            if (slotsResources.containsKey(port)) {
-                                
localAssignment.set_resources(slotsResources.get(port));
-                            }
-                            portTasks.put(port.intValue(), localAssignment);
-                        }
-                        List<ExecutorInfo> executorInfoList = 
localAssignment.get_executors();
-                        executorInfoList.add(new 
ExecutorInfo(entry.getKey().get(0).intValue(), 
entry.getKey().get(entry.getKey().size() - 1).intValue()));
-                    }
-                }
-            }
-        }
-        return portTasks;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
deleted file mode 100644
index 3e1e34d..0000000
--- 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.daemon.supervisor.timer;
-
-import com.google.common.collect.Lists;
-import org.apache.storm.Config;
-import org.apache.storm.cluster.IStormClusterState;
-import org.apache.storm.daemon.supervisor.SupervisorData;
-import org.apache.storm.daemon.supervisor.SupervisorUtils;
-import org.apache.storm.generated.ProfileAction;
-import org.apache.storm.generated.ProfileRequest;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.*;
-
-public class RunProfilerActions implements Runnable {
-    private static Logger LOG = 
LoggerFactory.getLogger(RunProfilerActions.class);
-
-    private Map conf;
-    private IStormClusterState stormClusterState;
-    private String hostName;
-
-    private String profileCmd;
-
-    private SupervisorData supervisorData;
-
-    private class ActionExitCallback implements Utils.ExitCodeCallable {
-        private String stormId;
-        private ProfileRequest profileRequest;
-        private String logPrefix;
-        private boolean stop;
-
-        public ActionExitCallback(String stormId, ProfileRequest 
profileRequest, String logPrefix, boolean stop) {
-            this.stormId = stormId;
-            this.profileRequest = profileRequest;
-            this.logPrefix = logPrefix;
-            this.stop = stop;
-        }
-
-        @Override
-        public Object call() throws Exception {
-            return null;
-        }
-
-        @Override
-        public Object call(int exitCode) {
-            LOG.info("{} profile-action exited for {}", logPrefix, exitCode);
-            try {
-                if (stop)
-                    stormClusterState.deleteTopologyProfileRequests(stormId, 
profileRequest);
-            } catch (Exception e) {
-                LOG.warn("failed delete profileRequest: " + profileRequest);
-            }
-            return null;
-        }
-    }
-
-    public RunProfilerActions(SupervisorData supervisorData) {
-        this.conf = supervisorData.getConf();
-        this.stormClusterState = supervisorData.getStormClusterState();
-        this.hostName = supervisorData.getHostName();
-        String stormHome = System.getProperty("storm.home");
-        this.profileCmd = stormHome + Utils.FILE_PATH_SEPARATOR + "bin" + 
Utils.FILE_PATH_SEPARATOR + conf.get(Config.WORKER_PROFILER_COMMAND);
-        this.supervisorData = supervisorData;
-    }
-
-    @Override
-    public void run() {
-        Map<String, List<ProfileRequest>> stormIdToActions = 
supervisorData.getStormIdToProfilerActions().get();
-        try {
-            for (Map.Entry<String, List<ProfileRequest>> entry : 
stormIdToActions.entrySet()) {
-                String stormId = entry.getKey();
-                List<ProfileRequest> requests = entry.getValue();
-                if (requests != null) {
-                    for (ProfileRequest profileRequest : requests) {
-                        if 
(profileRequest.get_nodeInfo().get_node().equals(hostName)) {
-                            boolean stop = System.currentTimeMillis() > 
profileRequest.get_time_stamp();
-                            Long port = 
profileRequest.get_nodeInfo().get_port().iterator().next();
-                            String targetDir = 
ConfigUtils.workerArtifactsRoot(conf, stormId, port.intValue());
-                            Map stormConf = 
ConfigUtils.readSupervisorStormConf(conf, stormId);
-
-                            String user = null;
-                            if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) 
!= null) {
-                                user = (String) 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER));
-                            }
-                            Map<String, String> env = null;
-                            if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != 
null) {
-                                env = (Map<String, String>) 
stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
-                            } else {
-                                env = new HashMap<String, String>();
-                            }
-
-                            String str = 
ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
-                            StringBuilder stringBuilder = new StringBuilder();
-
-                            String workerPid = null;
-                            try (FileReader reader = new FileReader(str); 
BufferedReader br = new BufferedReader(reader)) {
-                                workerPid = br.readLine().trim();
-                            }
-                            ProfileAction profileAction = 
profileRequest.get_action();
-                            String logPrefix = "ProfilerAction process " + 
stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
-
-                            // Until PROFILER_STOP action is invalid, keep 
launching profiler start in case worker restarted
-                            // The profiler plugin script validates if JVM is 
recording before starting another recording.
-                            List<String> command = mkCommand(profileAction, 
stop, workerPid, targetDir);
-                            try {
-                                ActionExitCallback actionExitCallback = new 
ActionExitCallback(stormId, profileRequest, logPrefix, stop);
-                                launchProfilerActionForWorker(user, targetDir, 
command, env, actionExitCallback, logPrefix);
-                            } catch (IOException e) {
-                                LOG.error("Error in processing ProfilerAction 
'{}' for {}:{}, will retry later", profileAction, stormId, port);
-                            } catch (RuntimeException e) {
-                                LOG.error("Error in processing ProfilerAction 
'{}' for {}:{}, will retry later", profileAction, stormId, port);
-                            }
-                        }
-                    }
-                }
-            }
-        } catch (Exception e) {
-            LOG.error("Error running profiler actions, will retry again 
later");
-        }
-    }
-
-    private void launchProfilerActionForWorker(String user, String targetDir, 
List<String> commands, Map<String, String> environment,
-            final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) 
throws IOException {
-        File targetFile = new File(targetDir);
-        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), 
false)) {
-            LOG.info("Running as user:{} command:{}", user, commands);
-            String containerFile = Utils.containerFilePath(targetDir);
-            if (Utils.checkFileExists(containerFile)) {
-                SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
-            }
-            String scriptFile = Utils.scriptFilePath(targetDir);
-            if (Utils.checkFileExists(scriptFile)) {
-                SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
-            }
-            String script = Utils.writeScript(targetDir, commands, 
environment);
-            List<String> args = new ArrayList<>();
-            args.add("profiler");
-            args.add(targetDir);
-            args.add(script);
-            SupervisorUtils.processLauncher(conf, user, null, args, 
environment, logPrefix, exitCodeCallable, targetFile);
-        } else {
-            Utils.launchProcess(commands, environment, logPrefix, 
exitCodeCallable, targetFile);
-        }
-    }
-
-    private List<String> mkCommand(ProfileAction action, boolean stop, String 
workerPid, String targetDir) {
-        if (action == ProfileAction.JMAP_DUMP) {
-            return jmapDumpCmd(workerPid, targetDir);
-        } else if (action == ProfileAction.JSTACK_DUMP) {
-            return jstackDumpCmd(workerPid, targetDir);
-        } else if (action == ProfileAction.JPROFILE_DUMP) {
-            return jprofileDump(workerPid, targetDir);
-        } else if (action == ProfileAction.JVM_RESTART) {
-            return jprofileJvmRestart(workerPid);
-        } else if (!stop && action == ProfileAction.JPROFILE_STOP) {
-            return jprofileStart(workerPid);
-        } else if (stop && action == ProfileAction.JPROFILE_STOP) {
-            return jprofileStop(workerPid, targetDir);
-        }
-        return Lists.newArrayList();
-    }
-
-    private List<String> jmapDumpCmd(String pid, String targetDir) {
-        return Lists.newArrayList(profileCmd, pid, "jmap", targetDir);
-    }
-
-    private List<String> jstackDumpCmd(String pid, String targetDir) {
-        return Lists.newArrayList(profileCmd, pid, "jstack", targetDir);
-    }
-
-    private List<String> jprofileStart(String pid) {
-        return Lists.newArrayList(profileCmd, pid, "start");
-    }
-
-    private List<String> jprofileStop(String pid, String targetDir) {
-        return Lists.newArrayList(profileCmd, pid, "stop", targetDir);
-    }
-
-    private List<String> jprofileDump(String pid, String targetDir) {
-        return Lists.newArrayList(profileCmd, pid, "dump", targetDir);
-    }
-
-    private List<String> jprofileJvmRestart(String pid) {
-        return Lists.newArrayList(profileCmd, pid, "kill");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
index 3ce8f5d..0017092 100644
--- 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
+++ 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -18,35 +18,24 @@
 
 package org.apache.storm.daemon.supervisor.timer;
 
-import org.apache.storm.command.HealthCheck;
-import org.apache.storm.daemon.supervisor.SupervisorData;
-import org.apache.storm.daemon.supervisor.SupervisorUtils;
-import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
 import java.util.Map;
 
-public class SupervisorHealthCheck implements Runnable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SupervisorHealthCheck.class);
+import org.apache.storm.command.HealthCheck;
+import org.apache.storm.daemon.supervisor.Supervisor;
 
-    private SupervisorData supervisorData;
+public class SupervisorHealthCheck implements Runnable {
+    private final Supervisor supervisor;
 
-    public SupervisorHealthCheck(SupervisorData supervisorData) {
-        this.supervisorData = supervisorData;
+    public SupervisorHealthCheck(Supervisor supervisor) {
+        this.supervisor = supervisor;
     }
 
     @Override
     public void run() {
-        Map conf = supervisorData.getConf();
-        IWorkerManager workerManager = supervisorData.getWorkerManager();
+        Map<String, Object> conf = supervisor.getConf();
         int healthCode = HealthCheck.healthCheck(conf);
         if (healthCode != 0) {
-            SupervisorUtils.shutdownAllWorkers(conf, supervisorData.getSupervisorId(), supervisorData.getWorkerThreadPids(), supervisorData.getDeadWorkers(),
-                    workerManager);
+            supervisor.shutdownAllWorkers();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index fd357c0..34c5682 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -19,7 +19,7 @@ package org.apache.storm.daemon.supervisor.timer;
 
 import org.apache.storm.Config;
 import org.apache.storm.cluster.IStormClusterState;
-import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.generated.SupervisorInfo;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
@@ -33,26 +33,26 @@ public class SupervisorHeartbeat implements Runnable {
 
      private final IStormClusterState stormClusterState;
      private final String supervisorId;
-     private final Map conf;
-     private final SupervisorData supervisorData;
+     private final Map<String, Object> conf;
+     private final Supervisor supervisor;
 
-    public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) {
-        this.stormClusterState = supervisorData.getStormClusterState();
-        this.supervisorId = supervisorData.getSupervisorId();
-        this.supervisorData = supervisorData;
+    public SupervisorHeartbeat(Map<String, Object> conf, Supervisor supervisor) {
+        this.stormClusterState = supervisor.getStormClusterState();
+        this.supervisorId = supervisor.getId();
+        this.supervisor = supervisor;
         this.conf = conf;
     }
 
-    private SupervisorInfo buildSupervisorInfo(Map conf, SupervisorData supervisorData) {
+    private SupervisorInfo buildSupervisorInfo(Map<String, Object> conf, Supervisor supervisor) {
         SupervisorInfo supervisorInfo = new SupervisorInfo();
         supervisorInfo.set_time_secs(Time.currentTimeSecs());
-        supervisorInfo.set_hostname(supervisorData.getHostName());
-        supervisorInfo.set_assignment_id(supervisorData.getAssignmentId());
+        supervisorInfo.set_hostname(supervisor.getHostName());
+        supervisorInfo.set_assignment_id(supervisor.getAssignmentId());
 
         List<Long> usedPorts = new ArrayList<>();
-        usedPorts.addAll(supervisorData.getCurrAssignment().get().keySet());
+        usedPorts.addAll(supervisor.getCurrAssignment().get().keySet());
         supervisorInfo.set_used_ports(usedPorts);
-        List metaDatas = (List)supervisorData.getiSupervisor().getMetadata();
+        List metaDatas = (List)supervisor.getiSupervisor().getMetadata();
         List<Long> portList = new ArrayList<>();
         if (metaDatas != null){
             for (Object data : metaDatas){
@@ -64,8 +64,8 @@ public class SupervisorHeartbeat implements Runnable {
 
         supervisorInfo.set_meta(portList);
         supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META));
-        supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime());
-        supervisorInfo.set_version(supervisorData.getStormVersion());
+        supervisorInfo.set_uptime_secs(supervisor.getUpTime().upTime());
+        supervisorInfo.set_version(supervisor.getStormVersion());
         supervisorInfo.set_resources_map(mkSupervisorCapacities(conf));
         return supervisorInfo;
     }
@@ -81,7 +81,7 @@ public class SupervisorHeartbeat implements Runnable {
 
     @Override
     public void run() {
-        SupervisorInfo supervisorInfo = buildSupervisorInfo(conf, supervisorData);
+        SupervisorInfo supervisorInfo = buildSupervisorInfo(conf, supervisor);
         stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
index 159697f..0b6d996 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
@@ -18,7 +18,7 @@
 package org.apache.storm.daemon.supervisor.timer;
 
 import org.apache.storm.Config;
-import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.daemon.supervisor.SupervisorUtils;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyNotFoundException;
@@ -47,18 +47,18 @@ public class UpdateBlobs implements Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class);
 
-    private SupervisorData supervisorData;
+    private Supervisor supervisor;
 
-    public UpdateBlobs(SupervisorData supervisorData) {
-        this.supervisorData = supervisorData;
+    public UpdateBlobs(Supervisor supervisor) {
+        this.supervisor = supervisor;
     }
 
     @Override
     public void run() {
         try {
-            Map conf = supervisorData.getConf();
-            Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf);
-            AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisorData.getCurrAssignment();
+            Map<String, Object> conf = supervisor.getConf();
+            Set<String> downloadedStormIds = SupervisorUtils.readDownloadedTopologyIds(conf);
+            AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisor.getCurrAssignment();
             Set<String> assignedStormIds = new HashSet<>();
             for (LocalAssignment localAssignment : newAssignment.get().values()) {
                 assignedStormIds.add(localAssignment.get_topology_id());
@@ -67,7 +67,7 @@ public class UpdateBlobs implements Runnable {
                 if (assignedStormIds.contains(stormId)) {
                     String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
                     LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot);
-                    updateBlobsForTopology(conf, stormId, supervisorData.getLocalizer());
+                    updateBlobsForTopology(conf, stormId, supervisor.getLocalizer());
                 }
             }
         } catch (Exception e) {

Reply via email to