[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15221722#comment-15221722 ] ASF GitHub Bot commented on STORM-1279: --- Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/1257 > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java threadpools. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
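As a rough illustration of the last point in the issue description (replacing `backtype.storm.event` usage with built-in Java thread pools), here is a minimal sketch that assumes an event manager only needs to run queued events one at a time, in submission order. The class `SimpleEventManager` and its methods are hypothetical names for this example and are not taken from the pull request. It needs Java 8 or later for the lambda.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an event manager: each event is a Runnable that a
// single worker thread executes in the order it was added.
class SimpleEventManager implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Queue an event; it runs after every previously added event.
    void add(Runnable event) {
        executor.execute(event);
    }

    // Stop accepting events and wait (up to 10 seconds) for queued ones to finish.
    @Override
    public void close() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    // Demo helper: submits n numbered events and returns the order they ran in.
    static List<Integer> runInOrder(int n) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<Integer>());
        try (SimpleEventManager manager = new SimpleEventManager()) {
            for (int i = 0; i < n; i++) {
                final int id = i;
                manager.add(() -> seen.add(id));
            }
        }
        return seen;
    }
}
```

A single-thread executor keeps the one-event-at-a-time ordering; whether the real supervisor needs more than that is not something this sketch tries to answer.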
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15221675#comment-15221675 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-204393683 +1 looks good to me.
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220961#comment-15220961 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-204198660 @revans2 Thank you again. In fact, I can't test the profiling options myself because of my JDK.
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220842#comment-15220842 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-204167455 Found a few more bugs with manual testing. I would like to do some more with other profiling options that are not on by default but this should fix the default jstack/heap dump profiling.
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220831#comment-15220831 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r58142341 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java --- @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.supervisor.timer; + +import com.google.common.collect.Lists; +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; + +public class RunProfilerActions implements Runnable { +private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class); + +private Map conf; +private IStormClusterState stormClusterState; +private String hostName; + +private String profileCmd; + +private SupervisorData supervisorData; + +private class ActionExitCallback implements Utils.ExitCodeCallable { +private String stormId; +private ProfileRequest profileRequest; +private String logPrefix; +private boolean stop; + +public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix, boolean stop) { +this.stormId = stormId; +this.profileRequest = profileRequest; +this.logPrefix = logPrefix; +this.stop = stop; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} profile-action exited for {}", logPrefix, exitCode); +try { +if (stop) + stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); +} catch (Exception e) { +LOG.warn("failed delete profileRequest: " + profileRequest); +} +return null; +} +} + +public RunProfilerActions(SupervisorData supervisorData) { +this.conf = supervisorData.getConf(); +this.stormClusterState = supervisorData.getStormClusterState(); +this.hostName = 
supervisorData.getHostName(); +this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND)); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +MapstormIdToActions = supervisorData.getStormIdToProfilerActions().get(); +try { +for (Map.Entry entry : stormIdToActions.entrySet()) { +String stormId = entry.getKey(); +List requests = entry.getValue(); +if (requests != null) { +for (ProfileRequest profileRequest : requests) { +if (profileRequest.get_nodeInfo().get_node().equals(hostName)) { +boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp(); +Long port = profileRequest.get_nodeInfo().get_port().iterator().next(); +String targetDir =
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220803#comment-15220803 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r58140035 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java --- @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.supervisor.timer; + +import com.google.common.collect.Lists; +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; + +public class RunProfilerActions implements Runnable { +private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class); + +private Map conf; +private IStormClusterState stormClusterState; +private String hostName; + +private String profileCmd; + +private SupervisorData supervisorData; + +private class ActionExitCallback implements Utils.ExitCodeCallable { +private String stormId; +private ProfileRequest profileRequest; +private String logPrefix; +private boolean stop; + +public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix, boolean stop) { +this.stormId = stormId; +this.profileRequest = profileRequest; +this.logPrefix = logPrefix; +this.stop = stop; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} profile-action exited for {}", logPrefix, exitCode); +try { +if (stop) + stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); +} catch (Exception e) { +LOG.warn("failed delete profileRequest: " + profileRequest); +} +return null; +} +} + +public RunProfilerActions(SupervisorData supervisorData) { +this.conf = supervisorData.getConf(); +this.stormClusterState = supervisorData.getStormClusterState(); +this.hostName = 
supervisorData.getHostName(); +this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND)); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +MapstormIdToActions = supervisorData.getStormIdToProfilerActions().get(); +try { +for (Map.Entry entry : stormIdToActions.entrySet()) { +String stormId = entry.getKey(); +List requests = entry.getValue(); +if (requests != null) { +for (ProfileRequest profileRequest : requests) { +if (profileRequest.get_nodeInfo().get_node().equals(hostName)) { +boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp(); +Long port = profileRequest.get_nodeInfo().get_port().iterator().next(); +String targetDir =
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220769#comment-15220769 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r58137861 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java --- @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.supervisor.timer; + +import com.google.common.collect.Lists; +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; + +public class RunProfilerActions implements Runnable { +private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class); + +private Map conf; +private IStormClusterState stormClusterState; +private String hostName; + +private String profileCmd; + +private SupervisorData supervisorData; + +private class ActionExitCallback implements Utils.ExitCodeCallable { +private String stormId; +private ProfileRequest profileRequest; +private String logPrefix; +private boolean stop; + +public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix, boolean stop) { +this.stormId = stormId; +this.profileRequest = profileRequest; +this.logPrefix = logPrefix; +this.stop = stop; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} profile-action exited for {}", logPrefix, exitCode); +try { +if (stop) + stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); +} catch (Exception e) { +LOG.warn("failed delete profileRequest: " + profileRequest); +} +return null; +} +} + +public RunProfilerActions(SupervisorData supervisorData) { +this.conf = supervisorData.getConf(); +this.stormClusterState = supervisorData.getStormClusterState(); +this.hostName = 
supervisorData.getHostName(); +this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND)); --- End diff --

The original code had this relative to `storm.home/bin`:

```
profile-cmd (str (clojure.java.io/file storm-home "bin" (conf WORKER-PROFILER-COMMAND)))
```

It should probably be:

```
String stormHome = System.getProperty("storm.home");
this.profileCmd = stormHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + conf.get(Config.WORKER_PROFILER_COMMAND);
```
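The path construction discussed in this comment can also be expressed with `java.io.File`, which is closer to the Clojure `clojure.java.io/file` call. The following is only a sketch: the class `ProfileCommandPath` and its `resolve` method are hypothetical names, and the pull request itself may have settled on string concatenation instead.

```java
import java.io.File;

// Hypothetical helper, not part of the pull request.
class ProfileCommandPath {
    // Build <stormHome>/bin/<command> using the platform's path separator.
    // If stormHome is null, File treats the path as relative to "bin".
    static String resolve(String stormHome, String command) {
        File binDir = new File(stormHome, "bin");
        return new File(binDir, command).getPath();
    }
}
```

Assuming the same `conf` map and config key as the reviewed code, a call might look like `ProfileCommandPath.resolve(System.getProperty("storm.home"), (String) conf.get(Config.WORKER_PROFILER_COMMAND))`.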
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220525#comment-15220525 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-204088810 I just ran into another bug https://issues.apache.org/jira/browse/STORM-1672 that is preventing me from manually testing some of the changes for the supervisor. I will work on that and see if I can get this in soon.
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220474#comment-15220474 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-204081963 Still +1
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219391#comment-15219391 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-203761199 @revans2 @longdafeng Thank you very much. I have addressed your comments, and we will follow up on the remaining items in separate JIRAs.
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219387#comment-15219387 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r58003203 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java --- @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor.workermanager; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.*; + +public class DefaultWorkerManager implements IWorkerManager { + +private static Logger LOG = LoggerFactory.getLogger(DefaultWorkerManager.class); + +private Map conf; +private CgroupManager resourceIsolationManager; +private boolean runWorkerAsUser; + +@Override +public void prepareWorker(Map conf, Localizer localizer) { +this.conf = conf; +if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) { +try { +this.resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN)); +this.resourceIsolationManager.prepare(conf); +LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager); +} catch (IOException e) { +throw Utils.wrapInRuntime(e); +} +} else { +this.resourceIsolationManager = null; +} +this.runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); +} + +@Override +public IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources, +Utils.ExitCodeCallable workerExitCallback) { +try { + +String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); +String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options")); +String 
stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file")); +String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId); + +String stormLogDir = ConfigUtils.getLogDir(); +String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR)); + +String stormLog4j2ConfDir; +if (StringUtils.isNotBlank(stormLogConfDir)) { +if (Utils.isAbsolutePath(stormLogConfDir)) { +stormLog4j2ConfDir = stormLogConfDir; +} else { +stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir; +} +} else { +stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2"; +} + +String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); + +String jlp = jlp(stormRoot, conf); + +String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot); + +Map stormConf =
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219285#comment-15219285 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57997792 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java --- @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; + +public class RunProfilerActions implements Runnable { +private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class); + +private Map conf; +private IStormClusterState stormClusterState; +private String hostName; + +private String profileCmd; + +private SupervisorData supervisorData; + +private class ActionExitCallback implements Utils.ExitCodeCallable { +private String stormId; +private ProfileRequest profileRequest; +private String logPrefix; + +public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) { +this.stormId = stormId; +this.profileRequest = profileRequest; +this.logPrefix = logPrefix; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} profile-action exited for {}", logPrefix, exitCode); +try { +stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); --- End diff -- @revans2 I did that intentionally before, but I now see it was wrong. We should add it back in. That way the jprofileStop operation is guaranteed to run even if errors occur during jprofileStart.
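The reasoning in the comment above (keep the profile request until the stop action has run, so that a failed start does not prevent the later stop) can be sketched as follows. Everything here is a hypothetical stand-in: `RequestStore` replaces the cluster state, and request IDs are plain strings rather than `ProfileRequest` objects.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the exit-callback logic; not the code from the pull request.
class ProfileExitSketch {
    // Stand-in for the cluster-state store of pending profile requests.
    static final class RequestStore {
        private final Set<String> pending = new HashSet<>();
        void add(String requestId) { pending.add(requestId); }
        void delete(String requestId) { pending.remove(requestId); }
        boolean contains(String requestId) { return pending.contains(requestId); }
    }

    // Called when the profiler command exits. The request is deleted only for a
    // stop action, whatever the exit code, so a failed start leaves it pending.
    static void onExit(RequestStore store, String requestId, boolean stop, int exitCode) {
        System.out.println("profile-action exited with code " + exitCode);
        if (stop) {
            store.delete(requestId);
        }
    }

    // Demo helper: returns true if the request is still pending after the callback.
    static boolean pendingAfterExit(boolean stop, int exitCode) {
        RequestStore store = new RequestStore();
        store.add("req-1");
        onExit(store, "req-1", stop, exitCode);
        return store.contains("req-1");
    }
}
```

With this shape, a start action that exits with an error leaves the request in place, and only the stop pass clears it.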
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219233#comment-15219233 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57994377 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,626 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); --- End diff -- Of course, I will file a follow-up JIRA for this.
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219180#comment-15219180 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57991093 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java --- @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.generated.SupervisorInfo; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SupervisorHeartbeat implements Runnable { + + private final IStormClusterState stormClusterState; + private final String supervisorId; + private final Map conf; + private final SupervisorData supervisorData; + +public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) { +this.stormClusterState = supervisorData.getStormClusterState(); +this.supervisorId = supervisorData.getSupervisorId(); +this.supervisorData = supervisorData; +this.conf = conf; +} + +private SupervisorInfo buildSupervisorInfo(Map conf, SupervisorData supervisorData) { +SupervisorInfo supervisorInfo = new SupervisorInfo(); +supervisorInfo.set_time_secs(Time.currentTimeSecs()); +supervisorInfo.set_hostname(supervisorData.getHostName()); +supervisorInfo.set_assignment_id(supervisorData.getAssignmentId()); + +List usedPorts = new ArrayList<>(); + usedPorts.addAll(supervisorData.getCurrAssignment().get().keySet()); +supervisorInfo.set_used_ports(usedPorts); +List metaDatas = (List)supervisorData.getiSupervisor().getMetadata(); +List portList = new ArrayList<>(); +if (metaDatas != null){ +for (Object data : metaDatas){ +Integer port = Utils.getInt(data); +if (port != null) +portList.add(port.longValue()); +} +} + +supervisorInfo.set_meta(portList); +supervisorInfo.set_scheduler_meta((Map) conf.get(Config.SUPERVISOR_SCHEDULER_META)); + supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime()); +supervisorInfo.set_version(supervisorData.getStormVersion()); +supervisorInfo.set_resources_map(mkSupervisorCapacities(conf)); +return supervisorInfo; 
+} + +private Map mkSupervisorCapacities(Map conf) { +Map ret = new HashMap (); +Double mem = (double) (conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB)); +ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem); +Double cpu = (double) (conf.get(Config.SUPERVISOR_CPU_CAPACITY)); +ret.put(Config.SUPERVISOR_CPU_CAPACITY, cpu); +return ret; +} --- End diff -- Of course, I'll be happy to follow up on this in a later JIRA. > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > >
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219179#comment-15219179 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57990981 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java --- @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.*; + +public class SupervisorUtils { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class); + +private static final SupervisorUtils INSTANCE = new SupervisorUtils(); +private static SupervisorUtils _instance = INSTANCE; +public static void setInstance(SupervisorUtils u) { +_instance = u; +} +public static void resetInstance() { +_instance = INSTANCE; +} + +public static Process processLauncher(Map conf, String user, List commandPrefix, List args, Mapenvironment, final String logPreFix, + final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException { +if (StringUtils.isBlank(user)) { +throw new IllegalArgumentException("User cannot be blank when calling processLauncher."); +} +String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER)); +String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); +String wl; +if (StringUtils.isNotBlank(wlinitial)) { +wl = wlinitial; +} else { +wl = stormHome + "/bin/worker-launcher"; +} +List commands = new ArrayList<>(); +if (commandPrefix != null){ +commands.addAll(commandPrefix); +} +commands.add(wl); +commands.add(user); +commands.addAll(args); +LOG.info("Running as user: {} command: {}", user, commands); +return Utils.launchProcess(commands, 
environment, logPreFix, exitCodeCallback, dir); +} + +public static int processLauncherAndWait(Map conf, String user, List args, final Map environment, final String logPreFix) +throws IOException { +int ret = 0; +Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null); +if (StringUtils.isNotBlank(logPreFix)) +Utils.readAndLogStream(logPreFix, process.getInputStream()); +try { +process.waitFor(); +} catch (InterruptedException e) { +LOG.info("{} interrupted.", logPreFix); +} +ret = process.exitValue(); +return ret; +} + +public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException { +if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { +String logPrefix = "setup conf for " + dir; +List commands = new ArrayList<>(); +
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218645#comment-15218645 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-203592338 Just FYI on the integration testing. There are a few things broken on master right now, that I am in the process of fixing. https://issues.apache.org/jira/browse/STORM-1663 https://issues.apache.org/jira/browse/STORM-1665 https://issues.apache.org/jira/browse/STORM-1666 Should have a pull request for the first two shortly and then I will start on the third one. > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java threadpools. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218477#comment-15218477 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-203558194 Done with my review. Overall it looks really good. There were a few bugs but most things were minor nits. > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java threadpools. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218468#comment-15218468 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57934874 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java --- @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor.workermanager; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.*; + +public class DefaultWorkerManager implements IWorkerManager { + +private static Logger LOG = LoggerFactory.getLogger(DefaultWorkerManager.class); + +private Map conf; +private CgroupManager resourceIsolationManager; +private boolean runWorkerAsUser; + +@Override +public void prepareWorker(Map conf, Localizer localizer) { +this.conf = conf; +if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) { +try { +this.resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN)); +this.resourceIsolationManager.prepare(conf); +LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager); +} catch (IOException e) { +throw Utils.wrapInRuntime(e); +} +} else { +this.resourceIsolationManager = null; +} +this.runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); +} + +@Override +public IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources, +Utils.ExitCodeCallable workerExitCallback) { +try { + +String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); +String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options")); +String 
stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file")); +String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId); + +String stormLogDir = ConfigUtils.getLogDir(); +String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR)); + +String stormLog4j2ConfDir; +if (StringUtils.isNotBlank(stormLogConfDir)) { +if (Utils.isAbsolutePath(stormLogConfDir)) { +stormLog4j2ConfDir = stormLogConfDir; +} else { +stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir; +} +} else { +stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2"; +} + +String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); + +String jlp = jlp(stormRoot, conf); + +String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot); + +Map stormConf =
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218453#comment-15218453 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57933384 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java --- @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor.workermanager; + +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.Utils; + +import java.util.List; +import java.util.Map; + +public interface IWorkerManager { +public void prepareWorker(Map conf, Localizer localizer); + +IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources, + Utils.ExitCodeCallable workerExitCallback); + +IWorkerResult shutdownWorker(String supervisorId, String workerId, MapworkerThreadPids); + +IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources); + +public boolean cleanupWorker(String workerId); +} --- End diff -- All of this was intended to be an example of what we might want to do. IWorkerResult is never used and is always returned as null, so let's just set the return types to void for now and remove IWorkerResult. resizeWorker is also not used/supported, so let's just delete it for now. > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java threadpools. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
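For reference, a minimal sketch of what the interface might look like after the simplification requested in the review comment on IWorkerManager.java above: the result type dropped, return types set to void, and resizeWorker removed. The empty `WorkerResources`, `Localizer` and `ExitCodeCallable` types are placeholders so the sketch compiles on its own (the real ones live under `org.apache.storm`), the generic parameters are assumptions because the quoted diff lost them, and `RecordingWorkerManager` is a hypothetical implementation added purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Placeholder stand-ins for the real Storm types (assumptions, not the actual classes).
class WorkerResources {}
class Localizer {}
interface ExitCodeCallable {
    Object call(int exitCode);
}

// The interface with the unused result type and resizeWorker removed.
interface IWorkerManager {
    void prepareWorker(Map<String, Object> conf, Localizer localizer);

    void launchWorker(String supervisorId, String assignmentId, String stormId, Long port,
                      String workerId, WorkerResources resources, ExitCodeCallable workerExitCallback);

    void shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids);

    boolean cleanupWorker(String workerId);
}

// Hypothetical implementation that only records which methods were called.
class RecordingWorkerManager implements IWorkerManager {
    final List<String> calls = new ArrayList<>();

    @Override
    public void prepareWorker(Map<String, Object> conf, Localizer localizer) {
        calls.add("prepare");
    }

    @Override
    public void launchWorker(String supervisorId, String assignmentId, String stormId, Long port,
                             String workerId, WorkerResources resources, ExitCodeCallable workerExitCallback) {
        calls.add("launch:" + workerId);
    }

    @Override
    public void shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids) {
        calls.add("shutdown:" + workerId);
    }

    @Override
    public boolean cleanupWorker(String workerId) {
        calls.add("cleanup:" + workerId);
        return true;
    }
}
```

With void return types, callers no longer have to handle a result that was always null; a richer result type could be reintroduced later if a real use for it appears.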
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218419#comment-15218419 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57931082 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java --- @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor.workermanager; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.*; + +public class DefaultWorkerManager implements IWorkerManager { + +private static Logger LOG = LoggerFactory.getLogger(DefaultWorkerManager.class); + +private Map conf; +private CgroupManager resourceIsolationManager; +private boolean runWorkerAsUser; + +@Override +public void prepareWorker(Map conf, Localizer localizer) { +this.conf = conf; +if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) { +try { +this.resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN)); +this.resourceIsolationManager.prepare(conf); +LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager); +} catch (IOException e) { +throw Utils.wrapInRuntime(e); +} +} else { +this.resourceIsolationManager = null; +} +this.runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); +} + +@Override +public IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources, +Utils.ExitCodeCallable workerExitCallback) { +try { + +String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); +String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options")); +String 
stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file")); +String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId); + +String stormLogDir = ConfigUtils.getLogDir(); +String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR)); + +String stormLog4j2ConfDir; +if (StringUtils.isNotBlank(stormLogConfDir)) { +if (Utils.isAbsolutePath(stormLogConfDir)) { +stormLog4j2ConfDir = stormLogConfDir; +} else { +stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir; +} +} else { +stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2"; +} + +String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); + +String jlp = jlp(stormRoot, conf); + +String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot); + +Map stormConf =
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218406#comment-15218406 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57929681 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java --- @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.generated.SupervisorInfo; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SupervisorHeartbeat implements Runnable { + + private final IStormClusterState stormClusterState; + private final String supervisorId; + private final Map conf; + private final SupervisorData supervisorData; + +public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) { +this.stormClusterState = supervisorData.getStormClusterState(); +this.supervisorId = supervisorData.getSupervisorId(); +this.supervisorData = supervisorData; +this.conf = conf; +} + +private SupervisorInfo buildSupervisorInfo(Map conf, SupervisorData supervisorData) { +SupervisorInfo supervisorInfo = new SupervisorInfo(); +supervisorInfo.set_time_secs(Time.currentTimeSecs()); +supervisorInfo.set_hostname(supervisorData.getHostName()); +supervisorInfo.set_assignment_id(supervisorData.getAssignmentId()); + +List usedPorts = new ArrayList<>(); + usedPorts.addAll(supervisorData.getCurrAssignment().get().keySet()); +supervisorInfo.set_used_ports(usedPorts); +List metaDatas = (List)supervisorData.getiSupervisor().getMetadata(); +List portList = new ArrayList<>(); +if (metaDatas != null){ +for (Object data : metaDatas){ +Integer port = Utils.getInt(data); +if (port != null) +portList.add(port.longValue()); +} +} + +supervisorInfo.set_meta(portList); +supervisorInfo.set_scheduler_meta((Map) conf.get(Config.SUPERVISOR_SCHEDULER_META)); + supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime()); +supervisorInfo.set_version(supervisorData.getStormVersion()); +supervisorInfo.set_resources_map(mkSupervisorCapacities(conf)); +return supervisorInfo; 
+} + +private Map mkSupervisorCapacities(Map conf) { +Map ret = new HashMap (); +Double mem = (double) (conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB)); +ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem); +Double cpu = (double) (conf.get(Config.SUPERVISOR_CPU_CAPACITY)); +ret.put(Config.SUPERVISOR_CPU_CAPACITY, cpu); +return ret; +} --- End diff -- Yes that would be good in a follow on JIRA, but I think we still want to keep the ability to manually set it too. > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218401#comment-15218401 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57929240 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java --- @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.command.HealthCheck; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; + +public class SupervisorHealthCheck implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorHealthCheck.class); + +private SupervisorData supervisorData; + +public SupervisorHealthCheck(SupervisorData supervisorData) { +this.supervisorData = supervisorData; +} + +@Override +public void run() { +Map conf = supervisorData.getConf(); +IWorkerManager workerManager = supervisorData.getWorkerManager(); +int healthCode = HealthCheck.healthCheck(conf); +Collection workerIds = SupervisorUtils.supervisorWorkerIds(conf); +if (healthCode != 0) { +for (String workerId : workerIds) { --- End diff -- I agree it would remove some duplicate code. > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java threadpools. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218345#comment-15218345 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57926750 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java --- @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; + +public class RunProfilerActions implements Runnable { +private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class); + +private Map conf; +private IStormClusterState stormClusterState; +private String hostName; + +private String profileCmd; + +private SupervisorData supervisorData; + +private class ActionExitCallback implements Utils.ExitCodeCallable { +private String stormId; +private ProfileRequest profileRequest; +private String logPrefix; + +public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) { +this.stormId = stormId; +this.profileRequest = profileRequest; +this.logPrefix = logPrefix; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} profile-action exited for {}", logPrefix, exitCode); +try { +stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); --- End diff -- The original code only did this if it was a stop. ``` action-on-exit (fn [exit-code] (log-message log-prefix " profile-action exited for code: " exit-code) (if stop? (delete-topology-profiler-action storm-cluster-state storm-id (thriftify-profile-request pro-action ``` Now it looks like we do it every time. @kishorvpatil what are the ramifications of this? Does it need to be added back in? 
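The Clojure snippet quoted in the comment above deleted the profile request only when the action was a stop. A minimal sketch of carrying that condition over to the Java callback follows. `ProfileStore` is a hypothetical stand-in for the single `IStormClusterState` method used here, the request is reduced to a `String`, and passing `stop` through the constructor is an assumption about how the flag could reach the callback; whether the condition needs to be restored at all is the open question raised in the comment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the one cluster-state call the callback needs.
interface ProfileStore {
    void deleteTopologyProfileRequests(String stormId, String profileRequest);
}

// Exit callback that removes the stored request only for stop actions,
// mirroring the `(if stop? ...)` guard in the quoted Clojure.
class ActionExitCallback {
    private final ProfileStore store;
    private final String stormId;
    private final String profileRequest;
    private final boolean stop;

    ActionExitCallback(ProfileStore store, String stormId, String profileRequest, boolean stop) {
        this.store = store;
        this.stormId = stormId;
        this.profileRequest = profileRequest;
        this.stop = stop;
    }

    public Object call(int exitCode) {
        if (stop) {
            try {
                store.deleteTopologyProfileRequests(stormId, profileRequest);
            } catch (Exception e) {
                System.err.println("failed to delete profileRequest: " + profileRequest);
            }
        }
        return null;
    }
}

// In-memory store used only to observe which requests were deleted.
class InMemoryProfileStore implements ProfileStore {
    final List<String> deleted = new ArrayList<>();

    @Override
    public void deleteTopologyProfileRequests(String stormId, String profileRequest) {
        deleted.add(stormId + "/" + profileRequest);
    }
}
```

In this sketch a non-stop action leaves the stored request untouched, which is the behaviour the Clojure version had.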
> port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java threadpools. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218336#comment-15218336 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57926189 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java --- @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; + +public class RunProfilerActions implements Runnable { +private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class); + +private Map conf; +private IStormClusterState stormClusterState; +private String hostName; + +private String profileCmd; + +private SupervisorData supervisorData; + +private class ActionExitCallback implements Utils.ExitCodeCallable { +private String stormId; +private ProfileRequest profileRequest; +private String logPrefix; + +public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) { +this.stormId = stormId; +this.profileRequest = profileRequest; +this.logPrefix = logPrefix; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} profile-action exited for {}", logPrefix, exitCode); +try { +stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); +} catch (Exception e) { +LOG.warn("failed delete profileRequest: " + profileRequest); +} +return null; +} +} + +public RunProfilerActions(SupervisorData supervisorData) { +this.conf = supervisorData.getConf(); +this.stormClusterState = supervisorData.getStormClusterState(); +this.hostName = supervisorData.getHostName(); +this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND)); +this.supervisorData = 
supervisorData; +} + +@Override +public void run() { +MapstormIdToActions = supervisorData.getStormIdToProfileActions().get(); +try { +for (Map.Entry entry : stormIdToActions.entrySet()) { +String stormId = entry.getKey(); +List requests = entry.getValue(); +if (requests != null) { +for (ProfileRequest profileRequest : requests) { +if (profileRequest.get_nodeInfo().get_node().equals(hostName)) { +boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp() ? true : false; +Long port = profileRequest.get_nodeInfo().get_port().iterator().next(); +String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port)); +Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218332#comment-15218332 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57925763 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java --- @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; + +public class RunProfilerActions implements Runnable { +private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class); + +private Map conf; +private IStormClusterState stormClusterState; +private String hostName; + +private String profileCmd; + +private SupervisorData supervisorData; + +private class ActionExitCallback implements Utils.ExitCodeCallable { +private String stormId; +private ProfileRequest profileRequest; +private String logPrefix; + +public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) { +this.stormId = stormId; +this.profileRequest = profileRequest; +this.logPrefix = logPrefix; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} profile-action exited for {}", logPrefix, exitCode); +try { +stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); +} catch (Exception e) { +LOG.warn("failed delete profileRequest: " + profileRequest); +} +return null; +} +} + +public RunProfilerActions(SupervisorData supervisorData) { +this.conf = supervisorData.getConf(); +this.stormClusterState = supervisorData.getStormClusterState(); +this.hostName = supervisorData.getHostName(); +this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND)); +this.supervisorData = 
supervisorData; +} + +@Override +public void run() { +MapstormIdToActions = supervisorData.getStormIdToProfileActions().get(); +try { +for (Map.Entry entry : stormIdToActions.entrySet()) { +String stormId = entry.getKey(); +List requests = entry.getValue(); +if (requests != null) { +for (ProfileRequest profileRequest : requests) { +if (profileRequest.get_nodeInfo().get_node().equals(hostName)) { +boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp() ? true : false; +Long port = profileRequest.get_nodeInfo().get_port().iterator().next(); +String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port)); +Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218243#comment-15218243 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57918048 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java --- @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; + +public class RunProfilerActions implements Runnable { +private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class); + +private Map conf; +private IStormClusterState stormClusterState; +private String hostName; + +private String profileCmd; + +private SupervisorData supervisorData; + +private class ActionExitCallback implements Utils.ExitCodeCallable { +private String stormId; +private ProfileRequest profileRequest; +private String logPrefix; + +public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) { +this.stormId = stormId; +this.profileRequest = profileRequest; +this.logPrefix = logPrefix; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} profile-action exited for {}", logPrefix, exitCode); +try { +stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); +} catch (Exception e) { +LOG.warn("failed delete profileRequest: " + profileRequest); +} +return null; +} +} + +public RunProfilerActions(SupervisorData supervisorData) { +this.conf = supervisorData.getConf(); +this.stormClusterState = supervisorData.getStormClusterState(); +this.hostName = supervisorData.getHostName(); +this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND)); +this.supervisorData = 
supervisorData; +} + +@Override +public void run() { +MapstormIdToActions = supervisorData.getStormIdToProfileActions().get(); +try { +for (Map.Entry entry : stormIdToActions.entrySet()) { +String stormId = entry.getKey(); +List requests = entry.getValue(); +if (requests != null) { +for (ProfileRequest profileRequest : requests) { +if (profileRequest.get_nodeInfo().get_node().equals(hostName)) { +boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp() ? true : false; --- End diff -- This can just be ``` boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp(); ``` > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL:
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218217#comment-15218217 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57915788 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,626 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + +Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); +Map stormcodeMap = readStormCodeLocations(assignmentsSnapshot); +Map existingAssignment = localState.getLocalAssignmentsMap(); +if (existingAssignment == null) { +existingAssignment = new HashMap<>(); +} + +Map allAssignment = +readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + +Map newAssignment = new HashMap<>(); +Set assignedStormIds = new HashSet<>(); + +for (Map.Entry
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218193#comment-15218193 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57912850 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,626 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + +Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); +Map stormcodeMap = readStormCodeLocations(assignmentsSnapshot); +Map existingAssignment = localState.getLocalAssignmentsMap(); +if (existingAssignment == null) { +existingAssignment = new HashMap<>(); +} + +Map allAssignment = +readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + +Map newAssignment = new HashMap<>(); +Set assignedStormIds = new HashSet<>(); + +for (Map.Entry
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218143#comment-15218143 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57908284 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,626 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + +Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); +Map stormcodeMap = readStormCodeLocations(assignmentsSnapshot); +Map existingAssignment = localState.getLocalAssignmentsMap(); +if (existingAssignment == null) { +existingAssignment = new HashMap<>(); +} + +Map allAssignment = +readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + +Map newAssignment = new HashMap<>(); +Set assignedStormIds = new HashSet<>(); + +for (Map.Entry
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218084#comment-15218084 ] ASF GitHub Bot commented on STORM-1279: --- Github user longdafeng commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-203471618 I have reviewed the PR; it looks fine to me apart from several small problems, but I haven't done any testing. I suggest running integration tests in both local mode and distributed mode. > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java threadpools. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
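As an illustration of the issue description's request to replace backtype.storm.event with built-in Java thread pools, the sketch below shows one way a single-threaded `ExecutorService` could stand in for an event manager. It assumes events must run one at a time in submission order; the class name `ExecutorEventManagerSketch` and its methods are hypothetical and are not the `EventManager` interface used in the PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical event manager backed by a JDK single-thread executor. */
public class ExecutorEventManagerSketch implements AutoCloseable {
    // One worker thread, so events run sequentially in submission order.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicInteger processed = new AtomicInteger();

    /** Queues an event; a failing event is logged and does not stop later ones. */
    public void add(Runnable event) {
        executor.execute(() -> {
            try {
                event.run();
            } catch (RuntimeException e) {
                System.err.println("event failed: " + e);
            } finally {
                processed.incrementAndGet();
            }
        });
    }

    /** Number of events that have finished, successfully or not. */
    public int processedCount() {
        return processed.get();
    }

    /** Stops accepting events and waits briefly for queued ones to finish. */
    @Override
    public void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorEventManagerSketch manager = new ExecutorEventManagerSketch();
        manager.add(() -> System.out.println("sync supervisor"));
        manager.add(() -> System.out.println("sync processes"));
        manager.close();
        System.out.println("processed=" + manager.processedCount());
    }
}
```

A single worker thread is used here so that sync events never overlap; a larger pool would need its own coordination between the supervisor and process sync events.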
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218032#comment-15218032 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57897402 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,427 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. 
wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; +private SupervisorData supervisorData; +public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +public void init(SupervisorData supervisorData){ +this.supervisorData = supervisorData; +this.localState = supervisorData.getLocalState(); +} + +@Override +public void run() { +LOG.debug("Syncing processes"); +try { +Map conf = supervisorData.getConf(); +MapassignedExecutors = localState.getLocalAssignmentsMap(); + +if (assignedExecutors == null) { +assignedExecutors = new HashMap<>(); +} +int now = Time.currentTimeSecs(); + +Map localWorkerStats = getLocalWorkerStats(supervisorData, assignedExecutors, now); + +
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218058#comment-15218058 ] ASF GitHub Bot commented on STORM-1279: --- Github user longdafeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57900100 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,626 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + +Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); +Map stormcodeMap = readStormCodeLocations(assignmentsSnapshot); +Map existingAssignment = localState.getLocalAssignmentsMap(); +if (existingAssignment == null) { +existingAssignment = new HashMap<>(); +} + +Map allAssignment = +readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + +Map newAssignment = new HashMap<>(); +Set assignedStormIds = new HashSet<>(); + +for (Map.Entry
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218056#comment-15218056 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57899879 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,626 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + +Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); +Map stormcodeMap = readStormCodeLocations(assignmentsSnapshot); +Map existingAssignment = localState.getLocalAssignmentsMap(); +if (existingAssignment == null) { +existingAssignment = new HashMap<>(); +} + +Map allAssignment = +readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + +Map newAssignment = new HashMap<>(); +Set assignedStormIds = new HashSet<>(); + +for (Map.Entry
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218022#comment-15218022 ] ASF GitHub Bot commented on STORM-1279: --- Github user longdafeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57896552 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,626 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + +Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); +Map stormcodeMap = readStormCodeLocations(assignmentsSnapshot); +Map existingAssignment = localState.getLocalAssignmentsMap(); +if (existingAssignment == null) { +existingAssignment = new HashMap<>(); +} + +Map allAssignment = +readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + +Map newAssignment = new HashMap<>(); +Set assignedStormIds = new HashSet<>(); + +for (Map.Entry
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15218018#comment-15218018 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57896025 --- Diff: storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj --- @@ -0,0 +1,64 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. 
+(ns org.apache.storm.daemon.local-supervisor + (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData Supervisor SupervisorUtils] + [org.apache.storm.utils Utils ConfigUtils] + [org.apache.storm ProcessSimulator]) + (:use [org.apache.storm.daemon common] +[org.apache.storm log]) + (:require [org.apache.storm.daemon [worker :as worker] ]) + (:require [clojure.string :as str]) + (:gen-class)) + +(defn launch-local-worker [supervisorData stormId port workerId resources] + (let [conf (.getConf supervisorData) + pid (Utils/uuid) +worker (worker/mk-worker conf + (.getSharedContext supervisorData) + stormId + (.getAssignmentId supervisorData) + (int port) + workerId)] +(ConfigUtils/setWorkerUserWSE conf workerId "") +(ProcessSimulator/registerProcess pid worker) +(.put (.getWorkerThreadPids supervisorData) workerId pid) +)) +(defn shutdown-local-worker [supervisorData worker-manager workerId] + (log-message "shutdown-local-worker") + (let [supervisor-id (.getSupervisorId supervisorData) +worker-pids (.getWorkerThreadPids supervisorData) +dead-workers (.getDeadWorkers supervisorData)] +(.shutdownWorker worker-manager supervisor-id workerId worker-pids) --- End diff -- I understand where the code went, it is just that the worker-manager abstraction is not as clean as I expected, but that can wait for refactoring later on when we try to make it more java like. > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java threadpools. 
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217995#comment-15217995 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57892242 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,427 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. 
wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; +private SupervisorData supervisorData; +public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +public void init(SupervisorData supervisorData){ +this.supervisorData = supervisorData; +this.localState = supervisorData.getLocalState(); +} + +@Override +public void run() { +LOG.debug("Syncing processes"); +try { +Map conf = supervisorData.getConf(); +MapassignedExecutors = localState.getLocalAssignmentsMap(); + +if (assignedExecutors == null) { +assignedExecutors = new HashMap<>(); +} +int now = Time.currentTimeSecs(); + +Map localWorkerStats = getLocalWorkerStats(supervisorData, assignedExecutors, now); + +
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217983#comment-15217983 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57890544 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java --- @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.*; + +public class SupervisorUtils { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class); + +private static final SupervisorUtils INSTANCE = new SupervisorUtils(); +private static SupervisorUtils _instance = INSTANCE; +public static void setInstance(SupervisorUtils u) { +_instance = u; +} +public static void resetInstance() { +_instance = INSTANCE; +} + +public static Process processLauncher(Map conf, String user, List commandPrefix, List args, Mapenvironment, final String logPreFix, + final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException { +if (StringUtils.isBlank(user)) { +throw new IllegalArgumentException("User cannot be blank when calling processLauncher."); +} +String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER)); +String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); +String wl; +if (StringUtils.isNotBlank(wlinitial)) { +wl = wlinitial; +} else { +wl = stormHome + "/bin/worker-launcher"; +} +List commands = new ArrayList<>(); +if (commandPrefix != null){ +commands.addAll(commandPrefix); +} +commands.add(wl); +commands.add(user); +commands.addAll(args); +LOG.info("Running as user: {} command: {}", user, commands); +return Utils.launchProcess(commands, 
environment, logPreFix, exitCodeCallback, dir); +} + +public static int processLauncherAndWait(Map conf, String user, List args, final Map environment, final String logPreFix) +throws IOException { +int ret = 0; +Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null); +if (StringUtils.isNotBlank(logPreFix)) +Utils.readAndLogStream(logPreFix, process.getInputStream()); +try { +process.waitFor(); +} catch (InterruptedException e) { +LOG.info("{} interrupted.", logPreFix); +} +ret = process.exitValue(); +return ret; +} + +public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException { +if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { +String logPrefix = "setup conf for " + dir; +List commands = new ArrayList<>(); +
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217982#comment-15217982 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57890242 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java --- @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.*; + +public class SupervisorUtils { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class); + +private static final SupervisorUtils INSTANCE = new SupervisorUtils(); +private static SupervisorUtils _instance = INSTANCE; +public static void setInstance(SupervisorUtils u) { +_instance = u; +} +public static void resetInstance() { +_instance = INSTANCE; +} + +public static Process processLauncher(Map conf, String user, List commandPrefix, List args, Mapenvironment, final String logPreFix, + final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException { +if (StringUtils.isBlank(user)) { +throw new IllegalArgumentException("User cannot be blank when calling processLauncher."); +} +String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER)); +String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); +String wl; +if (StringUtils.isNotBlank(wlinitial)) { +wl = wlinitial; +} else { +wl = stormHome + "/bin/worker-launcher"; +} +List commands = new ArrayList<>(); +if (commandPrefix != null){ +commands.addAll(commandPrefix); +} +commands.add(wl); +commands.add(user); +commands.addAll(args); +LOG.info("Running as user: {} command: {}", user, commands); +return Utils.launchProcess(commands, 
environment, logPreFix, exitCodeCallback, dir); +} + +public static int processLauncherAndWait(Map conf, String user, List args, final Map environment, final String logPreFix) +throws IOException { +int ret = 0; +Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null); +if (StringUtils.isNotBlank(logPreFix)) +Utils.readAndLogStream(logPreFix, process.getInputStream()); +try { +process.waitFor(); +} catch (InterruptedException e) { +LOG.info("{} interrupted.", logPreFix); +} +ret = process.exitValue(); +return ret; +} + +public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException { +if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { +String logPrefix = "setup conf for " + dir; +List commands = new ArrayList<>(); +
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217968#comment-15217968 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57887632 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java --- @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.daemon.DaemonCommon; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.event.EventManager; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; + +public class SupervisorManager implements SupervisorDaemon, DaemonCommon, Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorManager.class); +private final EventManager eventManager; +private final EventManager processesEventManager; +private SupervisorData supervisorData; + +public SupervisorManager(SupervisorData supervisorData, EventManager eventManager, EventManager processesEventManager) { +this.eventManager = eventManager; +this.supervisorData = supervisorData; +this.processesEventManager = processesEventManager; +} + +public void shutdown() { +LOG.info("Shutting down supervisor{}", supervisorData.getSupervisorId()); --- End diff -- minor need a space between `supervisor` and `{}` > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java threadpools. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
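To illustrate the logging nit raised on `shutdown()` in the message above: with `{}`-style placeholders the argument is spliced in exactly where the braces sit, so a missing space glues the supervisor id onto the word before it. The sketch below is an assumption-laden stand-in, not the SLF4J implementation: `PlaceholderSpacingSketch` and its `format` helper are hypothetical names that only mimic left-to-right `{}` substitution so the snippet runs without a logging dependency, and `sup-1` is a made-up id.

```java
// Hypothetical stand-in for an SLF4J-style logger: it only substitutes "{}"
// placeholders, so the example needs no logging library on the classpath.
class PlaceholderSpacingSketch {

    // Replaces each "{}" in the template with the next argument, left to right.
    static String format(String template, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = template.indexOf("{}", from);
            if (at < 0) {
                break; // more arguments than placeholders: ignore the rest
            }
            out.append(template, from, at).append(arg);
            from = at + 2;
        }
        return out.append(template.substring(from)).toString();
    }

    public static void main(String[] args) {
        String supervisorId = "sup-1"; // made-up id, for illustration only

        // As written in the diff: the id runs into the preceding word.
        System.out.println(format("Shutting down supervisor{}", supervisorId));

        // With the space the reviewer asked for.
        System.out.println(format("Shutting down supervisor {}", supervisorId));
    }
}
```

The first line prints `Shutting down supervisorsup-1` and the second `Shutting down supervisor sup-1`, which is the difference the review comment is pointing at.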
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217964#comment-15217964 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57887492 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java --- @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.daemon.DaemonCommon; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.event.EventManager; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; + +public class SupervisorManager implements SupervisorDaemon, DaemonCommon, Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorManager.class); +private final EventManager eventManager; +private final EventManager processesEventManager; +private SupervisorData supervisorData; --- End diff -- This could be final too. 
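A minimal sketch of what this suggestion amounts to, under stated assumptions: `SupervisorManagerSketch` and `SupervisorDataStub` are hypothetical, heavily simplified stand-ins for the classes in the diff, and `describe()` exists only so the example has something to run. The point is that a field assigned once in the constructor and never reassigned can be declared `final`, like the two `EventManager` fields next to it.

```java
// Simplified stand-in for SupervisorManager; only the field under review is modelled.
class SupervisorManagerSketch {

    // Assigned exactly once in the constructor, so it can be final.
    // The compiler then rejects any later reassignment.
    private final SupervisorDataStub supervisorData;

    SupervisorManagerSketch(SupervisorDataStub supervisorData) {
        this.supervisorData = supervisorData;
    }

    // Hypothetical helper, present only to give the example observable behaviour.
    String describe() {
        return "supervisor " + supervisorData.getSupervisorId();
    }

    public static void main(String[] args) {
        SupervisorManagerSketch manager =
                new SupervisorManagerSketch(new SupervisorDataStub("sup-1"));
        System.out.println(manager.describe());
    }
}

// Simplified stand-in for SupervisorData; the real class carries much more state.
class SupervisorDataStub {
    private final String supervisorId;

    SupervisorDataStub(String supervisorId) {
        this.supervisorId = supervisorId;
    }

    String getSupervisorId() {
        return supervisorId;
    }
}
```

Making the reference `final` does not make the referenced object immutable; it only guarantees that the field keeps pointing at the same instance after construction.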
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217966#comment-15217966 ] ASF GitHub Bot commented on STORM-1279: --- Github user longdafeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57887620 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,626 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); --- End diff -- Here stormIdToProfilerActions contains the profiler actions for every topology in the cluster. I suggest computing it later and filtering it so that it only contains the topologies that will run on the current supervisor.
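A minimal sketch of the filtering suggested in this comment, assuming the profiler actions are held in a map keyed by topology id and that the set of topology ids assigned to this supervisor is already known. `ProfilerActionFilter` and `retainLocal` are hypothetical names used for illustration; they are not part of the pull request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the pull request.
final class ProfilerActionFilter {
    private ProfilerActionFilter() {}

    /**
     * Returns a new map holding only the entries whose topology id is in
     * localStormIds. The input map is left untouched.
     */
    static <V> Map<String, V> retainLocal(Map<String, V> actionsByStormId, Set<String> localStormIds) {
        Map<String, V> filtered = new HashMap<>();
        for (Map.Entry<String, V> entry : actionsByStormId.entrySet()) {
            if (localStormIds.contains(entry.getKey())) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }
}
```

An alternative with the same effect would be to fetch profiler actions only for the locally assigned topology ids in the first place, which would also avoid reading state for topologies the supervisor does not run.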
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217961#comment-15217961 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57887233 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java --- @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.Config; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.messaging.IContext; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.apache.zookeeper.data.ACL; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class SupervisorData { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorData.class); + +private final Map conf; +private final IContext sharedContext; +private volatile boolean active; +private ISupervisor iSupervisor; +private Utils.UptimeComputer upTime; +private String stormVersion; +private ConcurrentHashMapworkerThreadPids; // for local mode +private IStormClusterState stormClusterState; +private LocalState localState; +private String supervisorId; +private String assignmentId; +private String hostName; +// used for reporting used ports when heartbeating +private AtomicReference
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217952#comment-15217952 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57886321 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java --- @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.Config; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.messaging.IContext; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.apache.zookeeper.data.ACL; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class SupervisorData { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorData.class); + +private final Map conf; +private final IContext sharedContext; +private volatile boolean active; +private ISupervisor iSupervisor; +private Utils.UptimeComputer upTime; +private String stormVersion; +private ConcurrentHashMapworkerThreadPids; // for local mode +private IStormClusterState stormClusterState; +private LocalState localState; +private String supervisorId; +private String assignmentId; +private String hostName; +// used for reporting used ports when heartbeating +private AtomicReference > currAssignment; +private StormTimer heartbeatTimer; +private StormTimer eventTimer; +private StormTimer blobUpdateTimer; 
+private Localizer localizer; +private AtomicReference >> assignmentVersions; +private AtomicInteger syncRetry; +private final Object downloadLock = new Object(); +private AtomicReference > stormIdToProfileActions; +private ConcurrentHashSet deadWorkers; +private final IWorkerManager workerManager; + +public SupervisorData(Map conf, IContext sharedContext, ISupervisor iSupervisor) { +this.conf = conf; +this.sharedContext = sharedContext; +this.iSupervisor = iSupervisor; +this.active = true; +this.upTime = Utils.makeUptimeComputer(); +this.stormVersion = VersionInfo.getVersion(); +this.workerThreadPids = new ConcurrentHashMap (); +this.deadWorkers = new ConcurrentHashSet(); + +List acls = null; +if (Utils.isZkAuthenticationConfiguredStormServer(conf)) { +acls = SupervisorUtils.supervisorZkAcls(); +} + +try { +this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR)); +} catch (Exception e) { +
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217929#comment-15217929 ] ASF GitHub Bot commented on STORM-1279: --- Github user longdafeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57883592 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java --- @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.command.HealthCheck; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; + +public class SupervisorHealthCheck implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorHealthCheck.class); + +private SupervisorData supervisorData; + +public SupervisorHealthCheck(SupervisorData supervisorData) { +this.supervisorData = supervisorData; +} + +@Override +public void run() { +Map conf = supervisorData.getConf(); +IWorkerManager workerManager = supervisorData.getWorkerManager(); +int healthCode = HealthCheck.healthCheck(conf); +Collection workerIds = SupervisorUtils.supervisorWorkerIds(conf); +if (healthCode != 0) { +for (String workerId : workerIds) { --- End diff -- Here the supervisor kills all of its workers. I would prefer to call SupervisorManager.shutdownAllWorkers instead.
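A rough sketch of what a single shared "shut down all workers" routine could look like, so that the health check and any other caller reuse one loop. The signature of `SupervisorManager.shutdownAllWorkers` is not shown in this thread, so `WorkerShutdown`, `WorkerStopper` and `shutdownAll` below are hypothetical stand-ins rather than the pull request's API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical helper, not part of the pull request.
final class WorkerShutdown {
    /** Hypothetical stand-in for whatever actually stops a single worker. */
    interface WorkerStopper {
        void stop(String workerId) throws Exception;
    }

    private WorkerShutdown() {}

    /**
     * Tries to stop every worker and keeps going when one of them fails.
     * Returns the ids of the workers that could not be stopped.
     */
    static List<String> shutdownAll(Collection<String> workerIds, WorkerStopper stopper) {
        List<String> failed = new ArrayList<>();
        for (String workerId : workerIds) {
            try {
                stopper.stop(workerId);
            } catch (Exception e) {
                failed.add(workerId);
            }
        }
        return failed;
    }
}
```

Continuing past a failed worker is a design choice of this sketch; whether the real code should do the same is for the pull request to decide.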
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217906#comment-15217906 ] ASF GitHub Bot commented on STORM-1279: --- Github user longdafeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57880994 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java --- @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.generated.SupervisorInfo; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SupervisorHeartbeat implements Runnable { + + private final IStormClusterState stormClusterState; + private final String supervisorId; + private final Map conf; + private final SupervisorData supervisorData; + +public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) { +this.stormClusterState = supervisorData.getStormClusterState(); +this.supervisorId = supervisorData.getSupervisorId(); +this.supervisorData = supervisorData; +this.conf = conf; +} + +private SupervisorInfo buildSupervisorInfo(Map conf, SupervisorData supervisorData) { +SupervisorInfo supervisorInfo = new SupervisorInfo(); +supervisorInfo.set_time_secs(Time.currentTimeSecs()); +supervisorInfo.set_hostname(supervisorData.getHostName()); +supervisorInfo.set_assignment_id(supervisorData.getAssignmentId()); + +List usedPorts = new ArrayList<>(); + usedPorts.addAll(supervisorData.getCurrAssignment().get().keySet()); +supervisorInfo.set_used_ports(usedPorts); +List metaDatas = (List)supervisorData.getiSupervisor().getMetadata(); +List portList = new ArrayList<>(); +if (metaDatas != null){ +for (Object data : metaDatas){ +Integer port = Utils.getInt(data); +if (port != null) +portList.add(port.longValue()); +} +} + +supervisorInfo.set_meta(portList); +supervisorInfo.set_scheduler_meta((Map) conf.get(Config.SUPERVISOR_SCHEDULER_META)); + supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime()); +supervisorInfo.set_version(supervisorData.getStormVersion()); +supervisorInfo.set_resources_map(mkSupervisorCapacities(conf)); +return supervisorInfo; 
+} + +private Map mkSupervisorCapacities(Map conf) { +Map ret = new HashMap (); +Double mem = (double) (conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB)); +ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem); +Double cpu = (double) (conf.get(Config.SUPERVISOR_CPU_CAPACITY)); +ret.put(Config.SUPERVISOR_CPU_CAPACITY, cpu); +return ret; +} --- End diff -- For now, reading the capacity from the configuration is fine with me. In some cases, though, I would prefer to add an interface that checks the system resources dynamically.
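One possible shape for the interface mentioned in this comment, sketched under assumptions: configured capacities stay the baseline, and an optional probe may override individual values it can measure. `SupervisorCapacities`, `ResourceProbe`, `resolve` and `cpuProbe` are hypothetical names, and the CPU probe assumes the convention that 100 capacity units stand for one core.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not part of the pull request.
final class SupervisorCapacities {
    /** Hypothetical extension point: reports capacities it can measure, keyed by config name. */
    interface ResourceProbe {
        Map<String, Double> probe();
    }

    private SupervisorCapacities() {}

    /**
     * Starts from the configured capacities and overrides any entry for which
     * the probe reports a positive value. A null probe means configuration only.
     */
    static Map<String, Double> resolve(Map<String, Double> configured, ResourceProbe probe) {
        Map<String, Double> result = new HashMap<>(configured);
        if (probe == null) {
            return result;
        }
        for (Map.Entry<String, Double> measured : probe.probe().entrySet()) {
            Double value = measured.getValue();
            if (value != null && value > 0) {
                result.put(measured.getKey(), value);
            }
        }
        return result;
    }

    /** Example probe; assumes 100 capacity units per available core. */
    static ResourceProbe cpuProbe(final String cpuKey) {
        return () -> Collections.singletonMap(
                cpuKey, Double.valueOf(100.0 * Runtime.getRuntime().availableProcessors()));
    }
}
```

Letting measured values win over configured ones is only one option; an operator may equally want the configuration to act as an upper bound.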
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217900#comment-15217900 ] ASF GitHub Bot commented on STORM-1279: --- Github user longdafeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57880519 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java --- @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.*; + +public class SupervisorUtils { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class); + +private static final SupervisorUtils INSTANCE = new SupervisorUtils(); +private static SupervisorUtils _instance = INSTANCE; +public static void setInstance(SupervisorUtils u) { +_instance = u; +} +public static void resetInstance() { +_instance = INSTANCE; +} + +public static Process processLauncher(Map conf, String user, List commandPrefix, List args, Mapenvironment, final String logPreFix, + final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException { +if (StringUtils.isBlank(user)) { +throw new IllegalArgumentException("User cannot be blank when calling processLauncher."); +} +String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER)); +String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); +String wl; +if (StringUtils.isNotBlank(wlinitial)) { +wl = wlinitial; +} else { +wl = stormHome + "/bin/worker-launcher"; +} +List commands = new ArrayList<>(); +if (commandPrefix != null){ +commands.addAll(commandPrefix); +} +commands.add(wl); +commands.add(user); +commands.addAll(args); +LOG.info("Running as user: {} command: {}", user, commands); +return Utils.launchProcess(commands, 
environment, logPreFix, exitCodeCallback, dir); +} + +public static int processLauncherAndWait(Map conf, String user, List args, final Map environment, final String logPreFix) +throws IOException { +int ret = 0; +Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null); +if (StringUtils.isNotBlank(logPreFix)) +Utils.readAndLogStream(logPreFix, process.getInputStream()); +try { +process.waitFor(); +} catch (InterruptedException e) { +LOG.info("{} interrupted.", logPreFix); +} +ret = process.exitValue(); +return ret; +} + +public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException { +if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { +String logPrefix = "setup conf for " + dir; +List commands = new ArrayList<>(); +
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217893#comment-15217893 ] ASF GitHub Bot commented on STORM-1279: --- Github user longdafeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57879928 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java --- @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.Config; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +public class StandaloneSupervisor implements ISupervisor { +private String supervisorId; +private Map conf; + +@Override +public void prepare(Map stormConf, String schedulerLocalDir) { +try { +LocalState localState = new LocalState(schedulerLocalDir); +String supervisorId = localState.getSupervisorId(); +if (supervisorId == null) { +supervisorId = generateSupervisorId(); +localState.setSupervisorId(supervisorId); +} +this.conf = stormConf; +this.supervisorId = supervisorId; +} catch (IOException e) { +throw new RuntimeException(e); +} +} + +@Override +public String getSupervisorId() { +return supervisorId; +} + +@Override +public String getAssignmentId() { --- End diff -- In Storm versions before 0.9.0 there was only a supervisorId and no assignmentId. @revans2, do you know why it was added?
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217801#comment-15217801 ] ASF GitHub Bot commented on STORM-1279: --- Github user longdafeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57870832 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java --- @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { --- End diff -- There is already a default uncaught exception handler in Utils.setupDefaultUncaughtExceptionHandler. I suggest:
1. Put this class into the utility package.
2. Merge it with Utils.setupDefaultUncaughtExceptionHandler, so that this class becomes Storm's default UncaughtExceptionHandler.
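To illustrate the merge proposed in this comment, here is a minimal sketch of one handler class that can be used both per thread and as the JVM-wide default. It relies only on the standard `Thread.UncaughtExceptionHandler` and `Thread.setDefaultUncaughtExceptionHandler` APIs; `SharedUncaughtHandler`, `FailureAction` and `installAsDefault` are hypothetical names, and what the action does (log, halt the process, and so on) is deliberately left to the caller because the thread does not show Storm's actual behavior.

```java
// Hypothetical sketch, not part of the pull request.
final class SharedUncaughtHandler implements Thread.UncaughtExceptionHandler {
    /** Hypothetical callback deciding what to do with an uncaught throwable. */
    interface FailureAction {
        void handle(Thread thread, Throwable error);
    }

    private final FailureAction action;

    SharedUncaughtHandler(FailureAction action) {
        this.action = action;
    }

    @Override
    public void uncaughtException(Thread thread, Throwable error) {
        // Delegate so that every daemon shares one policy.
        action.handle(thread, error);
    }

    /** Installs the handler as the default for all threads in this JVM. */
    static void installAsDefault(FailureAction action) {
        Thread.setDefaultUncaughtExceptionHandler(new SharedUncaughtHandler(action));
    }
}
```

With a single class like this, a utility method that sets up the default handler could simply call `installAsDefault` rather than defining a second handler.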
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217702#comment-15217702 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-203337583 @revans2 OK, I will file the JIRAs for local_supervisor.clj and for "remove localSyncProcess" after porting worker.clj. I am not sure whether anyone is using getAssignmentId(), although I agree with @zhuoliu.
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217234#comment-15217234 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57828359 --- Diff: storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj --- @@ -0,0 +1,64 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. 
+(ns org.apache.storm.daemon.local-supervisor + (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData Supervisor SupervisorUtils] + [org.apache.storm.utils Utils ConfigUtils] + [org.apache.storm ProcessSimulator]) + (:use [org.apache.storm.daemon common] +[org.apache.storm log]) + (:require [org.apache.storm.daemon [worker :as worker] ]) + (:require [clojure.string :as str]) + (:gen-class)) + +(defn launch-local-worker [supervisorData stormId port workerId resources] + (let [conf (.getConf supervisorData) + pid (Utils/uuid) +worker (worker/mk-worker conf + (.getSharedContext supervisorData) + stormId + (.getAssignmentId supervisorData) + (int port) + workerId)] +(ConfigUtils/setWorkerUserWSE conf workerId "") +(ProcessSimulator/registerProcess pid worker) +(.put (.getWorkerThreadPids supervisorData) workerId pid) +)) +(defn shutdown-local-worker [supervisorData worker-manager workerId] + (log-message "shutdown-local-worker") + (let [supervisor-id (.getSupervisorId supervisorData) +worker-pids (.getWorkerThreadPids supervisorData) +dead-workers (.getDeadWorkers supervisorData)] +(.shutdownWorker worker-manager supervisor-id workerId worker-pids) --- End diff -- In the old code, killing a worker meant calling `supervisor/shutdown-worker`, which performed some actions and then called `ProcessSimulator/killProcess` and `supervisor/try-cleanup-worker`. `try-cleanup-worker` has now moved to the worker-manager, where it is called `cleanupWorker`, so `shutdown-local-worker` is similar to the old `supervisor/shutdown-worker`. As for why `shutdown-local-worker` is overridden: I may not be able to explain it clearly, but you can refer to testing.clj:
```
(defmacro capture-changed-workers
  [& body]
  `(let [launch-captured# (atom {})
         shutdown-captured# (atom {})]
     (with-var-roots [local-supervisor/launch-local-worker (mk-capture-launch-fn launch-captured#)
                      local-supervisor/shutdown-local-worker (mk-capture-shutdown-fn shutdown-captured#)]
       ~@body
       {:launched @launch-captured#
        :shutdown @shutdown-captured#})))
```
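For readers more at home in Java than Clojure, the idea behind `capture-changed-workers` can be approximated as follows: the shutdown function is a replaceable hook, and a test temporarily swaps in a recording implementation, then restores the original. This is only an analogy under assumptions; `LocalWorkerHooks`, `ShutdownFn` and `captureShutdowns` are hypothetical names, and nothing here is taken from Storm's Java code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical analogy for the Clojure test macro, not part of the pull request.
final class LocalWorkerHooks {
    /** Hypothetical hook type for shutting down one local worker. */
    interface ShutdownFn {
        void shutdown(String workerId);
    }

    // Default behavior; a real implementation would stop the worker here.
    private static final ShutdownFn DEFAULT = workerId -> { };

    private static volatile ShutdownFn current = DEFAULT;

    private LocalWorkerHooks() {}

    /** Production code calls this; it dispatches to whichever hook is installed. */
    static void shutdownLocalWorker(String workerId) {
        current.shutdown(workerId);
    }

    /**
     * Runs body with a recording hook installed and returns the worker ids
     * that were shut down while it ran. The previous hook is always restored.
     */
    static List<String> captureShutdowns(Runnable body) {
        List<String> captured = new ArrayList<>();
        ShutdownFn previous = current;
        current = captured::add;
        try {
            body.run();
        } finally {
            current = previous;
        }
        return captured;
    }
}
```

A test would wrap the code under test in `captureShutdowns` and assert on the returned ids, much as the macro returns the `:shutdown` map.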
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15216821#comment-15216821 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57799337 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java --- @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.StormTimer; +import org.apache.storm.daemon.supervisor.timer.RunProfilerActions; +import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck; +import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat; +import org.apache.storm.daemon.supervisor.timer.UpdateBlobs; +import org.apache.storm.event.EventManagerImp; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.messaging.IContext; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.InterruptedIOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +public class Supervisor { +private static Logger LOG = LoggerFactory.getLogger(Supervisor.class); + +// TODO: to be removed after porting worker.clj. localSyncProcess is intended to start local supervisor --- End diff -- Please file a JIRA instead of having the TODO in the code.
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15216818#comment-15216818 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57798869 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java --- @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.StormTimer; +import org.apache.storm.daemon.supervisor.timer.RunProfilerActions; +import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck; +import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat; +import org.apache.storm.daemon.supervisor.timer.UpdateBlobs; +import org.apache.storm.event.EventManagerImp; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.messaging.IContext; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.InterruptedIOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +public class Supervisor { +private static Logger LOG = LoggerFactory.getLogger(Supervisor.class); --- End diff -- final: this should be immutable.
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15216817#comment-15216817 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57798829 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java --- @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.storm.generated.LSWorkerHeartbeat; + +public class StateHeartbeat { +private State state; +private LSWorkerHeartbeat hb; --- End diff -- Let's mark these as final because they are immutable.
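The suggestion above can be illustrated with a small, self-contained sketch. It is not the code from the pull request: `State` and `Heartbeat` below are hypothetical stand-ins (the real class uses the supervisor's own `State` type and the Thrift-generated `LSWorkerHeartbeat`), and the getter names are assumptions. The point is only that fields assigned once in the constructor can be declared `final`, so the compiler rejects any later reassignment.

```java
import java.util.Objects;

class ImmutableHeartbeatSketch {
    // Hypothetical stand-in for the supervisor's State type.
    enum State { VALID, TIMED_OUT }

    // Hypothetical stand-in for the Thrift-generated LSWorkerHeartbeat.
    static final class Heartbeat {
        private final int timeSecs;

        Heartbeat(int timeSecs) {
            this.timeSecs = timeSecs;
        }

        int getTimeSecs() {
            return timeSecs;
        }
    }

    // Both fields are set exactly once, in the constructor, and never change.
    static final class StateHeartbeat {
        private final State state;
        private final Heartbeat hb;

        StateHeartbeat(State state, Heartbeat hb) {
            this.state = Objects.requireNonNull(state, "state");
            this.hb = hb; // assumed nullable: a worker may not have reported yet
        }

        State getState() {
            return state;
        }

        Heartbeat getHeartbeat() {
            return hb;
        }
    }

    public static void main(String[] args) {
        StateHeartbeat sh = new StateHeartbeat(State.VALID, new Heartbeat(42));
        System.out.println(sh.getState() + " @ " + sh.getHeartbeat().getTimeSecs());
    }
}
```

The same reasoning applies to a static logger field: declaring it `private static final` documents that the reference is never replaced.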
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15216816#comment-15216816 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57798519 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java --- @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.Config; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +public class StandaloneSupervisor implements ISupervisor { +private String supervisorId; +private Map conf; + +@Override +public void prepare(Map stormConf, String schedulerLocalDir) { +try { +LocalState localState = new LocalState(schedulerLocalDir); +String supervisorId = localState.getSupervisorId(); +if (supervisorId == null) { +supervisorId = generateSupervisorId(); +localState.setSupervisorId(supervisorId); +} +this.conf = stormConf; +this.supervisorId = supervisorId; +} catch (IOException e) { +throw new RuntimeException(e); +} +} + +@Override +public String getSupervisorId() { +return supervisorId; +} + +@Override +public String getAssignmentId() { --- End diff -- The problem is that this is a public API. I am OK with changing it, but I would want to have a really good reason to do so.
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15216805#comment-15216805 ] ASF GitHub Bot commented on STORM-1279: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57797438 --- Diff: storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj --- @@ -0,0 +1,64 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. 
+(ns org.apache.storm.daemon.local-supervisor + (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData Supervisor SupervisorUtils] + [org.apache.storm.utils Utils ConfigUtils] + [org.apache.storm ProcessSimulator]) + (:use [org.apache.storm.daemon common] +[org.apache.storm log]) + (:require [org.apache.storm.daemon [worker :as worker] ]) + (:require [clojure.string :as str]) + (:gen-class)) + +(defn launch-local-worker [supervisorData stormId port workerId resources] + (let [conf (.getConf supervisorData) + pid (Utils/uuid) +worker (worker/mk-worker conf + (.getSharedContext supervisorData) + stormId + (.getAssignmentId supervisorData) + (int port) + workerId)] +(ConfigUtils/setWorkerUserWSE conf workerId "") +(ProcessSimulator/registerProcess pid worker) +(.put (.getWorkerThreadPids supervisorData) workerId pid) +)) +(defn shutdown-local-worker [supervisorData worker-manager workerId] + (log-message "shutdown-local-worker") + (let [supervisor-id (.getSupervisorId supervisorData) +worker-pids (.getWorkerThreadPids supervisorData) +dead-workers (.getDeadWorkers supervisorData)] +(.shutdownWorker worker-manager supervisor-id workerId worker-pids) --- End diff -- A bit confused. When launching a worker we call `ProcessSimulator/registerProcess`, and in the old code when killing a worker we would call `ProcessSimulator/killProcess` but now we call `shutdownWorker` and `cleanupWorker` from the worker-manager. Why is the abstraction split like this? 
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15213805#comment-15213805 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57544806 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java --- @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.Config; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +public class StandaloneSupervisor implements ISupervisor { +private String supervisorId; +private Map conf; + +@Override +public void prepare(Map stormConf, String schedulerLocalDir) { +try { +LocalState localState = new LocalState(schedulerLocalDir); +String supervisorId = localState.getSupervisorId(); +if (supervisorId == null) { +supervisorId = generateSupervisorId(); +localState.setSupervisorId(supervisorId); +} +this.conf = stormConf; +this.supervisorId = supervisorId; +} catch (IOException e) { +throw new RuntimeException(e); +} +} + +@Override +public String getSupervisorId() { +return supervisorId; +} + +@Override +public String getAssignmentId() { --- End diff -- I agree with you
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15211989#comment-15211989 ] ASF GitHub Bot commented on STORM-1279: --- Github user zhuoliu commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57458382 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java --- @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.Config; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +public class StandaloneSupervisor implements ISupervisor { +private String supervisorId; +private Map conf; + +@Override +public void prepare(Map stormConf, String schedulerLocalDir) { +try { +LocalState localState = new LocalState(schedulerLocalDir); +String supervisorId = localState.getSupervisorId(); +if (supervisorId == null) { +supervisorId = generateSupervisorId(); +localState.setSupervisorId(supervisorId); +} +this.conf = stormConf; +this.supervisorId = supervisorId; +} catch (IOException e) { +throw new RuntimeException(e); +} +} + +@Override +public String getSupervisorId() { +return supervisorId; +} + +@Override +public String getAssignmentId() { --- End diff -- Since assignmentId is in fact the same as supervisorId, should we just refactor and eliminate the assignmentId in the Java version? I would also like to hear from @revans2 @d2r on this.
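For readers following the assignmentId question, here is a minimal sketch of the situation being discussed. The quoted diff is cut off at the `getAssignmentId()` signature, so the method body below is an assumption based only on the remark that the two ids are the same. `SupervisorIdentity` is a hypothetical, trimmed-down stand-in for the public interface, not Storm's `ISupervisor`.

```java
import java.util.UUID;

class AssignmentIdSketch {
    // Hypothetical stand-in for the public interface; only the two id getters are modeled.
    interface SupervisorIdentity {
        String getSupervisorId();

        String getAssignmentId();
    }

    static final class StandaloneIdentity implements SupervisorIdentity {
        private final String supervisorId;

        StandaloneIdentity(String supervisorId) {
            this.supervisorId = supervisorId;
        }

        @Override
        public String getSupervisorId() {
            return supervisorId;
        }

        // Assumed behavior: the standalone implementation reuses the supervisor id.
        // The method stays on the interface so existing callers and other
        // implementations are not broken.
        @Override
        public String getAssignmentId() {
            return supervisorId;
        }
    }

    public static void main(String[] args) {
        SupervisorIdentity s = new StandaloneIdentity(UUID.randomUUID().toString());
        System.out.println(s.getSupervisorId().equals(s.getAssignmentId()));
    }
}
```

Under this assumption, removing the method would simplify the standalone case but would change a public interface, which is the trade-off raised in the replies.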
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15211968#comment-15211968 ] ASF GitHub Bot commented on STORM-1279: --- Github user zhuoliu commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57457270 --- Diff: storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj --- @@ -0,0 +1,64 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. 
+(ns org.apache.storm.daemon.local-supervisor + (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData Supervisor SupervisorUtils] + [org.apache.storm.utils Utils ConfigUtils] + [org.apache.storm ProcessSimulator]) + (:use [org.apache.storm.daemon common] +[org.apache.storm log]) + (:require [org.apache.storm.daemon [worker :as worker] ]) + (:require [clojure.string :as str]) + (:gen-class)) + +(defn launch-local-worker [supervisorData stormId port workerId resources] + (let [conf (.getConf supervisorData) + pid (Utils/uuid) +worker (worker/mk-worker conf + (.getSharedContext supervisorData) + stormId + (.getAssignmentId supervisorData) + (int port) + workerId)] +(ConfigUtils/setWorkerUserWSE conf workerId "") +(ProcessSimulator/registerProcess pid worker) +(.put (.getWorkerThreadPids supervisorData) workerId pid) +)) --- End diff -- newline, minor
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15211460#comment-15211460 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-201153034 @jerrypeng @revans2 can you look at it again? Thank you very much!
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15211459#comment-15211459 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-201152514 Sorry, I accidentally deleted the old "supervisor" branch, so I have reopened the supervisor PR. The PR is the same as before. If you want to look at the old comments, see #1184 (https://github.com/apache/storm/pull/1184).
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15211456#comment-15211456 ] ASF GitHub Bot commented on STORM-1279: --- GitHub user hustfxj opened a pull request: https://github.com/apache/storm/pull/1257 [STORM-1279] port backtype.storm.daemon.supervisor to java 1. Port supervisor.clj to Java. 2. Update all callers of the supervisor. 3. The supervisor's class hierarchy is as follows: SupervisorManger is the supervisor's manager, which can clean up and shut down the supervisor; SyncSupervisorEvent is responsible for downloading/removing assignments and topologies' files; SyncProcessEvent is responsible for starting/killing workers; SupervisorUtils holds common methods. 4. Create local-supervisor.clj for local mode. 5. Fix the supervisor test failures on Windows. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hustfxj/storm supervisor Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1257.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1257 commit 08934e29982d3936c9e247a8d7bac563053f869f Author: xiaojian.fxj Date: 2016-02-26T04:38:23Z port Supervisor to java commit b281c735f0089d24407af67586a1b41de45ac382 Author: xiaojian.fxj Date: 2016-02-26T05:15:56Z update supervisor's structure commit 19fcafbd0fe1cbee49e797824c47ba1f6b727270 Author: xiaojian.fxj Date: 2016-03-02T01:00:37Z update test codes about supervisor commit b09b4129d845aff6be285ea1748b842499c40e0b Author: xiaojian.fxj Date: 2016-03-04T04:14:41Z Merge branch 'master' into supervisor commit 42bacde20ea86867b874395532aa034cfad4f120 Author: xiaojian.fxj Date: 2016-03-06T08:05:14Z Merge branch 'master' into supervisor commit 465a4b89521a4ac15b81969009133bdfa12d0655 Author: xiaojian.fxj Date: 2016-03-10T12:12:18Z commit 184dc4a5c3fa8c9662ab224a82f33cc687b95c4b Author:
xiaojian.fxj Date: 2016-03-10T14:17:06Z sdf commit 65ce9d2e03be5f5c4defa8342bfbefe9f59adcf9 Author: xiaojian.fxj Date: 2016-03-10T14:57:01Z Merge branch 'master' into supervisor commit f78c36d7cc9ca82c6aa4e073f07279650a14fd45 Author: xiaojian.fxj Date: 2016-03-10T15:20:33Z remove setLocalizer commit 69c8b3c31d4ee528aea58f716b092c24ba6b0b1a Author: xiaojian.fxj Date: 2016-03-10T15:26:42Z Merge branch 'master' into supervisor commit 95bf67347cad7c11aeaf55b7588e627be298d1c2 Author: xiaojian.fxj Date: 2016-03-10T15:49:52Z resolve conflict when merge with master commit cc95d4f708efa123e5fc908bea15545f7139655b Author: xiaojian.fxj Date: 2016-03-11T00:03:00Z sdf commit a1e473526b5d9074ae1f9ff98162ddc78e426a73 Author: xiaojian.fxj Date: 2016-03-14T08:54:36Z add the plugin to use for manager worker commit b49c99541ae9c2c3f86d9823c64d30765f7716c6 Author: xiaojian.fxj Date: 2016-03-14T10:56:59Z Merge branch 'master' into supervisor commit 42928c2182cf2b755c6f98ad039b2e858787dfe4 Author: xiaojian.fxj Date: 2016-03-14T16:16:19Z start worker successfully commit 56f27e5d58d7abd1bdd9aff95dfb862540b166ef Author: xiaojian.fxj Date: 2016-03-16T06:02:10Z Merge branch 'master' of github.com:apache/storm commit d63167cc4a13289ef46b5fa1650621c57b191d3b Author: xiaojian.fxj Date: 2016-03-17T01:29:54Z Merge branch 'master' of github.com:apache/storm commit 2e2ffb29df039e9339e7b2b44352c744efb5caf0 Author: xiaojian.fxj Date: 2016-03-18T13:16:44Z Merge branch 'master' of github.com:apache/storm commit 28867372a4fc96d744ccd00a27d9e38dab2bd49e Author: xiaojian.fxj Date: 2016-03-23T03:10:08Z Merge branch 'master' of github.com:apache/storm commit f03b8bec105e88282211bf3e7dd4be4aeed484d8 Author: xiaojian.fxj Date: 2016-03-23T05:53:00Z Merge branch 'master' into supervisor and update supervisor based STORM-1631 commit 724f5d2cea8debea8c6fb6a0d42d275880636834 Author: xiaojian.fxj Date: 2016-03-23T17:29:20Z update commit 0100898ce9006cedd66c61b082001d1d455e5199 Author: xiaojian.fxj Date: 
2016-03-25T01:09:53Z Merge
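The issue description asks that backtype.storm.event usage be replaced with built-in Java thread pools, and the events named in the pull request (SyncSupervisorEvent, SyncProcessEvent) are `Runnable`s handed to an event manager. A minimal sketch of that idea, assuming a single-threaded executor so that events run one at a time in submission order, might look like the following. The class and method names are hypothetical; this is not the `EventManager` implementation from the pull request.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorEventManagerSketch {
    // One worker thread: queued events run serially, in the order they were added.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicInteger processed = new AtomicInteger();

    // Queue an event for execution on the worker thread.
    void add(Runnable event) {
        executor.execute(() -> {
            try {
                event.run();
            } catch (RuntimeException e) {
                // A failing event is reported but does not stop later events.
                System.err.println("event failed: " + e);
            } finally {
                processed.incrementAndGet();
            }
        });
    }

    int processedCount() {
        return processed.get();
    }

    // Stop accepting events and wait for the queued ones to finish.
    boolean shutdownAndWait(long timeoutMs) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorEventManagerSketch manager = new ExecutorEventManagerSketch();
        manager.add(() -> System.out.println("sync supervisor"));
        manager.add(() -> System.out.println("sync processes"));
        manager.shutdownAndWait(5000);
    }
}
```

A single worker thread keeps the serial ordering that an event queue provides; whether a real daemon should swallow event failures as this sketch does, or terminate on them, is a separate design decision.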
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15211452#comment-15211452 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj closed the pull request at: https://github.com/apache/storm/pull/1184
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15211332#comment-15211332 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r57414646 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,632 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + +Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); +Map stormcodeMap = readStormCodeLocations(assignmentsSnapshot); +Map existingAssignment = localState.getLocalAssignmentsMap(); +if (existingAssignment == null) { +existingAssignment = new HashMap<>(); +} + +Map allAssignment = +readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + +Map newAssignment = new HashMap<>(); +Set assignedStormIds = new HashSet<>(); + +for (Map.Entry
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15209079#comment-15209079 ] ASF GitHub Bot commented on STORM-1279: --- Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r57228512 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,632 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + +Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); +Map stormcodeMap = readStormCodeLocations(assignmentsSnapshot); +Map existingAssignment = localState.getLocalAssignmentsMap(); +if (existingAssignment == null) { +existingAssignment = new HashMap<>(); +} + +Map allAssignment = +readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + +Map newAssignment = new HashMap<>(); +Set assignedStormIds = new HashSet<>(); + +for (Map.Entry
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208733#comment-15208733 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r57193403 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,632 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + +Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); +Map stormcodeMap = readStormCodeLocations(assignmentsSnapshot); +Map existingAssignment = localState.getLocalAssignmentsMap(); +if (existingAssignment == null) { +existingAssignment = new HashMap<>(); +} + +Map allAssignment = +readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + +Map newAssignment = new HashMap<>(); +Set assignedStormIds = new HashSet<>(); + +for (Map.Entry
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208645#comment-15208645 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r57185550 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,428 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. 
wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; +private SupervisorData supervisorData; +public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +//TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java +public void init(SupervisorData supervisorData){ +this.supervisorData = supervisorData; +this.localState = supervisorData.getLocalState(); +} + +@Override +public void run() { +LOG.debug("Syncing processes"); +try { +Map conf = supervisorData.getConf(); +MapassignedExecutors = localState.getLocalAssignmentsMap(); + +if (assignedExecutors == null) { +assignedExecutors = new HashMap<>(); +} +int now = Time.currentTimeSecs(); + +Map
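The Javadoc on `SyncProcessEvent` quoted in this diff lists the reconciliation steps: kill allocated workers that are dead or disallowed, then work out which assignments are not yet satisfied and launch workers for them. A rough sketch of the first and third steps follows. The `WorkerReconciler` class, its `State` enum, and the port-keyed model are assumptions made for illustration and do not come from the pull request.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of the kill/launch decision; not the code under review. */
class WorkerReconciler {
    enum State { VALID, DEAD, DISALLOWED }

    static final class Worker {
        final int port;
        final State state;

        Worker(int port, State state) {
            this.port = port;
            this.state = state;
        }
    }

    /** Step 1: every allocated worker that is not valid should be killed. */
    static Set<String> workersToKill(Map<String, Worker> allocated) {
        Set<String> kill = new TreeSet<>();
        for (Map.Entry<String, Worker> entry : allocated.entrySet()) {
            if (entry.getValue().state != State.VALID) {
                kill.add(entry.getKey());
            }
        }
        return kill;
    }

    /** Step 3: assigned ports with no valid worker still need a launch. */
    static Set<Integer> portsToLaunch(Set<Integer> assignedPorts, Map<String, Worker> allocated) {
        Set<Integer> unsatisfied = new TreeSet<>(assignedPorts);
        for (Worker worker : allocated.values()) {
            if (worker.state == State.VALID) {
                unsatisfied.remove(worker.port);
            }
        }
        return unsatisfied;
    }
}
```

In this model a port held by a dead worker shows up in both results: the old worker is killed and a new one is launched on the same port.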
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208619#comment-15208619 ] ASF GitHub Bot commented on STORM-1279: --- Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r57182643 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,428 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. 
wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; +private SupervisorData supervisorData; +public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +//TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java +public void init(SupervisorData supervisorData){ +this.supervisorData = supervisorData; +this.localState = supervisorData.getLocalState(); +} + +@Override +public void run() { +LOG.debug("Syncing processes"); +try { +Map conf = supervisorData.getConf(); +MapassignedExecutors = localState.getLocalAssignmentsMap(); + +if (assignedExecutors == null) { +assignedExecutors = new HashMap<>(); +} +int now = Time.currentTimeSecs(); + +Map
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208613#comment-15208613 ] ASF GitHub Bot commented on STORM-1279: --- Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r57181897 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,428 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. 
wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; +private SupervisorData supervisorData; +public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +//TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java +public void init(SupervisorData supervisorData){ +this.supervisorData = supervisorData; +this.localState = supervisorData.getLocalState(); +} + +@Override +public void run() { +LOG.debug("Syncing processes"); +try { +Map conf = supervisorData.getConf(); +MapassignedExecutors = localState.getLocalAssignmentsMap(); + +if (assignedExecutors == null) { +assignedExecutors = new HashMap<>(); +} +int now = Time.currentTimeSecs(); + +Map
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208525#comment-15208525 ] ASF GitHub Bot commented on STORM-1279: --- Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r57171985 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,632 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + +Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); +Map stormcodeMap = readStormCodeLocations(assignmentsSnapshot); +Map existingAssignment = localState.getLocalAssignmentsMap(); +if (existingAssignment == null) { +existingAssignment = new HashMap<>(); +} + +Map allAssignment = +readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + +Map newAssignment = new HashMap<>(); +Set assignedStormIds = new HashSet<>(); + +for (Map.Entry
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15207980#comment-15207980 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1184#issuecomment-200214770 @jerrypeng I have merged your bug fix. Thank you. > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java threadpools. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
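The issue description asks that `backtype.storm.event` usage be replaced with built-in Java thread pools. One way that could look is sketched here, assuming the event manager only needs to run events one at a time in submission order. `ExecutorEventManager` is a hypothetical name; note that the quoted diffs still import `org.apache.storm.event.EventManager`, so this is not what the pull request does.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical event manager backed by a single-threaded pool. */
class ExecutorEventManager {
    // One worker thread: events run one at a time, in submission order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    /** Queues an event and returns immediately. */
    void add(Runnable event) {
        worker.execute(event);
    }

    /** Stops accepting events and waits for queued ones; true if all finished in time. */
    boolean shutdownAndWait(long timeoutMillis) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

A single-threaded executor keeps the serial semantics that sync events rely on, while leaving thread lifecycle and queueing to `java.util.concurrent`.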
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15198734#comment-15198734 ] ASF GitHub Bot commented on STORM-1279: --- Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1184#issuecomment-197701084 Found another bug in supervisor: https://github.com/apache/storm/pull/1226 Just another note here to make sure we remember to get the fix in, whether in Clojure or Java. > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java threadpools. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15196778#comment-15196778 ] ASF GitHub Bot commented on STORM-1279: --- Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1184#issuecomment-197155837 @jerrypeng Of course > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java threadpools. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15196341#comment-15196341 ] ASF GitHub Bot commented on STORM-1279: --- Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1184#issuecomment-197042280 I have a bug fix for cgroups that involves a couple of lines of modification in supervisor.clj: https://github.com/apache/storm/pull/1216 If that pull request gets merged first, please remember to also implement that change here. > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java threadpools. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192876#comment-15192876 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55965454 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,631 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + +Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); +Map stormcodeMap = readStormCodeLocations(assignmentsSnapshot); +Map existingAssignment = localState.getLocalAssignmentsMap(); +if (existingAssignment == null) { +existingAssignment = new HashMap<>(); +} + +Map allAssignment = +readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + +Map newAssignment = new HashMap<>(); +Set assignedStormIds = new HashSet<>(); + +for (Map.Entry
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192864#comment-15192864 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55964084 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,672 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. 
wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; +private SupervisorData supervisorData; +public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +//TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java +public void init(SupervisorData supervisorData){ +this.supervisorData = supervisorData; +this.localState = supervisorData.getLocalState(); +} + + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - + * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new + * worker ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait + * for workers launch + */
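The class comment quoted above describes one sync pass as a reconciliation: kill allocated workers that are dead or disallowed, work out which assignments are not yet satisfied, then generate new worker ids for them. A minimal Java sketch of that bookkeeping follows. It is not the code from the pull request: WorkerReconciler, Plan, and the parameter shapes are hypothetical names chosen for illustration, and the real class also touches local state, pid files, and heartbeat directories, which are omitted here.

```java
import java.util.*;

// Hypothetical sketch; WorkerReconciler and Plan are illustrative names, not Storm classes.
final class WorkerReconciler {

    /** Outcome of one sync pass: worker ids to kill and ports that still need a worker. */
    static final class Plan {
        final Set<String> toKill = new TreeSet<>();
        final Set<Integer> portsToLaunch = new TreeSet<>();
    }

    /**
     * @param allocated     worker id -> port for workers currently known locally (assumed shape)
     * @param deadWorkers   worker ids whose process has exited or stopped heartbeating
     * @param assignedPorts ports that the current assignment says should host a worker
     */
    static Plan plan(Map<String, Integer> allocated, Set<String> deadWorkers, Set<Integer> assignedPorts) {
        Plan plan = new Plan();
        Set<Integer> satisfied = new HashSet<>();
        for (Map.Entry<String, Integer> e : allocated.entrySet()) {
            boolean dead = deadWorkers.contains(e.getKey());
            boolean disallowed = !assignedPorts.contains(e.getValue());
            if (dead || disallowed) {
                plan.toKill.add(e.getKey());   // "to kill are those in allocated that are dead or disallowed"
            } else {
                satisfied.add(e.getValue());   // a healthy worker already serves this port
            }
        }
        for (Integer port : assignedPorts) {   // "figure out what assignments aren't yet satisfied"
            if (!satisfied.contains(port)) {
                plan.portsToLaunch.add(port);
            }
        }
        return plan;
    }

    /** "generate new worker ids": one fresh id per port that needs a worker. */
    static Map<Integer, String> newWorkerIds(Set<Integer> ports) {
        Map<Integer, String> ids = new TreeMap<>();
        for (Integer port : ports) {
            ids.put(port, UUID.randomUUID().toString());
        }
        return ids;
    }
}
```

In this sketch a port whose only worker is dead shows up in both results: the old worker id is killed and the port is queued for a fresh worker.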
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192863#comment-15192863 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55963987 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,672 @@
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192858#comment-15192858 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55963334 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,672 @@
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192518#comment-15192518 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55942961 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java --- @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.*; + +public class SupervisorUtils { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class); + +private static final SupervisorUtils INSTANCE = new SupervisorUtils(); +private static SupervisorUtils _instance = INSTANCE; +public static void setInstance(SupervisorUtils u) { +_instance = u; +} +public static void resetInstance() { +_instance = INSTANCE; +} + +public static Process workerLauncher(Map conf, String user, List args, Map environment, final String logPreFix, +final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException { +if (StringUtils.isBlank(user)) { +throw new IllegalArgumentException("User cannot be blank when calling workerLauncher."); +} +String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER)); +String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); +String wl; +if (StringUtils.isNotBlank(wlinitial)) { +wl = wlinitial; +} else { +wl = stormHome + "/bin/worker-launcher"; +} +List commands = new ArrayList<>(); +commands.add(wl); +commands.add(user); +commands.addAll(args); +LOG.info("Running as user: {} command: {}", user, commands); +return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir); +} + +public static int 
workerLauncherAndWait(Map conf, String user, List args, final Map environment, final String logPreFix) +throws IOException { +int ret = 0; +Process process = workerLauncher(conf, user, args, environment, logPreFix, null, null); +if (StringUtils.isNotBlank(logPreFix)) +Utils.readAndLogStream(logPreFix, process.getInputStream()); +try { +process.waitFor(); +} catch (InterruptedException e) { +LOG.info("{} interrupted.", logPreFix); +} +ret = process.exitValue(); +return ret; +} + +public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException { +if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { +String logPrefix = "setup conf for " + dir; +List commands = new ArrayList<>(); +commands.add("code-dir"); +commands.add(dir); +workerLauncherAndWait(conf, (String)
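The workerLauncher method quoted above assembles its command line in a fixed order: the launcher binary (the configured value if present, otherwise a path under storm.home), then the user, then the caller's arguments. A small stand-alone Java sketch of just that assembly step follows. LauncherCommand and build are hypothetical names, the blank check uses trim().isEmpty() as a rough stand-in for StringUtils.isBlank, and no process is started.

```java
import java.util.*;

// Hypothetical sketch of the command assembly only; not the Storm implementation.
final class LauncherCommand {

    /** Rough stand-in for commons-lang StringUtils.isBlank (assumption: trim-based check is close enough here). */
    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /**
     * @param configuredLauncher value of the launcher setting, may be null or blank
     * @param stormHome          installation directory used for the default launcher path
     * @param user               user to run as; must not be blank
     * @param args               launcher sub-command and its arguments
     */
    static List<String> build(String configuredLauncher, String stormHome, String user, List<String> args) {
        if (isBlank(user)) {
            throw new IllegalArgumentException("User cannot be blank when calling workerLauncher.");
        }
        // Prefer the configured launcher, otherwise fall back to the default location.
        String launcher = isBlank(configuredLauncher) ? stormHome + "/bin/worker-launcher" : configuredLauncher;
        List<String> commands = new ArrayList<>();
        commands.add(launcher);
        commands.add(user);
        commands.addAll(args);
        return commands;
    }
}
```

For example, build(null, "/opt/storm", "alice", Arrays.asList("code-dir", "/tmp/x")) yields a list starting with /opt/storm/bin/worker-launcher followed by alice, code-dir, and /tmp/x; the paths and user name here are made up for illustration.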
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192514#comment-15192514 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55942826 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,672 @@
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192512#comment-15192512 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55942812 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,672 @@
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192513#comment-15192513 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55942818 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,672 @@
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192400#comment-15192400 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55939157 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,672 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. 
wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; +private SupervisorData supervisorData; +public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +//TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java +public void init(SupervisorData supervisorData){ +this.supervisorData = supervisorData; +this.localState = supervisorData.getLocalState(); +} + + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - + * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new + * worker ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait + * for workers launch + */
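The ProcessExitCallback quoted above records a worker id in supervisorData.getDeadWorkers() when the worker process exits. A minimal sketch of that pattern follows, assuming a thread-safe set and a simplified callback interface. DeadWorkerTrackerSketch and ExitCallback are hypothetical names used only for illustration; they are not part of the pull request, and the real code uses Utils.ExitCodeCallable and Jetty's ConcurrentHashSet.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DeadWorkerTrackerSketch {
    /** Hypothetical stand-in for the exit-code half of Utils.ExitCodeCallable. */
    public interface ExitCallback {
        void onExit(int exitCode);
    }

    // Thread-safe set, since exit callbacks may fire on a different thread
    // than the one that later reads the dead workers.
    private final Set<String> deadWorkers = ConcurrentHashMap.newKeySet();

    /** Returns a callback that logs the exit code and marks the worker as dead. */
    public ExitCallback callbackFor(final String workerId) {
        return exitCode -> {
            System.out.println(workerId + " exited with code: " + exitCode);
            deadWorkers.add(workerId);
        };
    }

    public Set<String> getDeadWorkers() {
        return deadWorkers;
    }
}
```

ConcurrentHashMap.newKeySet() is used here only to keep the sketch free of third-party dependencies.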
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192396#comment-15192396 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55939032 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,672 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. 
wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; +private SupervisorData supervisorData; +public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +//TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java +public void init(SupervisorData supervisorData){ +this.supervisorData = supervisorData; +this.localState = supervisorData.getLocalState(); +} + + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - + * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new + * worker ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait + * for workers launch + */
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192395#comment-15192395 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55939026 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,672 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. 
wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; +private SupervisorData supervisorData; +public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +//TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java +public void init(SupervisorData supervisorData){ +this.supervisorData = supervisorData; +this.localState = supervisorData.getLocalState(); +} + + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - + * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new + * worker ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait + * for workers launch + */
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192382#comment-15192382 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55938368 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,672 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. 
wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; +private SupervisorData supervisorData; +public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +//TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java +public void init(SupervisorData supervisorData){ +this.supervisorData = supervisorData; +this.localState = supervisorData.getLocalState(); +} + + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - + * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new + * worker ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait + * for workers launch + */
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192381#comment-15192381 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55938351 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,672 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. 
wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; +private SupervisorData supervisorData; +public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +//TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java --- End diff -- is this a valid to-do? > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java
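The issue description asks for backtype.storm.event usage to be replaced with built-in Java thread pools. As a rough illustration only, and assuming the event manager's job is to run submitted events one at a time in submission order, a single-thread ExecutorService gives that behaviour. EventLoopSketch and runInOrder are hypothetical names; this is not the code from the pull request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EventLoopSketch {
    /** Submits n events to a single-thread pool and returns the order in which they ran. */
    public static List<Integer> runInOrder(int n) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        List<Integer> seen = Collections.synchronizedList(new ArrayList<Integer>());
        for (int i = 0; i < n; i++) {
            final int id = i;
            // A Runnable such as SyncProcessEvent could be submitted the same way.
            eventLoop.execute(() -> seen.add(id));
        }
        eventLoop.shutdown(); // stop accepting new events, let queued ones finish
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("events did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

A single worker thread keeps events from overlapping, which matters when each event reads and rewrites shared supervisor state.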
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192379#comment-15192379 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55938337 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,672 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. 
wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; +private SupervisorData supervisorData; +public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +//TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java +public void init(SupervisorData supervisorData){ +this.supervisorData = supervisorData; +this.localState = supervisorData.getLocalState(); +} + + +/** --- End diff -- same comment is at the class level also which can be removed. > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger >
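The numbered steps in the quoted Javadoc (decide which workers to kill, find assignments that are not yet satisfied, generate new worker ids) can be sketched as plain set operations. This is a simplified model under stated assumptions: worker state is reduced to a valid/invalid flag, assignments are reduced to port numbers, and SyncPlanSketch with its method names is hypothetical rather than taken from SyncProcessEvent.java.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;

public class SyncPlanSketch {
    /** Steps 1-2: allocated workers that are dead or disallowed (flag is not true). */
    public static Set<String> workersToKill(Map<String, Boolean> allocatedIsValid) {
        Set<String> toKill = new TreeSet<>();
        for (Map.Entry<String, Boolean> e : allocatedIsValid.entrySet()) {
            if (!Boolean.TRUE.equals(e.getValue())) {
                toKill.add(e.getKey());
            }
        }
        return toKill;
    }

    /** Step 3: assigned ports that no surviving worker occupies. */
    public static Set<Integer> portsNeedingWorkers(Set<Integer> assignedPorts,
                                                   Map<String, Integer> workerPorts,
                                                   Set<String> toKill) {
        Set<Integer> open = new TreeSet<>(assignedPorts);
        for (Map.Entry<String, Integer> e : workerPorts.entrySet()) {
            if (!toKill.contains(e.getKey())) {
                open.remove(e.getValue());
            }
        }
        return open;
    }

    /** Step 4: one fresh worker id per port that needs a worker. */
    public static Map<Integer, String> newWorkerIds(Set<Integer> openPorts) {
        Map<Integer, String> ids = new TreeMap<>();
        for (Integer port : openPorts) {
            ids.put(port, UUID.randomUUID().toString());
        }
        return ids;
    }
}
```

The remaining steps (writing the approved workers to local state, creating directories, launching processes) involve I/O and are left out of the sketch.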
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192377#comment-15192377 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55938211 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java --- @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.*; + +public class SupervisorUtils { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class); + +private static final SupervisorUtils INSTANCE = new SupervisorUtils(); +private static SupervisorUtils _instance = INSTANCE; +public static void setInstance(SupervisorUtils u) { +_instance = u; +} +public static void resetInstance() { +_instance = INSTANCE; +} + +public static Process workerLauncher(Map conf, String user, List args, Map environment, final String logPreFix, +final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException { +if (StringUtils.isBlank(user)) { +throw new IllegalArgumentException("User cannot be blank when calling workerLauncher."); +} +String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER)); +String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); +String wl; +if (StringUtils.isNotBlank(wlinitial)) { +wl = wlinitial; +} else { +wl = stormHome + "/bin/worker-launcher"; +} +List commands = new ArrayList<>(); +commands.add(wl); +commands.add(user); +commands.addAll(args); +LOG.info("Running as user: {} command: {}", user, commands); +return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir); +} + +public static int 
workerLauncherAndWait(Map conf, String user, List args, final Map environment, final String logPreFix) +throws IOException { +int ret = 0; +Process process = workerLauncher(conf, user, args, environment, logPreFix, null, null); +if (StringUtils.isNotBlank(logPreFix)) +Utils.readAndLogStream(logPreFix, process.getInputStream()); +try { +process.waitFor(); +} catch (InterruptedException e) { +LOG.info("{} interrupted.", logPreFix); +} +ret = process.exitValue(); +return ret; +} + +public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException { +if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { +String logPrefix = "setup conf for " + dir; +List commands = new ArrayList<>(); +commands.add("code-dir"); +commands.add(dir); +workerLauncherAndWait(conf, (String)
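One detail in the quoted workerLauncherAndWait is worth illustrating: if waitFor() is interrupted, the code logs the interruption and then calls process.exitValue(), which throws IllegalThreadStateException when the process has not yet terminated. A minimal sketch of one way to handle that case follows. LaunchAndWaitSketch, launchAndWait and the INTERRUPTED sentinel are hypothetical and not part of the pull request; the sketch uses ProcessBuilder directly instead of Utils.launchProcess.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

public class LaunchAndWaitSketch {
    /** Hypothetical sentinel returned when the wait is interrupted before the process exits. */
    public static final int INTERRUPTED = -1;

    /** Starts the command, waits for it, and returns its exit code. */
    public static int launchAndWait(List<String> command) {
        Process process;
        try {
            // inheritIO() avoids blocking on a full, unread output pipe.
            process = new ProcessBuilder(command).inheritIO().start();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try {
            // waitFor() already returns the exit code, so exitValue() is not needed.
            return process.waitFor();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop the child instead of
            // asking a still-running process for its exit value.
            Thread.currentThread().interrupt();
            process.destroy();
            return INTERRUPTED;
        }
    }
}
```

Whether to destroy the child or keep waiting after an interrupt is a design choice; the sketch destroys it so that no orphaned launcher process is left behind.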
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192376#comment-15192376 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55938186 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java --- @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.*; + +public class SupervisorUtils { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class); + +private static final SupervisorUtils INSTANCE = new SupervisorUtils(); +private static SupervisorUtils _instance = INSTANCE; +public static void setInstance(SupervisorUtils u) { +_instance = u; +} +public static void resetInstance() { +_instance = INSTANCE; +} + +public static Process workerLauncher(Map conf, String user, List args, Map environment, final String logPreFix, +final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException { +if (StringUtils.isBlank(user)) { +throw new IllegalArgumentException("User cannot be blank when calling workerLauncher."); +} +String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER)); +String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); +String wl; +if (StringUtils.isNotBlank(wlinitial)) { +wl = wlinitial; +} else { +wl = stormHome + "/bin/worker-launcher"; +} +List commands = new ArrayList<>(); +commands.add(wl); +commands.add(user); +commands.addAll(args); +LOG.info("Running as user: {} command: {}", user, commands); +return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir); +} + +public static int 
workerLauncherAndWait(Map conf, String user, List args, final Map environment, final String logPreFix) +throws IOException { +int ret = 0; +Process process = workerLauncher(conf, user, args, environment, logPreFix, null, null); +if (StringUtils.isNotBlank(logPreFix)) +Utils.readAndLogStream(logPreFix, process.getInputStream()); +try { +process.waitFor(); +} catch (InterruptedException e) { +LOG.info("{} interrupted.", logPreFix); +} +ret = process.exitValue(); +return ret; +} + +public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException { +if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { +String logPrefix = "setup conf for " + dir; +List commands = new ArrayList<>(); +commands.add("code-dir"); +commands.add(dir); +workerLauncherAndWait(conf, (String)
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192374#comment-15192374 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55938170 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java --- @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.StormTimer; +import org.apache.storm.daemon.supervisor.timer.RunProfilerActions; +import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck; +import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat; +import org.apache.storm.daemon.supervisor.timer.UpdateBlobs; +import org.apache.storm.event.EventManagerImp; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.messaging.IContext; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.InterruptedIOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +public class Supervisor { +private static Logger LOG = LoggerFactory.getLogger(Supervisor.class); + +// TODO: to be removed after porting worker.clj. 
localSyncProcess is intended to start local supervisor +private SyncProcessEvent localSyncProcess; + +public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) { +this.localSyncProcess = localSyncProcess; +} + +/** + * in local state, supervisor stores who its current assignments are another thread launches events to restart any dead processes if necessary + * + * @param conf + * @param sharedContext + * @param iSupervisor + * @return + * @throws Exception + */ +public SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception { +SupervisorManger supervisorManger = null; +try { +LOG.info("Starting Supervisor with conf {}", conf); +iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf)); +String path = ConfigUtils.supervisorTmpDir(conf); +FileUtils.cleanDirectory(new File(path)); + +final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor); +Localizer localizer = supervisorData.getLocalizer(); + +SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData); +hb.run(); +// should synchronize supervisor so it doesn't launch anything after being down (optimization) +Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS)); +supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb); + +Set downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf); --- End diff -- nit. downloadedStormIds > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration,
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192372#comment-15192372 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55938152 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java --- @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.*; + +public class SupervisorUtils { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class); + +private static final SupervisorUtils INSTANCE = new SupervisorUtils(); +private static SupervisorUtils _instance = INSTANCE; +public static void setInstance(SupervisorUtils u) { +_instance = u; +} +public static void resetInstance() { +_instance = INSTANCE; +} + +public static Process workerLauncher(Map conf, String user, List args, Mapenvironment, final String logPreFix, +final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException { +if (StringUtils.isBlank(user)) { +throw new IllegalArgumentException("User cannot be blank when calling workerLauncher."); +} +String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER)); +String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); +String wl; +if (StringUtils.isNotBlank(wlinitial)) { +wl = wlinitial; +} else { +wl = stormHome + "/bin/worker-launcher"; +} +List commands = new ArrayList<>(); +commands.add(wl); +commands.add(user); +commands.addAll(args); +LOG.info("Running as user: {} command: {}", user, commands); +return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir); +} + +public static int 
workerLauncherAndWait(Map conf, String user, List args, final Map environment, final String logPreFix) +throws IOException { +int ret = 0; +Process process = workerLauncher(conf, user, args, environment, logPreFix, null, null); +if (StringUtils.isNotBlank(logPreFix)) +Utils.readAndLogStream(logPreFix, process.getInputStream()); +try { +process.waitFor(); +} catch (InterruptedException e) { +LOG.info("{} interrupted.", logPreFix); +} +ret = process.exitValue(); +return ret; +} + +public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException { +if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { +String logPrefix = "setup conf for " + dir; +List commands = new ArrayList<>(); +commands.add("code-dir"); +commands.add(dir); +workerLauncherAndWait(conf, (String)
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192370#comment-15192370 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55938035 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java --- @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.StormTimer; +import org.apache.storm.daemon.supervisor.timer.RunProfilerActions; +import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck; +import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat; +import org.apache.storm.daemon.supervisor.timer.UpdateBlobs; +import org.apache.storm.event.EventManagerImp; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.messaging.IContext; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.InterruptedIOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +public class Supervisor { +private static Logger LOG = LoggerFactory.getLogger(Supervisor.class); + +// TODO: to be removed after porting worker.clj. 
localSyncProcess is intended to start local supervisor +private SyncProcessEvent localSyncProcess; + +public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) { +this.localSyncProcess = localSyncProcess; +} + +/** + * in local state, supervisor stores who its current assignments are another thread launches events to restart any dead processes if necessary + * + * @param conf + * @param sharedContext + * @param iSupervisor + * @return + * @throws Exception + */ +public SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception { +SupervisorManger supervisorManger = null; +try { +LOG.info("Starting Supervisor with conf {}", conf); +iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf)); +String path = ConfigUtils.supervisorTmpDir(conf); +FileUtils.cleanDirectory(new File(path)); + +final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor); +Localizer localizer = supervisorData.getLocalizer(); + +SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData); +hb.run(); +// should synchronize supervisor so it doesn't launch anything after being down (optimization) +Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS)); +supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb); + +Set downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf); +for (String stormId : downdedStormId) { +SupervisorUtils.addBlobReferences(localizer, stormId, conf); +} +// do this after adding the references so we don't try to clean things being used +localizer.startCleaner(); + +EventManagerImp syncSupEventManager = new EventManagerImp(false); +EventManagerImp syncProcessManager = new EventManagerImp(false); + +
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192368#comment-15192368 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55938008 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.event.EventManager; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; + +public class SupervisorManger implements SupervisorDaemon, DaemonCommon, Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorManger.class); +private final EventManager eventManager; +private final EventManager processesEventManager; +private SupervisorData supervisorData; + +public SupervisorManger(SupervisorData supervisorData, EventManager eventManager, EventManager processesEventManager) { +this.eventManager = eventManager; +this.supervisorData = supervisorData; +this.processesEventManager = processesEventManager; +} + +public void shutdown() { +LOG.info("Shutting down supervisor{}", supervisorData.getSupervisorId()); +supervisorData.setActive(false); +try { +supervisorData.getHeartbeatTimer().close(); +supervisorData.getEventTimer().close(); +supervisorData.getBlobUpdateTimer().close(); +eventManager.close(); +processesEventManager.close(); +} catch (Exception e) { +throw Utils.wrapInRuntime(e); +} +supervisorData.getStormClusterState().disconnect(); +} + +@Override +public void shutdownAllWorkers() { + +Collection workerIds = SupervisorUtils.supervisorWorkerIds(supervisorData.getConf()); +try { +for (String workerId : workerIds) { +SupervisorUtils.shutWorker(supervisorData, workerId); +} +} catch (Exception e) { +LOG.error("shutWorker failed"); +throw Utils.wrapInRuntime(e); +} +} + +@Override +public Map getConf() { +return supervisorData.getConf(); +} + +@Override +public String getId() { +return supervisorData.getSupervisorId(); +} + +@Override +public boolean isWaiting() { +if (!supervisorData.isActive()) { +return true; +} + +if (supervisorData.getHeartbeatTimer().isTimerWaiting() && supervisorData.getEventTimer().isTimerWaiting() && eventManager.waiting() +&& 
processesEventManager.waiting()) { +return true; +} +return false; +} + +public void run() { --- End diff -- It is not very intuitive that SupervisorManager is also a shutdown hook. You can remove this from here and use an anonymous shutdown hook instead.
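A minimal sketch of the anonymous shutdown hook suggested in this comment, assuming the daemon only needs its shutdown() method called at JVM exit. The `Daemon` interface and `newShutdownHook` helper are hypothetical names used for illustration; they are not part of the pull request.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownHookSketch {

    /** Hypothetical stand-in for the supervisor daemon; only shutdown() is modeled. */
    public interface Daemon {
        void shutdown();
    }

    /**
     * Wraps the daemon in an anonymous Runnable, so the daemon class
     * itself does not have to implement Runnable to act as a hook.
     */
    public static Thread newShutdownHook(final Daemon daemon) {
        return new Thread(new Runnable() {
            @Override
            public void run() {
                daemon.shutdown();
            }
        }, "supervisor-shutdown-hook");
    }

    public static void main(String[] args) {
        final AtomicBoolean stopped = new AtomicBoolean(false);
        Daemon daemon = new Daemon() {
            @Override
            public void shutdown() {
                stopped.set(true);
                System.out.println("daemon shut down");
            }
        };
        // The JVM starts the registered thread when it begins shutting down.
        Runtime.getRuntime().addShutdownHook(newShutdownHook(daemon));
    }
}
```

With this shape the manager class keeps a single responsibility, and the hook registration lives wherever the daemon is started.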
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192367#comment-15192367 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55937985 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.event.EventManager; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; + +public class SupervisorManger implements SupervisorDaemon, DaemonCommon, Runnable { --- End diff -- nit. class name should be SupervisorManager. 
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192365#comment-15192365 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55937914 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.event.EventManager; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; + +public class SupervisorManger implements SupervisorDaemon, DaemonCommon, Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorManger.class); +private final EventManager eventManager; +private final EventManager processesEventManager; +private SupervisorData supervisorData; + +public SupervisorManger(SupervisorData supervisorData, EventManager eventManager, EventManager processesEventManager) { +this.eventManager = eventManager; +this.supervisorData = supervisorData; +this.processesEventManager = processesEventManager; +} + +public void shutdown() { +LOG.info("Shutting down supervisor{}", supervisorData.getSupervisorId()); --- End diff -- minor. space after supervisor. > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java threadpools. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192361#comment-15192361 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55937791 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java --- @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.StormTimer; +import org.apache.storm.daemon.supervisor.timer.RunProfilerActions; +import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck; +import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat; +import org.apache.storm.daemon.supervisor.timer.UpdateBlobs; +import org.apache.storm.event.EventManagerImp; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.messaging.IContext; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.InterruptedIOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +public class Supervisor { +private static Logger LOG = LoggerFactory.getLogger(Supervisor.class); + +// TODO: to be removed after porting worker.clj. 
localSyncProcess is intended to start local supervisor +private SyncProcessEvent localSyncProcess; + +public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) { +this.localSyncProcess = localSyncProcess; +} + +/** + * in local state, supervisor stores who its current assignments are another thread launches events to restart any dead processes if necessary + * + * @param conf + * @param sharedContext + * @param iSupervisor + * @return + * @throws Exception + */ +public SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception { +SupervisorManger supervisorManger = null; +try { +LOG.info("Starting Supervisor with conf {}", conf); +iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf)); +String path = ConfigUtils.supervisorTmpDir(conf); +FileUtils.cleanDirectory(new File(path)); + +final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor); +Localizer localizer = supervisorData.getLocalizer(); + +SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData); +hb.run(); +// should synchronize supervisor so it doesn't launch anything after being down (optimization) +Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS)); +supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb); + +Set downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf); +for (String stormId : downdedStormId) { +SupervisorUtils.addBlobReferences(localizer, stormId, conf); +} +// do this after adding the references so we don't try to clean things being used +localizer.startCleaner(); + +EventManagerImp syncSupEventManager = new EventManagerImp(false); +EventManagerImp syncProcessManager = new EventManagerImp(false); + +
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192354#comment-15192354 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55937586 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java --- @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.generated.SupervisorInfo; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SupervisorHeartbeat implements Runnable { --- End diff -- does it need to be Runnable? 
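One possible reason for the Runnable interface: the quoted Supervisor.java diff both calls hb.run() directly and passes hb to a recurring timer. The sketch below illustrates that pattern with the JDK's ScheduledExecutorService as a stand-in; it does not use StormTimer, and the class name, counter, and 50 ms interval are assumptions made for the example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HeartbeatSketch implements Runnable {

    // Counts heartbeats; a real heartbeat would publish supervisor state instead.
    private final AtomicInteger beats = new AtomicInteger();

    @Override
    public void run() {
        beats.incrementAndGet();
    }

    public int beats() {
        return beats.get();
    }

    public static void main(String[] args) throws InterruptedException {
        HeartbeatSketch hb = new HeartbeatSketch();
        // Send the first heartbeat synchronously, as the quoted diff does.
        hb.run();

        // Then hand the same object to a timer for recurring execution.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(hb, 0, 50, TimeUnit.MILLISECONDS);
        Thread.sleep(200);
        timer.shutdownNow();

        System.out.println("heartbeats sent: " + hb.beats());
    }
}
```

If the timer accepted some other callback type, the class could drop Runnable and expose a plainly named method instead.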
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192355#comment-15192355 ] ASF GitHub Bot commented on STORM-1279: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55937596 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java --- @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.generated.SupervisorInfo; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SupervisorHeartbeat implements Runnable { + + private final IStormClusterState stormClusterState; + private final String supervisorId; + private final Map conf; + private final SupervisorData supervisorData; + +public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) { +this.stormClusterState = supervisorData.getStormClusterState(); +this.supervisorId = supervisorData.getSupervisorId(); +this.supervisorData = supervisorData; +this.conf = conf; +} + +private SupervisorInfo update(Map conf, SupervisorData supervisorData) { --- End diff -- can this be renamed to buildSupervisorInfo or constructSupervisorInfo? > port backtype.storm.daemon.supervisor to java > - > > Key: STORM-1279 > URL: https://issues.apache.org/jira/browse/STORM-1279 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: John Fang > Labels: java-migration, jstorm-merger > Attachments: Discussion about supervisor.pdf > > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor > as an example > backtype.storm.event usage should be replaced with built-in java threadpools. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192351#comment-15192351 ]

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1184#discussion_r55937502

    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java ---
    @@ -0,0 +1,84 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.daemon.supervisor;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.ISupervisor;
    +import org.apache.storm.utils.LocalState;
    +import org.apache.storm.utils.Utils;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.Map;
    +
    +public class StandaloneSupervisor implements ISupervisor {
    +    private String supervisorId;
    +    private Map conf;
    +
    +    @Override
    +    public void prepare(Map stormConf, String schedulerLocalDir) {
    +        try {
    +            LocalState localState = new LocalState(schedulerLocalDir);
    +            String supervisorId = localState.getSupervisorId();
    +            if (supervisorId == null) {
    +                supervisorId = generateSupervisorId();
    +                localState.setSupervisorId(supervisorId);
    +            }
    +            this.conf = stormConf;
    +            this.supervisorId = supervisorId;
    +        } catch (IOException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public String getSupervisorId() {
    +        return supervisorId;
    +    }
    +
    +    @Override
    +    public String getAssignmentId() {
    +        return supervisorId;
    +    }
    +
    +    @Override
    +    // @return is vector which need be converted to be int
    --- End diff --

    this comment can be removed

[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java
[ https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15189768#comment-15189768 ]

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1184#discussion_r55732854

    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---
    @@ -0,0 +1,273 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.daemon.supervisor;
    +
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.curator.utils.PathUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.generated.LSWorkerHeartbeat;
    +import org.apache.storm.localizer.LocalResource;
    +import org.apache.storm.localizer.Localizer;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.LocalState;
    +import org.apache.storm.utils.Utils;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URLDecoder;
    +import java.util.*;
    +
    +public class SupervisorUtils {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class);
    +
    +    private static final SupervisorUtils INSTANCE = new SupervisorUtils();
    +    private static SupervisorUtils _instance = INSTANCE;
    +
    +    public static void setInstance(SupervisorUtils u) {
    +        _instance = u;
    +    }
    +
    +    public static void resetInstance() {
    +        _instance = INSTANCE;
    +    }
    +
    +    public static Process workerLauncher(Map conf, String user, List args, Map environment, final String logPreFix,
    --- End diff --

    Thank you for the clarification. runAsUser sounds like a good name for this API. It is also possible to always pass user in the API signatures. If SUPERVISOR_WORKER_LAUNCHER is not enabled, the user argument will be local user and ignored in the implementation. Otherwise, commands will be launched with that user's permission. The implementation can be prepared when supervisor starts up.

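The suggestion above (always pass the user, and pick the implementation once at start-up) could be sketched roughly as follows. This is an illustration under stated assumptions, not the API that was merged: RunAsUserSketch, CommandRunner, LocalUserRunner, LauncherRunner, prepare, and the launcher path are all hypothetical names, and the boolean flag stands in for whatever configuration check the supervisor would actually perform. The sketch only builds command lines; starting a process from them (for example with ProcessBuilder) is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RunAsUserSketch {

    /** Hypothetical interface: every caller passes the user, whether or not it is used. */
    public interface CommandRunner {
        List<String> buildCommand(String user, List<String> args);
    }

    /** Chosen when the launcher is disabled: the user argument is accepted but ignored. */
    public static final class LocalUserRunner implements CommandRunner {
        @Override
        public List<String> buildCommand(String user, List<String> args) {
            return new ArrayList<>(args);
        }
    }

    /** Chosen when the launcher is enabled: the command is wrapped so it runs as the given user. */
    public static final class LauncherRunner implements CommandRunner {
        private final String launcherPath;

        public LauncherRunner(String launcherPath) {
            this.launcherPath = launcherPath;
        }

        @Override
        public List<String> buildCommand(String user, List<String> args) {
            List<String> command = new ArrayList<>();
            command.add(launcherPath);
            command.add(user);
            command.addAll(args);
            return command;
        }
    }

    /** Selects the implementation once, e.g. when the supervisor starts up. */
    public static CommandRunner prepare(boolean launcherEnabled, String launcherPath) {
        return launcherEnabled ? new LauncherRunner(launcherPath) : new LocalUserRunner();
    }

    public static void main(String[] args) {
        List<String> command = Arrays.asList("ls", "-l");
        // The launcher path below is a placeholder, not a real installation path.
        System.out.println(prepare(false, "/hypothetical/worker-launcher").buildCommand("alice", command));
        System.out.println(prepare(true, "/hypothetical/worker-launcher").buildCommand("alice", command));
    }
}
```

With this shape, call sites never branch on the configuration; they pass the user unconditionally and the selected implementation decides whether it matters.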