STORM-2018: Supervisor V2
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5a320461 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5a320461 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5a320461 Branch: refs/heads/master Commit: 5a3204610734871f18ba38a29b271cfb814ff1bc Parents: 4ce6f04 Author: Robert (Bobby) Evans <ev...@yahoo-inc.com> Authored: Fri Aug 5 16:15:21 2016 -0500 Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com> Committed: Mon Sep 19 15:57:57 2016 -0500 ---------------------------------------------------------------------- conf/defaults.yaml | 4 - log4j2/cluster.xml | 2 +- .../apache/storm/daemon/local_supervisor.clj | 48 +- .../src/clj/org/apache/storm/daemon/nimbus.clj | 5 +- .../src/clj/org/apache/storm/daemon/worker.clj | 9 +- storm-core/src/clj/org/apache/storm/testing.clj | 53 +- storm-core/src/jvm/org/apache/storm/Config.java | 17 +- .../org/apache/storm/cluster/IStateStorage.java | 9 +- .../storm/cluster/IStormClusterState.java | 2 +- .../storm/cluster/PaceMakerStateStorage.java | 2 +- .../storm/cluster/StormClusterStateImpl.java | 18 +- .../org/apache/storm/cluster/VersionedData.java | 36 + .../apache/storm/cluster/ZKStateStorage.java | 2 +- .../org/apache/storm/command/KillWorkers.java | 33 +- .../container/ResourceIsolationInterface.java | 21 +- .../storm/container/cgroup/CgroupCommon.java | 6 +- .../container/cgroup/CgroupCommonOperation.java | 2 +- .../storm/container/cgroup/CgroupManager.java | 37 +- .../storm/daemon/supervisor/AdvancedFSOps.java | 335 +++++++ .../storm/daemon/supervisor/BasicContainer.java | 658 +++++++++++++ .../supervisor/BasicContainerLauncher.java | 62 ++ .../storm/daemon/supervisor/Container.java | 549 +++++++++++ .../daemon/supervisor/ContainerLauncher.java | 104 +++ .../supervisor/ContainerRecoveryException.java | 29 + .../daemon/supervisor/ExitCodeCallback.java | 30 + .../storm/daemon/supervisor/Killable.java | 50 + 
.../storm/daemon/supervisor/LocalContainer.java | 85 ++ .../supervisor/LocalContainerLauncher.java | 60 ++ .../daemon/supervisor/ReadClusterState.java | 327 +++++++ .../daemon/supervisor/RunAsUserContainer.java | 100 ++ .../supervisor/RunAsUserContainerLauncher.java | 60 ++ .../apache/storm/daemon/supervisor/Slot.java | 785 ++++++++++++++++ .../apache/storm/daemon/supervisor/State.java | 22 - .../storm/daemon/supervisor/StateHeartbeat.java | 45 - .../storm/daemon/supervisor/Supervisor.java | 362 ++++++-- .../daemon/supervisor/SupervisorDaemon.java | 28 - .../storm/daemon/supervisor/SupervisorData.java | 234 ----- .../daemon/supervisor/SupervisorManager.java | 92 -- .../daemon/supervisor/SupervisorUtils.java | 170 ++-- .../daemon/supervisor/SyncProcessEvent.java | 448 --------- .../daemon/supervisor/SyncSupervisorEvent.java | 612 ------------ .../supervisor/timer/RunProfilerActions.java | 211 ----- .../supervisor/timer/SupervisorHealthCheck.java | 27 +- .../supervisor/timer/SupervisorHeartbeat.java | 30 +- .../daemon/supervisor/timer/UpdateBlobs.java | 16 +- .../workermanager/DefaultWorkerManager.java | 429 --------- .../workermanager/IWorkerManager.java | 35 - .../apache/storm/localizer/AsyncLocalizer.java | 432 +++++++++ .../org/apache/storm/localizer/ILocalizer.java | 70 ++ .../localizer/LocalDownloadedResource.java | 146 +++ .../LocalizedResourceRetentionSet.java | 2 +- .../storm/localizer/LocalizedResourceSet.java | 8 +- .../org/apache/storm/localizer/Localizer.java | 38 +- .../jvm/org/apache/storm/task/ShellBolt.java | 7 +- .../org/apache/storm/testing/FeederSpout.java | 29 +- .../apache/storm/trident/util/TridentUtils.java | 2 +- .../jvm/org/apache/storm/utils/ConfigUtils.java | 86 +- .../org/apache/storm/utils/InprocMessaging.java | 69 +- .../src/jvm/org/apache/storm/utils/Time.java | 26 +- .../src/jvm/org/apache/storm/utils/Utils.java | 219 +---- .../org/apache/storm/zookeeper/Zookeeper.java | 17 +- .../org/apache/storm/integration_test.clj | 67 -- 
.../test/clj/org/apache/storm/metrics_test.clj | 4 +- .../test/clj/org/apache/storm/nimbus_test.clj | 10 +- .../storm/security/auth/drpc_auth_test.clj | 36 +- .../clj/org/apache/storm/supervisor_test.clj | 926 ------------------- .../test/jvm/org/apache/storm/TestCgroups.java | 3 +- .../daemon/supervisor/BasicContainerTest.java | 484 ++++++++++ .../storm/daemon/supervisor/ContainerTest.java | 269 ++++++ .../storm/daemon/supervisor/SlotTest.java | 515 +++++++++++ .../storm/executor/error/ReportErrorTest.java | 76 ++ .../storm/localizer/AsyncLocalizerTest.java | 187 ++++ .../LocalizedResourceRetentionSetTest.java | 10 +- .../localizer/LocalizedResourceSetTest.java | 12 +- 74 files changed, 6163 insertions(+), 3888 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 8392936..2a6f18d 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -296,10 +296,6 @@ storm.resource.isolation.plugin: "org.apache.storm.container.cgroup.CgroupManage # If storm.resource.isolation.plugin.enable is set to false the unit tests for cgroups will not run storm.resource.isolation.plugin.enable: false - -# Default plugin to use for manager worker -storm.supervisor.worker.manager.plugin: org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager - # Configs for CGroup support storm.cgroup.hierarchy.dir: "/cgroup/storm_resources" storm.cgroup.resources: http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/log4j2/cluster.xml ---------------------------------------------------------------------- diff --git a/log4j2/cluster.xml b/log4j2/cluster.xml index e911823..a0ba3d1 100644 --- a/log4j2/cluster.xml +++ b/log4j2/cluster.xml @@ -18,7 +18,7 @@ <configuration monitorInterval="60"> <properties> - <property name="pattern">%d{yyyy-MM-dd 
HH:mm:ss.SSS} %c{1.} [%p] %msg%n</property> + <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %t %c{1.} [%p] %msg%n</property> </properties> <appenders> <RollingFile name="A1" immediateFlush="false" http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj index 560ae3e..a02a8e2 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj @@ -14,51 +14,15 @@ ;; See the License for the specific language governing permissions and ;; limitations under the License. (ns org.apache.storm.daemon.local-supervisor - (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData Supervisor SupervisorUtils] - [org.apache.storm.utils Utils ConfigUtils] - [org.apache.storm ProcessSimulator]) - (:use [org.apache.storm.daemon common] - [org.apache.storm log]) - (:require [org.apache.storm.daemon [worker :as worker] ]) - (:require [clojure.string :as str]) + (:import [org.apache.storm.daemon.supervisor Supervisor] + [org.apache.storm.utils ConfigUtils]) + (:use [org.apache.storm.daemon common]) (:gen-class)) -(defn launch-local-worker [supervisorData stormId port workerId resources] - (let [conf (.getConf supervisorData) - pid (Utils/uuid) - worker (worker/mk-worker conf - (.getSharedContext supervisorData) - stormId - (.getAssignmentId supervisorData) - (int port) - workerId)] - (ConfigUtils/setWorkerUserWSE conf workerId "") - (ProcessSimulator/registerProcess pid worker) - (.put (.getWorkerThreadPids supervisorData) workerId pid))) - -(defn shutdown-local-worker [supervisorData worker-manager workerId] - (log-message "shutdown-local-worker") - (let [supervisor-id (.getSupervisorId supervisorData) - 
worker-pids (.getWorkerThreadPids supervisorData) - dead-workers (.getDeadWorkers supervisorData)] - (.shutdownWorker worker-manager supervisor-id workerId worker-pids) - (if (.cleanupWorker worker-manager workerId) - (.remove dead-workers workerId)))) - -(defn local-process [] - "Create a local process event" - (proxy [SyncProcessEvent] [] - (launchLocalWorker [supervisorData stormId port workerId resources] - (launch-local-worker supervisorData stormId port workerId resources)) - (killWorker [supervisorData worker-manager workerId] (shutdown-local-worker supervisorData worker-manager workerId)))) - - (defserverfn mk-local-supervisor [conf shared-context isupervisor] - (log-message "Starting local Supervisor with conf " conf) (if (not (ConfigUtils/isLocalMode conf)) (throw (IllegalArgumentException. "Cannot start server in distrubuted mode!"))) - (let [local-process (local-process) - supervisor-server (Supervisor.)] - (.setLocalSyncProcess supervisor-server local-process) - (.mkSupervisor supervisor-server conf shared-context isupervisor))) \ No newline at end of file + (let [supervisor-server (Supervisor. 
conf shared-context isupervisor)] + (.launch supervisor-server) + supervisor-server)) http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 76b3917..1a83c0f 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -1616,7 +1616,7 @@ :all-components all-components :launch-time-secs launch-time-secs :assignment assignment - :beats beats + :beats (or beats {}) :topology topology :task->component task->component :base base})) @@ -1995,7 +1995,8 @@ executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)] (let [host (-> assignment :node->host (get node)) heartbeat (.get beats (StatsUtil/convertExecutor executor)) - hb (if heartbeat (.get heartbeat "heartbeat")) + heartbeat (or heartbeat {}) + hb (.get heartbeat "heartbeat") excutorstats (if hb (.get hb "stats")) excutorstats (if excutorstats (StatsUtil/thriftifyExecutorStats excutorstats))] http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index 222de36..6b165b2 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -36,13 +36,14 @@ (:import [org.apache.storm.messaging TransportFactory]) (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status DeserializingConnectionCallback]) (:import [org.apache.storm.daemon Shutdownable StormCommon DaemonCommon]) + (:import 
[org.apache.storm.daemon.supervisor AdvancedFSOps]) (:import [org.apache.storm.serialization KryoTupleSerializer]) (:import [org.apache.storm.generated StormTopology LSWorkerHeartbeat]) (:import [org.apache.storm.tuple AddressedTuple Fields]) (:import [org.apache.storm.task WorkerTopologyContext]) (:import [org.apache.storm Constants]) (:import [org.apache.storm.security.auth AuthUtils]) - (:import [org.apache.storm.cluster ClusterStateContext DaemonType ZKStateStorage StormClusterStateImpl ClusterUtils IStateStorage]) + (:import [org.apache.storm.cluster ClusterStateContext DaemonType ZKStateStorage StormClusterStateImpl ClusterUtils]) (:import [javax.security.auth Subject]) (:import [java.security PrivilegedExceptionAction]) (:import [org.apache.logging.log4j LogManager]) @@ -272,8 +273,8 @@ receive-queue-map (->> executor-receive-queue-map (mapcat (fn [[e queue]] (for [t (executor->tasks e)] [t queue]))) (into {})) - - topology (ConfigUtils/readSupervisorTopology conf storm-id) + ops (AdvancedFSOps/make conf) + topology (ConfigUtils/readSupervisorTopology conf storm-id ops) mq-context (if mq-context mq-context (TransportFactory/makeContext storm-conf))] @@ -404,7 +405,7 @@ assignment (if (= version (:version (get @(:assignment-versions worker) storm-id))) (:data (get @(:assignment-versions worker) storm-id)) (let [thriftify-assignment-version (.assignmentInfoWithVersion storm-cluster-state storm-id callback) - new-assignment {:data (clojurify-assignment (.get thriftify-assignment-version (IStateStorage/DATA))) :version version}] + new-assignment {:data (clojurify-assignment (.getData thriftify-assignment-version)) :version version}] (swap! 
(:assignment-versions worker) assoc storm-id new-assignment) (:data new-assignment))) my-assignment (-> assignment http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj index 801f9bd..0b5f9f7 100644 --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@ -25,7 +25,7 @@ [org.apache.storm.utils] [org.apache.storm.zookeeper Zookeeper] [org.apache.storm ProcessSimulator] - [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManager SupervisorUtils SupervisorManager] + [org.apache.storm.daemon.supervisor Supervisor StandaloneSupervisor SupervisorUtils] [org.apache.storm.executor Executor] [java.util.concurrent.atomic AtomicBoolean]) (:import [java.io File]) @@ -139,8 +139,7 @@ supervisor-conf (merge (:daemon-conf cluster-map) conf {STORM-LOCAL-DIR tmp-dir - SUPERVISOR-SLOTS-PORTS port-ids - STORM-SUPERVISOR-WORKER-MANAGER-PLUGIN "org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager"}) + SUPERVISOR-SLOTS-PORTS port-ids}) id-fn (if id id (Utils/uuid)) isupervisor (proxy [StandaloneSupervisor] [] (generateSupervisorId [] id-fn)) @@ -178,18 +177,19 @@ (let [zk-tmp (local-temp-path) [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS) (Zookeeper/mkInprocessZookeeper zk-tmp nil)) + nimbus-tmp (local-temp-path) daemon-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true ZMQ-LINGER-MILLIS 0 TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50 STORM-CLUSTER-MODE "local" - BLOBSTORE-SUPERUSER (System/getProperty "user.name")} + BLOBSTORE-SUPERUSER (System/getProperty "user.name") + BLOBSTORE-DIR nimbus-tmp} (if-not (contains? 
daemon-conf STORM-ZOOKEEPER-SERVERS) {STORM-ZOOKEEPER-PORT zk-port STORM-ZOOKEEPER-SERVERS ["localhost"]}) daemon-conf) - nimbus-tmp (local-temp-path) port-counter (mk-counter supervisor-slot-port-min) nimbus (nimbus/service-handler (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp) @@ -233,7 +233,7 @@ supervisors)] ;; tmp-dir will be taken care of by shutdown (reset! (:supervisors cluster-map) (remove-first finder-fn supervisors)) - (.shutdown sup))) + (.close sup))) (defn kill-local-storm-cluster [cluster-map] (.shutdown (:nimbus cluster-map)) @@ -249,7 +249,7 @@ (doseq [s @(:supervisors cluster-map)] (.shutdownAllWorkers s) ;; race condition here? will it launch the workers again? - (.shutdown s)) + (.close s)) (ProcessSimulator/killAllProcesses) (if (not-nil? (:zookeeper cluster-map)) (do @@ -285,7 +285,7 @@ ([timeout-ms apredicate] (while-timeout timeout-ms (not (apredicate)) (Time/sleep 100)))) -(defn is-supervisor-waiting [^SupervisorManager supervisor] +(defn is-supervisor-waiting [^Supervisor supervisor] (.isWaiting supervisor)) (defn wait-until-cluster-waiting @@ -395,15 +395,6 @@ executor->node+port)] (submit-local-topology nimbus storm-name conf topology))))) -(defn mk-capture-launch-fn [capture-atom] - (fn [supervisorData stormId port workerId resources] - (let [conf (.getConf supervisorData) - supervisorId (.getSupervisorId supervisorData) - existing (get @capture-atom [supervisorId port] [])] - (log-message "mk-capture-launch-fn") - (ConfigUtils/setWorkerUserWSE conf workerId "") - (swap! 
capture-atom assoc [supervisorId port] (conj existing stormId))))) - (defn find-worker-id [supervisor-conf port] (let [supervisor-state (ConfigUtils/supervisorState supervisor-conf) @@ -430,23 +421,6 @@ (.shutdownWorker worker-manager supervisor-id workerId worker-pids) (if (.cleanupWorker worker-manager workerId) (.remove dead-workers workerId))))) -(defmacro capture-changed-workers - [& body] - `(let [launch-captured# (atom {}) - shutdown-captured# (atom {})] - (with-var-roots [local-supervisor/launch-local-worker (mk-capture-launch-fn launch-captured#) - local-supervisor/shutdown-local-worker (mk-capture-shutdown-fn shutdown-captured#)] - ~@body - {:launched @launch-captured# - :shutdown @shutdown-captured#}))) - -(defmacro capture-launched-workers - [& body] - `(:launched (capture-changed-workers ~@body))) - -(defmacro capture-shutdown-workers - [& body] - `(:shutdown (capture-changed-workers ~@body))) (defnk aggregated-stat [cluster-map storm-name stat-key :component-ids nil] @@ -723,13 +697,18 @@ (let [target (+ amt @(:last-spout-emit tracked-topology)) track-id (-> tracked-topology :cluster ::track-id) waiting? (fn [] - (or (not= target (global-amt track-id "spout-emitted")) - (not= (global-amt track-id "transferred") - (global-amt track-id "processed"))))] + (let [spout-emitted (global-amt track-id "spout-emitted") + transferred (global-amt track-id "transferred") + processed (global-amt track-id "processed")] + (log-message "emitted " spout-emitted " target " target " transferred " transferred " processed " processed) + (or (not= target spout-emitted) + (not= transferred + processed))))] (while-timeout timeout-ms (waiting?) ;; (println "Spout emitted: " (global-amt track-id "spout-emitted")) ;; (println "Processed: " (global-amt track-id "processed")) ;; (println "Transferred: " (global-amt track-id "transferred")) + (advance-time-secs! 1) (Thread/sleep (rand-int 200))) (reset! 
(:last-spout-emit tracked-topology) target)))) http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 43369bd..f38ca0f 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -18,7 +18,6 @@ package org.apache.storm; import org.apache.storm.container.ResourceIsolationInterface; -import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy; import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy; import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy; @@ -2264,20 +2263,14 @@ public class Config extends HashMap<String, Object> { * future releases. */ @isString - public static final Object CLIENT_JAR_TRANSFORMER = "client.jartransformer.class"; + public static final String CLIENT_JAR_TRANSFORMER = "client.jartransformer.class"; /** * The plugin to be used for resource isolation */ @isImplementationOfClass(implementsClass = ResourceIsolationInterface.class) - public static final Object STORM_RESOURCE_ISOLATION_PLUGIN = "storm.resource.isolation.plugin"; - - /** - * The plugin to be used for manager worker - */ - @isImplementationOfClass(implementsClass = IWorkerManager.class) - public static final Object STORM_SUPERVISOR_WORKER_MANAGER_PLUGIN = "storm.supervisor.worker.manager.plugin"; + public static final String STORM_RESOURCE_ISOLATION_PLUGIN = "storm.resource.isolation.plugin"; /** * CGroup Setting below @@ -2287,19 +2280,19 @@ public class Config extends HashMap<String, Object> { * root directory of the storm cgroup hierarchy */ @isString - public static final Object STORM_CGROUP_HIERARCHY_DIR = "storm.cgroup.hierarchy.dir"; 
+ public static final String STORM_CGROUP_HIERARCHY_DIR = "storm.cgroup.hierarchy.dir"; /** * resources to to be controlled by cgroups */ @isStringList - public static final Object STORM_CGROUP_RESOURCES = "storm.cgroup.resources"; + public static final String STORM_CGROUP_RESOURCES = "storm.cgroup.resources"; /** * name for the cgroup hierarchy */ @isString - public static final Object STORM_CGROUP_HIERARCHY_NAME = "storm.cgroup.hierarchy.name"; + public static final String STORM_CGROUP_HIERARCHY_NAME = "storm.cgroup.hierarchy.name"; /** * flag to determine whether to use a resource isolation plugin http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java index 0b6f043..aa731ff 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java @@ -41,10 +41,7 @@ import org.apache.zookeeper.data.ACL; * Never use the same paths with the *_hb* methods as you do with the others. */ public interface IStateStorage { - - public static final String DATA = "data"; - public static final String VERSION = "version"; - + /** * Registers a callback function that gets called when CuratorEvents happen. * @param callback is a clojure IFn that accepts the type - translated to @@ -157,9 +154,9 @@ public interface IStateStorage { * @param watch Whether or not to set a watch on the path. Watched paths * emit events which are consumed by functions registered with the * register method. Very useful for catching updates to nodes. 
- * @return An Map in the form {:data data :version version} + * @return the data with a version */ - Map get_data_with_version(String path, boolean watch); + VersionedData<byte[]> get_data_with_version(String path, boolean watch); /** * Write a worker heartbeat at the path. http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java index b016997..a6f07ed 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java @@ -29,7 +29,7 @@ public interface IStormClusterState { public Assignment assignmentInfo(String stormId, Runnable callback); - public Map assignmentInfoWithVersion(String stormId, Runnable callback); + public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback); public Integer assignmentVersion(String stormId, Runnable callback) throws Exception; http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java index c42bd38..80a398e 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java @@ -104,7 +104,7 @@ public class PaceMakerStateStorage implements IStateStorage { } @Override - public Map get_data_with_version(String path, boolean watch) { + public VersionedData<byte[]> get_data_with_version(String path, boolean watch) { return 
stateStorage.get_data_with_version(path, watch); } http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java index 4f02beb..972d778 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java @@ -17,7 +17,6 @@ */ package org.apache.storm.cluster; -import clojure.lang.*; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.*; @@ -164,21 +163,18 @@ public class StormClusterStateImpl implements IStormClusterState { } @Override - public Map assignmentInfoWithVersion(String stormId, Runnable callback) { - Map map = new HashMap(); + public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback) { if (callback != null) { assignmentInfoWithVersionCallback.put(stormId, callback); } Assignment assignment = null; Integer version = 0; - Map dataWithVersionMap = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null); - if (dataWithVersionMap != null) { - assignment = ClusterUtils.maybeDeserialize((byte[]) dataWithVersionMap.get(IStateStorage.DATA), Assignment.class); - version = (Integer) dataWithVersionMap.get(IStateStorage.VERSION); - } - map.put(IStateStorage.DATA, assignment); - map.put(IStateStorage.VERSION, version); - return map; + VersionedData<byte[]> dataWithVersion = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null); + if (dataWithVersion != null) { + assignment = ClusterUtils.maybeDeserialize(dataWithVersion.getData(), Assignment.class); + version = dataWithVersion.getVersion(); + 
} + return new VersionedData<Assignment>(version, assignment); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java b/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java new file mode 100644 index 0000000..3de2a88 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.cluster; + +public class VersionedData<D> { + private final int version; + private final D data; + + public VersionedData(int version, D data) { + this.version = version; + this.data = data; + } + + public int getVersion() { + return version; + } + + public D getData() { + return data; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java index 87e7dfc..e337b1f 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java @@ -211,7 +211,7 @@ public class ZKStateStorage implements IStateStorage { } @Override - public Map get_data_with_version(String path, boolean watch) { + public VersionedData<byte[]> get_data_with_version(String path, boolean watch) { return Zookeeper.getDataWithVersion(zkReader, path, watch); } http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java index 87a1459..c59efd7 100644 --- a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java +++ b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java @@ -17,39 +17,20 @@ */ package org.apache.storm.command; +import java.io.File; +import java.util.Map; + import org.apache.storm.Config; import org.apache.storm.daemon.supervisor.StandaloneSupervisor; -import org.apache.storm.daemon.supervisor.SupervisorData; -import org.apache.storm.daemon.supervisor.SupervisorUtils; -import 
org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.daemon.supervisor.Supervisor; import org.apache.storm.utils.ConfigUtils; -import org.eclipse.jetty.util.ConcurrentHashSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.Collection; -import java.util.Map; - public class KillWorkers { - private static final Logger LOG = LoggerFactory.getLogger(KillWorkers.class); - public static void main(String [] args) throws Exception { - Map conf = ConfigUtils.readStormConfig(); + Map<String, Object> conf = ConfigUtils.readStormConfig(); conf.put(Config.STORM_LOCAL_DIR, new File((String)conf.get(Config.STORM_LOCAL_DIR)).getCanonicalPath()); - SupervisorData supervisorData = new SupervisorData(conf, null, new StandaloneSupervisor()); - IWorkerManager workerManager = supervisorData.getWorkerManager(); - Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf); - String supervisorId = supervisorData.getSupervisorId(); - Map<String, String> workerToThreadPids = supervisorData.getWorkerThreadPids(); - ConcurrentHashSet deadWorkers = supervisorData.getDeadWorkers(); - for (String workerId : workerIds) { - LOG.info("Killing worker: {} through CLI.", workerId); - workerManager.shutdownWorker(supervisorId, workerId, workerToThreadPids); - if (workerManager.cleanupWorker(workerId)) { - deadWorkers.remove(workerId); - } + try (Supervisor supervisor = new Supervisor(conf, null, new StandaloneSupervisor())) { + supervisor.shutdownAllWorkers(); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java index c5cad02..7bf0249 100644 --- 
a/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java +++ b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java @@ -18,20 +18,29 @@ package org.apache.storm.container; +import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; /** * A plugin to support resource isolation and limitation within Storm */ public interface ResourceIsolationInterface { + + /** + * Called when starting up + * @param conf the cluster config + * @throws IOException on any error. + */ + void prepare(Map<String, Object> conf) throws IOException; /** * This function should be used prior to starting the worker to reserve resources for the worker * @param workerId worker id of the worker to start * @param resources set of resources to limit */ - void reserveResourcesForWorker(String workerId, Map resources); + void reserveResourcesForWorker(String workerId, Map<String, Number> resources); /** * This function will be called when the worker needs to shutdown. This function should include logic to clean up after a worker is shutdown @@ -39,7 +48,6 @@ public interface ResourceIsolationInterface { */ void releaseResourcesForWorker(String workerId); - /** * After reserving resources for the worker (i.e. calling reserveResourcesForWorker). This function can be used * to get the modified command line to launch the worker with resource isolation @@ -56,4 +64,13 @@ public interface ResourceIsolationInterface { */ List<String> getLaunchCommandPrefix(String workerId); + /** + * Get the list of PIDs currently in an isolated container + * @param workerId the id of the worker to get these for + * @return the set of PIDs, this will be combined with + * other ways of getting PIDs. An Empty set if + * no PIDs are found. 
+ * @throws IOException on any error + */ + Set<Long> getRunningPIDs(String workerId) throws IOException; } http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java index b12fcc0..c8bb304 100755 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java @@ -89,11 +89,11 @@ public class CgroupCommon implements CgroupCommonOperation { } @Override - public Set<Integer> getPids() throws IOException { + public Set<Long> getPids() throws IOException { List<String> stringPids = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CGROUP_PROCS)); - Set<Integer> pids = new HashSet<Integer>(); + Set<Long> pids = new HashSet<>(); for (String task : stringPids) { - pids.add(Integer.valueOf(task)); + pids.add(Long.valueOf(task)); } return pids; } http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java index 54368b6..eecba69 100755 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java @@ -42,7 +42,7 @@ public interface CgroupCommonOperation { /** * get the PIDs of processes running in cgroup */ - public Set<Integer> getPids() throws IOException; + public Set<Long> getPids() throws IOException; /** * to set notify_on_release config in cgroup 
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java index 80093b3..9856595 100644 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java @@ -18,19 +18,11 @@ package org.apache.storm.container.cgroup; -import org.apache.commons.lang.ArrayUtils; -import org.apache.storm.Config; -import org.apache.storm.container.ResourceIsolationInterface; -import org.apache.storm.container.cgroup.core.CpuCore; -import org.apache.storm.container.cgroup.core.MemoryCore; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -38,6 +30,13 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.storm.Config; +import org.apache.storm.container.ResourceIsolationInterface; +import org.apache.storm.container.cgroup.core.CpuCore; +import org.apache.storm.container.cgroup.core.MemoryCore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Class that implements ResourceIsolationInterface that manages cgroups */ @@ -51,15 +50,15 @@ public class CgroupManager implements ResourceIsolationInterface { private CgroupCommon rootCgroup; - private static String rootDir; + private String rootDir; - private Map conf; + private Map<String, Object> conf; /** * initialize intial data structures * @param conf storm confs */ - public void prepare(Map conf) throws IOException { + public void 
prepare(Map<String, Object> conf) throws IOException { this.conf = conf; this.rootDir = Config.getCgroupRootDir(this.conf); if (this.rootDir == null) { @@ -81,7 +80,7 @@ public class CgroupManager implements ResourceIsolationInterface { /** * initalize subsystems */ - private void prepareSubSystem(Map conf) throws IOException { + private void prepareSubSystem(Map<String, Object> conf) throws IOException { List<SubSystemType> subSystemTypes = new LinkedList<>(); for (String resource : Config.getCgroupStormResources(conf)) { subSystemTypes.add(SubSystemType.getSubSystem(resource)); @@ -118,7 +117,7 @@ public class CgroupManager implements ResourceIsolationInterface { } } - public void reserveResourcesForWorker(String workerId, Map resourcesMap) throws SecurityException { + public void reserveResourcesForWorker(String workerId, Map<String, Number> resourcesMap) throws SecurityException { Number cpuNum = null; // The manually set STORM_WORKER_CGROUP_CPU_LIMIT config on supervisor will overwrite resources assigned by RAS (Resource Aware Scheduler) if (this.conf.get(Config.STORM_WORKER_CGROUP_CPU_LIMIT) != null) { @@ -211,4 +210,14 @@ public class CgroupManager implements ResourceIsolationInterface { public void close() throws IOException { this.center.deleteCgroup(this.rootCgroup); } + + @Override + public Set<Long> getRunningPIDs(String workerId) throws IOException { + CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, this.rootCgroup); + if (!this.rootCgroup.getChildren().contains(workerGroup)) { + LOG.warn("cgroup {} doesn't exist!", workerGroup); + return Collections.emptySet(); + } + return workerGroup.getPids(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java new file mode 100644 index 0000000..361328e --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.OutputStream; +import java.io.Writer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.attribute.PosixFilePermission; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AdvancedFSOps { + private static final Logger LOG = LoggerFactory.getLogger(AdvancedFSOps.class); + + /** + * Factory to create a new AdvancedFSOps + * @param conf the configuration of the process + * @return the appropriate instance of the class for this config and environment. 
+ */ + public static AdvancedFSOps make(Map<String, Object> conf) { + if (Utils.isOnWindows()) { + return new AdvancedWindowsFSOps(conf); + } + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + return new AdvancedRunAsUserFSOps(conf); + } + return new AdvancedFSOps(); + } + + private static class AdvancedRunAsUserFSOps extends AdvancedFSOps { + private final Map<String, Object> _conf; + + public AdvancedRunAsUserFSOps(Map<String, Object> conf) { + if (Utils.isOnWindows()) { + throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet"); + } + _conf = conf; + } + + @Override + public void setupBlobPermissions(File path, String user) throws IOException { + String logPrefix = "setup blob permissions for " + path; + SupervisorUtils.processLauncherAndWait(_conf, user, Arrays.asList("blob", path.toString()), null, logPrefix); + } + + @Override + public void deleteIfExists(File path, String user, String logPrefix) throws IOException { + String absolutePath = path.getAbsolutePath(); + LOG.info("Deleting path {}", absolutePath); + if (user == null) { + user = Files.getOwner(path.toPath()).getName(); + } + List<String> commands = new ArrayList<>(); + commands.add("rmr"); + commands.add(absolutePath); + SupervisorUtils.processLauncherAndWait(_conf, user, commands, null, logPrefix); + if (Utils.checkFileExists(absolutePath)) { + throw new RuntimeException(path + " was not deleted."); + } + } + + @Override + public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException { + SupervisorUtils.setupStormCodeDir(_conf, topologyConf, path.getCanonicalPath()); + } + } + + /** + * Operations that need to override the default ones when running on Windows + * + */ + private static class AdvancedWindowsFSOps extends AdvancedFSOps { + + public AdvancedWindowsFSOps(Map<String, Object> conf) { + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + 
throw new RuntimeException("ERROR: Windows doesn't support running workers as different users yet"); + } + } + + @Override + public void restrictDirectoryPermissions(File dir) throws IOException { + //NOOP, if windows gets support for run as user we will need to find a way to support this + } + + @Override + public void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException { + // Files/move with non-empty directory doesn't work well on Windows + // This is not atomic but it does work + FileUtils.moveDirectory(fromDir, toDir); + } + + @Override + public boolean supportsAtomicDirectoryMove() { + // Files/move with non-empty directory doesn't work well on Windows + // FileUtils.moveDirectory is not atomic + return false; + } + } + + + protected AdvancedFSOps() { + //NOOP, but restricted permissions + } + + /** + * Set directory permissions to (OWNER)RWX (GROUP)R-X (OTHER)--- + * On some systems that do not support this, it may become a noop + * @param dir the directory to change permissions on + * @throws IOException on any error + */ + public void restrictDirectoryPermissions(File dir) throws IOException { + Set<PosixFilePermission> perms = new HashSet<>( + Arrays.asList(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE, + PosixFilePermission.OWNER_EXECUTE, PosixFilePermission.GROUP_READ, + PosixFilePermission.GROUP_EXECUTE)); + Files.setPosixFilePermissions(dir.toPath(), perms); + } + + /** + * Move fromDir to toDir, and try to make it an atomic move if possible + * @param fromDir what to move + * @param toDir where to move it to + * @throws IOException on any error + */ + public void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException { + FileUtils.forceMkdir(toDir); + Files.move(fromDir.toPath(), toDir.toPath(), StandardCopyOption.ATOMIC_MOVE); + } + + /** + * @return true if an atomic directory move works, else false.
+ */ + public boolean supportsAtomicDirectoryMove() { + return true; + } + + /** + * Copy a directory + * @param fromDir from where + * @param toDir to where + * @throws IOException on any error + */ + public void copyDirectory(File fromDir, File toDir) throws IOException { + FileUtils.copyDirectory(fromDir, toDir); + } + + /** + * Setup permissions properly for an internal blob store path + * @param path the path to set the permissions on + * @param user the user to change the permissions for + * @throws IOException on any error + */ + public void setupBlobPermissions(File path, String user) throws IOException { + //Normally this is a NOOP + } + + /** + * Delete a file or a directory and all of the children. If it exists. + * @param path what to delete + * @param user who to delete it as if doing it as someone else is supported + * @param logPrefix if an external process needs to be launched to delete + * the object what prefix to include in the logs + * @throws IOException on any error. + */ + public void deleteIfExists(File path, String user, String logPrefix) throws IOException { + //by default no need to do this as a different user + deleteIfExists(path); + } + + /** + * Delete a file or a directory and all of the children. If it exists. + * @param path what to delete + * @throws IOException on any error. + */ + public void deleteIfExists(File path) throws IOException { + LOG.info("Deleting path {}", path); + Path p = path.toPath(); + if (Files.exists(p)) { + try { + FileUtils.forceDelete(path); + } catch (FileNotFoundException ignored) {} + } + } + + /** + * Setup the permissions for the storm code dir + * @param topologyConf the config of the Topology + * @param path the directory to set the permissions on + * @throws IOException on any error + */ + public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException { + //By default this is a NOOP + } + + /** + * Sanity check if everything the topology needs is there for it to run. 
+ * @param conf the config of the supervisor + * @param topologyId the ID of the topology + * @return true if everything is there, else false + * @throws IOException on any error + */ + public boolean doRequiredTopoFilesExist(Map<String, Object> conf, String topologyId) throws IOException { + return SupervisorUtils.doRequiredTopoFilesExist(conf, topologyId); + } + + /** + * Makes a directory, including any necessary but nonexistent parent + * directories. + * + * @param path the directory to create + * @throws IOException on any error + */ + public void forceMkdir(File path) throws IOException { + FileUtils.forceMkdir(path); + } + + /** + * Check if a file exists or not + * @param path the path to check + * @return true if it exists else false + * @throws IOException on any error. + */ + public boolean fileExists(File path) throws IOException { + return path.exists(); + } + + /** + * Get a writer for the given location + * @param file the file to write to + * @return the Writer to use. + * @throws IOException on any error + */ + public Writer getWriter(File file) throws IOException { + return new FileWriter(file); + } + + /** + * Get an output stream to write to a given file + * @param file the file to write to + * @return an OutputStream for that file + * @throws IOException on any error + */ + public OutputStream getOutputStream(File file) throws IOException { + return new FileOutputStream(file); + } + + /** + * Dump a string to a file + * @param location where to write to + * @param data the data to write + * @throws IOException on any error + */ + public void dump(File location, String data) throws IOException { + File parent = location.getParentFile(); + if (!parent.exists()) { + forceMkdir(parent); + } + try (Writer w = getWriter(location)) { + w.write(data); + } + } + + /** + * Read the contents of a file into a String + * @param location the file to read + * @return the contents of the file + * @throws IOException on any error + */ + public String 
slurpString(File location) throws IOException { + return FileUtils.readFileToString(location, "UTF-8"); + } + + /** + * Read the contents of a file into a byte array. + * @param location the file to read + * @return the contents of the file + * @throws IOException on any error + */ + public byte[] slurp(File location) throws IOException { + return FileUtils.readFileToByteArray(location); + } + + /** + * Create a symbolic link pointing at target + * @param link the link to create + * @param target where it should point to + * @throws IOException on any error. + */ + public void createSymlink(File link, File target) throws IOException { + Path plink = link.toPath().toAbsolutePath(); + Path ptarget = target.toPath().toAbsolutePath(); + LOG.debug("Creating symlink [{}] to [{}]", plink, ptarget); + if (Files.exists(plink)) { + if (Files.isSameFile(plink, ptarget)) { + //It already points where we want it to + return; + } + FileUtils.forceDelete(link); + } + Files.createSymbolicLink(plink, ptarget); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java new file mode 100644 index 0000000..efaa352 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java @@ -0,0 +1,658 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.ResourceIsolationInterface; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; + +/** + * A container that runs processes on the local box. 
+ */ +public class BasicContainer extends Container { + private static final Logger LOG = LoggerFactory.getLogger(BasicContainer.class); + private static final FilenameFilter jarFilter = (dir, name) -> name.endsWith(".jar"); + private static final Joiner CPJ = + Joiner.on(Utils.CLASS_PATH_SEPARATOR).skipNulls(); + + protected final LocalState _localState; + protected final String _profileCmd; + protected final String _stormHome = System.getProperty("storm.home"); + protected volatile boolean _exitedEarly = false; + + private class ProcessExitCallback implements ExitCodeCallback { + private final String _logPrefix; + + public ProcessExitCallback(String logPrefix) { + _logPrefix = logPrefix; + } + + @Override + public void call(int exitCode) { + LOG.info("{} exited with code: {}", _logPrefix, exitCode); + _exitedEarly = true; + } + } + + /** + * Create a new BasicContainer + * @param type the type of container being made. + * @param conf the supervisor config + * @param supervisorId the ID of the supervisor this is a part of. + * @param port the port the container is on. Should be <= 0 if only a partial recovery + * @param assignment the assignment for this container. Should be null if only a partial recovery. + * @param resourceIsolationManager used to isolate resources for a container can be null if no isolation is used. + * @param localState the local state of the supervisor. May be null if partial recovery + * @param workerId the id of the worker to use. Must not be null if doing a partial recovery. + */ + public BasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port, + LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, + LocalState localState, String workerId) throws IOException { + this(type, conf, supervisorId, port, assignment, resourceIsolationManager, localState, workerId, null, null, null); + } + + /** + * Create a new BasicContainer + * @param type the type of container being made. 
+ * @param conf the supervisor config + * @param supervisorId the ID of the supervisor this is a part of. + * @param port the port the container is on. Should be <= 0 if only a partial recovery + * @param assignment the assignment for this container. Should be null if only a partial recovery. + * @param resourceIsolationManager used to isolate resources for a container can be null if no isolation is used. + * @param localState the local state of the supervisor. May be null if partial recovery + * @param workerId the id of the worker to use. Must not be null if doing a partial recovery. + * @param ops file system operations (mostly for testing) if null a new one is made + * @param topoConf the config of the topology (mostly for testing) if null + * and not a partial recovery the real conf is read. + * @param profileCmd the command to use when profiling (used for testing) + * @throws IOException on any error + * @throws ContainerRecoveryException if the Container could not be recovered. + */ + BasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port, + LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, + LocalState localState, String workerId, Map<String, Object> topoConf, + AdvancedFSOps ops, String profileCmd) throws IOException { + super(type, conf, supervisorId, port, assignment, resourceIsolationManager, workerId, topoConf, ops); + assert(localState != null); + _localState = localState; + + if (type.isRecovery() && !type.isOnlyKillable()) { + synchronized (localState) { + String wid = null; + Map<String, Integer> workerToPort = localState.getApprovedWorkers(); + for (Map.Entry<String, Integer> entry : workerToPort.entrySet()) { + if (port == entry.getValue().intValue()) { + wid = entry.getKey(); + } + } + if (wid == null) { + throw new ContainerRecoveryException("Could not find worker id for " + port + " " + assignment); + } + LOG.info("Recovered Worker {}", wid); + _workerId = wid; + } + } else if 
(_workerId == null){ + createNewWorkerId(); + } + + if (profileCmd == null) { + profileCmd = _stormHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + + conf.get(Config.WORKER_PROFILER_COMMAND); + } + _profileCmd = profileCmd; + } + + /** + * Create a new worker ID for this process and store it in this object and + * in the local state. Never call this if a worker is currently up and running. + * We will lose track of the process. + */ + protected void createNewWorkerId() { + _type.assertFull(); + assert(_workerId == null); + synchronized (_localState) { + _workerId = Utils.uuid(); + Map<String, Integer> workerToPort = _localState.getApprovedWorkers(); + if (workerToPort == null) { + workerToPort = new HashMap<>(1); + } + removeWorkersOn(workerToPort, _port); + workerToPort.put(_workerId, _port); + _localState.setApprovedWorkers(workerToPort); + LOG.info("Created Worker ID {}", _workerId); + } + } + + private static void removeWorkersOn(Map<String, Integer> workerToPort, int _port) { + for (Iterator<Entry<String, Integer>> i = workerToPort.entrySet().iterator(); i.hasNext();) { + Entry<String, Integer> found = i.next(); + if (_port == found.getValue().intValue()) { + LOG.warn("Deleting worker {} from state", found.getKey()); + i.remove(); + } + } + } + + @Override + public void cleanUpForRestart() throws IOException { + String origWorkerId = _workerId; + super.cleanUpForRestart(); + synchronized (_localState) { + Map<String, Integer> workersToPort = _localState.getApprovedWorkers(); + workersToPort.remove(origWorkerId); + removeWorkersOn(workersToPort, _port); + _localState.setApprovedWorkers(workersToPort); + LOG.info("Removed Worker ID {}", origWorkerId); + } + } + + @Override + public void relaunch() throws IOException { + _type.assertFull(); + //We are launching it now...
+ _type = ContainerType.LAUNCH; + createNewWorkerId(); + setup(); + launch(); + } + + @Override + public boolean didMainProcessExit() { + return _exitedEarly; + } + + /** + * Run the given command for profiling + * + * @param command + * the command to run + * @param env + * the environment to run the command + * @param logPrefix + * the prefix to include in the logs + * @param targetDir + * the working directory to run the command in + * @return true if it ran successfully, else false + * @throws IOException + * on any error + * @throws InterruptedException + * if interrupted while waiting for the process to exit. + */ + protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix, + File targetDir) throws IOException, InterruptedException { + _type.assertFull(); + Process p = SupervisorUtils.launchProcess(command, env, logPrefix, null, targetDir); + int ret = p.waitFor(); + return ret == 0; + } + + @Override + public boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException { + _type.assertFull(); + String targetDir = ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port); + + @SuppressWarnings("unchecked") + Map<String, String> env = (Map<String, String>) _topoConf.get(Config.TOPOLOGY_ENVIRONMENT); + if (env == null) { + env = new HashMap<String, String>(); + } + + String str = ConfigUtils.workerArtifactsPidPath(_conf, _topologyId, _port); + + String workerPid = _ops.slurpString(new File(str)).trim(); + + ProfileAction profileAction = request.get_action(); + String logPrefix = "ProfilerAction process " + _topologyId + ":" + _port + " PROFILER_ACTION: " + profileAction + + " "; + + List<String> command = mkProfileCommand(profileAction, stop, workerPid, targetDir); + + File targetFile = new File(targetDir); + if (command.size() > 0) { + return runProfilingCommand(command, env, logPrefix, targetFile); + } + LOG.warn("PROFILING REQUEST NOT SUPPORTED {} IGNORED...", request); +
return true; + } + + /** + * Get the command to run when doing profiling + * @param action the profiling action to perform + * @param stop if this is meant to stop the profiling or start it + * @param workerPid the PID of the process to profile + * @param targetDir the current working directory of the worker process + * @return the command to run for profiling. + */ + private List<String> mkProfileCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) { + switch(action) { + case JMAP_DUMP: + return jmapDumpCmd(workerPid, targetDir); + case JSTACK_DUMP: + return jstackDumpCmd(workerPid, targetDir); + case JPROFILE_DUMP: + return jprofileDump(workerPid, targetDir); + case JVM_RESTART: + return jprofileJvmRestart(workerPid); + case JPROFILE_STOP: + if (stop) { + return jprofileStop(workerPid, targetDir); + } + return jprofileStart(workerPid); + default: + return Lists.newArrayList(); + } + } + + private List<String> jmapDumpCmd(String pid, String targetDir) { + return Lists.newArrayList(_profileCmd, pid, "jmap", targetDir); + } + + private List<String> jstackDumpCmd(String pid, String targetDir) { + return Lists.newArrayList(_profileCmd, pid, "jstack", targetDir); + } + + private List<String> jprofileStart(String pid) { + return Lists.newArrayList(_profileCmd, pid, "start"); + } + + private List<String> jprofileStop(String pid, String targetDir) { + return Lists.newArrayList(_profileCmd, pid, "stop", targetDir); + } + + private List<String> jprofileDump(String pid, String targetDir) { + return Lists.newArrayList(_profileCmd, pid, "dump", targetDir); + } + + private List<String> jprofileJvmRestart(String pid) { + return Lists.newArrayList(_profileCmd, pid, "kill"); + } + + /** + * Compute the java.library.path that should be used for the worker. + * This helps it to load JNI libraries that are packaged in the uber jar. + * @param stormRoot the root directory of the worker process + * @param conf the config for the supervisor. 
+ * @return the java.library.path/LD_LIBRARY_PATH to use so native libraries load correctly. + */ + protected String javaLibraryPath(String stormRoot, Map<String, Object> conf) { + String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR; + String os = System.getProperty("os.name").replaceAll("\\s+", "_"); + String arch = System.getProperty("os.arch"); + String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch; + String ret = CPJ.join(archResourceRoot, resourceRoot, + conf.get(Config.JAVA_LIBRARY_PATH)); + return ret; + } + + /** + * Returns a collection of jar file names found under the given directory. + * @param dir the directory to search + * @return the jar file names + */ + protected List<String> getFullJars(File dir) { + File[] files = dir.listFiles(jarFilter); + + if (files == null) { + return Collections.emptyList(); + } + + return Arrays.stream(files).map(f -> f.getAbsolutePath()) + .collect(Collectors.toList()); + } + + protected List<String> frameworkClasspath() { + File stormLibDir = new File(_stormHome, "lib"); + String stormConfDir = + System.getenv("STORM_CONF_DIR") != null ? 
+                System.getenv("STORM_CONF_DIR") :
+                new File(_stormHome, "conf").getAbsolutePath();
+        File stormExtlibDir = new File(_stormHome, "extlib");
+        String extcp = System.getenv("STORM_EXT_CLASSPATH");
+        List<String> pathElements = new LinkedList<>();
+        pathElements.addAll(getFullJars(stormLibDir));
+        pathElements.addAll(getFullJars(stormExtlibDir));
+        pathElements.add(extcp);
+        pathElements.add(stormConfDir);
+
+        return pathElements;
+    }
+
+    /**
+     * Normalize a classpath style config value into a list of strings.
+     * @param o a single String, a List of Strings, or anything else (including null)
+     * @return a one element list for a String, the list itself for a List, else an empty list
+     */
+    @SuppressWarnings("unchecked")
+    private List<String> asStringList(Object o) {
+        if (o instanceof String) {
+            return Arrays.asList((String)o);
+        } else if (o instanceof List) {
+            return (List<String>)o;
+        }
+        // type-safe replacement for the raw Collections.EMPTY_LIST constant
+        return Collections.emptyList();
+    }
+
+    /**
+     * Compute the classpath for the worker process.  The entries are, in order:
+     * Config.TOPOLOGY_CLASSPATH_BEGINNING, the framework classpath, the topology jar,
+     * the topology dependencies and finally Config.TOPOLOGY_CLASSPATH.
+     * @param stormJar the topology jar
+     * @param dependencyLocations any dependencies from the topology
+     * @return the full classpath
+     */
+    protected String getWorkerClassPath(String stormJar, List<String> dependencyLocations) {
+        List<String> workercp = new ArrayList<>();
+        workercp.addAll(asStringList(_topoConf.get(Config.TOPOLOGY_CLASSPATH_BEGINNING)));
+        workercp.addAll(frameworkClasspath());
+        workercp.add(stormJar);
+        workercp.addAll(dependencyLocations);
+        workercp.addAll(asStringList(_topoConf.get(Config.TOPOLOGY_CLASSPATH)));
+        return CPJ.join(workercp);
+    }
+
+    /**
+     * Replace the %ID%, %WORKER-ID%, %TOPOLOGY-ID% and %WORKER-PORT% place holders in a
+     * single option string.  %HEAP-MEM% is only replaced when memOnheap is positive.
+     * @param string the option string, returned untouched when it is null or blank
+     * @param memOnheap the on heap memory for the worker, or a value &lt;= 0 to skip %HEAP-MEM%
+     * @return the string with the place holders substituted
+     */
+    private String substituteChildOptsInternal(String string, int memOnheap) {
+        if (StringUtils.isNotBlank(string)) {
+            String p = String.valueOf(_port);
+            string = string.replace("%ID%", p);
+            string = string.replace("%WORKER-ID%", _workerId);
+            string = string.replace("%TOPOLOGY-ID%", _topologyId);
+            string = string.replace("%WORKER-PORT%", p);
+            if (memOnheap > 0) {
+                string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
+            }
+        }
+        return string;
+    }
+
+    /**
+     * Substitute the place holders in child opts without replacing %HEAP-MEM%.
+     * @param value a String or a List of Strings holding the options
+     * @return the individual command line options
+     */
+    protected List<String> substituteChildopts(Object value) {
+        return substituteChildopts(value, -1);
+    }
+
+    /**
+     * Substitute the place holders in child opts and turn them into command line options.
+     * A String is split on white space, a List contributes one option per non blank element,
+     * and any other value (including null) results in an empty list.
+     * @param value a String or a List of Strings holding the options
+     * @param memOnheap the on heap memory for the worker, or a value &lt;= 0 to skip %HEAP-MEM%
+     * @return the individual command line options, never containing a blank entry
+     */
+    protected List<String> substituteChildopts(Object value, int memOnheap) {
+        List<String> rets = new ArrayList<>();
+        if (value instanceof String) {
+            String string = substituteChildOptsInternal((String) value, memOnheap);
+            if (StringUtils.isNotBlank(string)) {
+                // split() yields an empty first token when the string starts with white space;
+                // an empty argument must never end up on the worker command line.
+                for (String opt : string.split("\\s+")) {
+                    if (StringUtils.isNotBlank(opt)) {
+                        rets.add(opt);
+                    }
+                }
+            }
+        } else if (value instanceof List) {
+            @SuppressWarnings("unchecked")
+            List<String> objects = (List<String>) value;
+            for (String object : objects) {
+                String str = substituteChildOptsInternal(object, memOnheap);
+                if (StringUtils.isNotBlank(str)) {
+                    rets.add(str);
+                }
+            }
+        }
+        return rets;
+    }
+
+    /**
+     * Launch the worker process (non-blocking).  When a resource isolation manager is
+     * configured it is asked to rewrite the command before the process is started.
+     *
+     * @param command
+     *            the command to run
+     * @param env
+     *            the environment to run the command
+     * @param logPrefix
+     *            the prefix to include in the logs
+     * @param processExitCallback
+     *            a callback for when the process exits
+     * @param targetDir
+     *            the working directory to run the command in
+     * @throws IOException
+     *             on any error
+     */
+    protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix,
+            ExitCodeCallback processExitCallback, File targetDir) throws IOException {
+        if (_resourceIsolationManager != null) {
+            command = _resourceIsolationManager.getLaunchCommand(_workerId, command);
+        }
+        SupervisorUtils.launchProcess(command, env, logPrefix, processExitCallback, targetDir);
+    }
+
+    /**
+     * Work out where the worker.xml logging configuration lives.  Config.STORM_LOG4J2_CONF_DIR
+     * is used when it is set (a relative value is resolved against storm home), otherwise the
+     * log4j2 directory under storm home is used.
+     * @return the path of worker.xml, prefixed with file:/// on Windows unless the directory
+     *     already starts with file:
+     */
+    private String getWorkerLoggingConfigFile() {
+        String log4jConfigurationDir = (String) (_conf.get(Config.STORM_LOG4J2_CONF_DIR));
+
+        if (StringUtils.isNotBlank(log4jConfigurationDir)) {
+            if (!Utils.isAbsolutePath(log4jConfigurationDir)) {
+                log4jConfigurationDir = _stormHome + Utils.FILE_PATH_SEPARATOR + log4jConfigurationDir;
+            }
+        } else {
+            log4jConfigurationDir = _stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
+        }
+
+        if (Utils.IS_ON_WINDOWS && !log4jConfigurationDir.startsWith("file:")) {
+            log4jConfigurationDir = "file:///" + log4jConfigurationDir;
+        }
+        return log4jConfigurationDir + Utils.FILE_PATH_SEPARATOR + "worker.xml";
+    }
+
+    /**
+     * Get parameters for the class path of the worker process.  Also used by the
+     * log Writer
+     * @param stormRoot the root dist dir for the topology
+     * @return the classpath for the topology as command line arguments.
+     * @throws IOException on any error.
+     */
+    private List<String> getClassPathParams(final String stormRoot) throws IOException {
+        final String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
+        final StormTopology stormTopology = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _ops);
+        // dependency jars and artifacts are both resolved relative to the topology dist dir
+        final List<String> dependencyLocations = new ArrayList<>();
+        if (stormTopology.get_dependency_jars() != null) {
+            for (String dependency : stormTopology.get_dependency_jars()) {
+                dependencyLocations.add(new File(stormRoot, dependency).getAbsolutePath());
+            }
+        }
+
+        if (stormTopology.get_dependency_artifacts() != null) {
+            for (String dependency : stormTopology.get_dependency_artifacts()) {
+                dependencyLocations.add(new File(stormRoot, dependency).getAbsolutePath());
+            }
+        }
+        final String workerClassPath = getWorkerClassPath(stormJar, dependencyLocations);
+
+        List<String> classPathParams = new ArrayList<>();
+        classPathParams.add("-cp");
+        classPathParams.add(workerClassPath);
+        return classPathParams;
+    }
+
+    /**
+     * Get a set of java properties that are common to both the log writer and the worker processes.
+     * These are mostly system properties that are used by logging.
+     * @return a list of command line options
+     */
+    private List<String> getCommonParams() {
+        // resolve everything that needs a lookup first, then assemble the options in one go
+        final String artifactsRoot = ConfigUtils.workerArtifactsRoot(_conf);
+        final String logDir = ConfigUtils.getLogDir();
+        final String loggingConfig = getWorkerLoggingConfigFile();
+
+        return new ArrayList<>(Arrays.asList(
+                "-Dlogging.sensitivity=" + OR((String) _topoConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY), "S3"),
+                "-Dlogfile.name=worker.log",
+                "-Dstorm.home=" + OR(_stormHome, ""),
+                "-Dworkers.artifacts=" + artifactsRoot,
+                "-Dstorm.id=" + _topologyId,
+                "-Dworker.id=" + _workerId,
+                "-Dworker.port=" + _port,
+                "-Dstorm.log.dir=" + logDir,
+                "-Dlog4j.configurationFile=" + loggingConfig,
+                "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
+                "-Dstorm.local.dir=" + _conf.get(Config.STORM_LOCAL_DIR)));
+    }
+
+    /**
+     * Work out the on heap memory for the worker.
+     * @param resources the resources assigned to the worker, may be null
+     * @return the assigned on heap memory rounded up when it is set and positive, otherwise
+     *     Config.WORKER_HEAP_MEMORY_MB from the topology conf, falling back to 768
+     */
+    private int getMemOnHeap(WorkerResources resources) {
+        final boolean hasAssignedHeap = resources != null
+                && resources.is_set_mem_on_heap()
+                && resources.get_mem_on_heap() > 0;
+        if (hasAssignedHeap) {
+            return (int) Math.ceil(resources.get_mem_on_heap());
+        }
+        // set the default heap memory size for supervisor-test
+        return Utils.getInt(_topoConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
+    }
+
+    /**
+     * Get the profiler options for the worker command line.
+     * @param memOnheap the on heap memory for the worker
+     * @return the substituted Config.WORKER_PROFILER_CHILDOPTS when Config.WORKER_PROFILER_ENABLED
+     *     is true, otherwise an empty list
+     */
+    private List<String> getWorkerProfilerChildOpts(int memOnheap) {
+        if (!Utils.getBoolean(_conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
+            return new ArrayList<>();
+        }
+        return substituteChildopts(_conf.get(Config.WORKER_PROFILER_CHILDOPTS), memOnheap);
+    }
+
+    /**
+     * a or b the first one that is not null
+     * @param a something
+     * @param b something else
+     * @return a or b the first one
that is not null
+     */
+    private <V> V OR(V a, V b) {
+        return a == null ? b : a;
+    }
+
+    /**
+     * Resolve the path of a java executable.
+     * @param cmd the name of the executable, e.g. java
+     * @return cmd inside the bin directory of JAVA_HOME when that environment variable is
+     *     not blank, otherwise cmd unchanged
+     */
+    protected String javaCmd(String cmd) {
+        String javaHome = System.getenv().get("JAVA_HOME");
+        if (StringUtils.isNotBlank(javaHome)) {
+            return javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd;
+        }
+        return cmd;
+    }
+
+    /**
+     * Create the command to launch the worker process
+     * @param memOnheap the on heap memory for the worker
+     * @param stormRoot the root dist dir for the topology
+     * @param jlp java library path for the topology
+     * @return the command to run
+     * @throws IOException on any error.
+     */
+    private List<String> mkLaunchCommand(final int memOnheap, final String stormRoot,
+            final String jlp) throws IOException {
+        final String javaCmd = javaCmd("java");
+        final String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
+        final String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
+        final String workerTmpDir = ConfigUtils.workerTmpRoot(_conf, _workerId);
+
+        List<String> classPathParams = getClassPathParams(stormRoot);
+        List<String> commonParams = getCommonParams();
+
+        List<String> commandList = new ArrayList<>();
+        //Log Writer Command...
+        commandList.add(javaCmd);
+        commandList.addAll(classPathParams);
+        commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS)));
+        commandList.addAll(commonParams);
+        commandList.add("org.apache.storm.LogWriter"); //The LogWriter in turn launches the actual worker.
+
+        //Worker Command...
+        commandList.add(javaCmd);
+        commandList.add("-server");
+        commandList.addAll(commonParams);
+        commandList.addAll(substituteChildopts(_conf.get(Config.WORKER_CHILDOPTS), memOnheap));
+        commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap));
+        // topology level GC options win over the cluster wide ones
+        commandList.addAll(substituteChildopts(OR(
+                _topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS),
+                _conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap));
+        commandList.addAll(getWorkerProfilerChildOpts(memOnheap));
+        commandList.add("-Djava.library.path=" + jlp);
+        commandList.add("-Dstorm.conf.file=" + stormConfFile);
+        commandList.add("-Dstorm.options=" + stormOptions);
+        commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
+        commandList.addAll(classPathParams);
+        commandList.add("org.apache.storm.daemon.worker");
+        commandList.add(_topologyId);
+        commandList.add(_supervisorId);
+        commandList.add(String.valueOf(_port));
+        commandList.add(_workerId);
+
+        return commandList;
+    }
+
+    /**
+     * Build the worker command and environment, reserve cpu and memory with the resource
+     * isolation manager when one is configured, and start the worker process in its
+     * worker directory.
+     * @throws IOException on any error
+     * @throws IllegalArgumentException if Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB
+     *     is set to something that is not a number
+     */
+    @Override
+    public void launch() throws IOException {
+        _type.assertFull();
+        LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
+                _supervisorId, _port, _workerId);
+        String logPrefix = "Worker Process " + _workerId;
+        ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix);
+        _exitedEarly = false;
+
+        final WorkerResources resources = _assignment.get_resources();
+        final int memOnheap = getMemOnHeap(resources);
+        final String stormRoot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
+        final String jlp = javaLibraryPath(stormRoot, _conf);
+
+        List<String> commandList = mkLaunchCommand(memOnheap, stormRoot, jlp);
+
+        Map<String, String> topEnvironment = new HashMap<String, String>();
+        @SuppressWarnings("unchecked")
+        Map<String, String> environment = (Map<String, String>) _topoConf.get(Config.TOPOLOGY_ENVIRONMENT);
+        if (environment != null) {
+            topEnvironment.putAll(environment);
+        }
+        topEnvironment.put("LD_LIBRARY_PATH", jlp);
+
+        if (_resourceIsolationManager != null) {
+            // getMemOnHeap accepts missing resources, so do not blow up on them here either
+            int memoffheap = resources == null ? 0 : (int) Math.ceil(resources.get_mem_off_heap());
+            int cpu = resources == null ? 0 : (int) Math.ceil(resources.get_cpu());
+
+            // The conf value is only known to be an Object.  Casting it straight to double
+            // throws ClassCastException for an Integer and NullPointerException when it is
+            // missing, so go through Number instead.
+            Object margin = _conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB);
+            if (margin != null && !(margin instanceof Number)) {
+                throw new IllegalArgumentException(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB
+                        + " must be a number but was " + margin);
+            }
+            int cGroupMem = margin == null ? 0 : (int) Math.ceil(((Number) margin).doubleValue());
+            int memoryValue = memoffheap + memOnheap + cGroupMem;
+            int cpuValue = cpu;
+            Map<String, Number> map = new HashMap<>();
+            map.put("cpu", cpuValue);
+            map.put("memory", memoryValue);
+            _resourceIsolationManager.reserveResourcesForWorker(_workerId, map);
+        }
+
+        LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
+
+        String workerDir = ConfigUtils.workerRoot(_conf, _workerId);
+
+        launchWorkerProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
new file mode 100644
index 0000000..4915650
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.daemon.supervisor.Container.ContainerType;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.utils.LocalState;
+
+/**
+ * Launch containers with no security using standard java commands
+ */
+public class BasicContainerLauncher extends ContainerLauncher {
+    private final Map<String, Object> _conf;
+    private final String _supervisorId;
+    protected final ResourceIsolationInterface _resourceIsolationManager;
+
+    /**
+     * @param conf the config handed to every container this launcher creates
+     * @param supervisorId the id of the supervisor handed to every container
+     * @param resourceIsolationManager the isolation manager handed to every container
+     * @throws IOException declared for callers, nothing in this constructor throws it
+     */
+    public BasicContainerLauncher(Map<String, Object> conf, String supervisorId, ResourceIsolationInterface resourceIsolationManager) throws IOException {
+        this._conf = conf;
+        this._supervisorId = supervisorId;
+        this._resourceIsolationManager = resourceIsolationManager;
+    }
+
+    /**
+     * Create a BasicContainer of type LAUNCH, then set it up and launch it.
+     */
+    @Override
+    public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        final Container launched = new BasicContainer(ContainerType.LAUNCH, _conf, _supervisorId, port,
+                assignment, _resourceIsolationManager, state, null);
+        launched.setup();
+        launched.launch();
+        return launched;
+    }
+
+    /**
+     * Create a BasicContainer of type RECOVER_FULL for the given port and assignment
+     * without setting it up or launching it.
+     */
+    @Override
+    public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        final Container recovered = new BasicContainer(ContainerType.RECOVER_FULL, _conf, _supervisorId, port,
+                assignment, _resourceIsolationManager, state, null);
+        return recovered;
+    }
+
+    /**
+     * Create a BasicContainer of type RECOVER_PARTIAL for a worker id, with a port of -1
+     * and no assignment.
+     */
+    @Override
+    public Killable recoverContainer(String workerId, LocalState localState) throws IOException {
+        final Killable partial = new BasicContainer(ContainerType.RECOVER_PARTIAL, _conf, _supervisorId, -1,
+                null, _resourceIsolationManager, localState, workerId);
+        return partial;
+    }
+}