http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj deleted file mode 100644 index 1326672..0000000 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ /dev/null @@ -1,768 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
-(ns org.apache.storm.daemon.worker - (:use [org.apache.storm.daemon common]) - (:use [org.apache.storm config log util timer local-state]) - (:require [clj-time.core :as time]) - (:require [clj-time.coerce :as coerce]) - (:require [org.apache.storm.daemon [executor :as executor]]) - (:require [org.apache.storm [disruptor :as disruptor] [cluster :as cluster]]) - (:require [clojure.set :as set]) - (:require [org.apache.storm.messaging.loader :as msg-loader]) - (:import [java.util.concurrent Executors] - [org.apache.storm.hooks IWorkerHook BaseWorkerHook]) - (:import [java.util ArrayList HashMap]) - (:import [org.apache.storm.utils Utils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue]) - (:import [org.apache.storm.grouping LoadMapping]) - (:import [org.apache.storm.messaging TransportFactory]) - (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status]) - (:import [org.apache.storm.daemon Shutdownable]) - (:import [org.apache.storm.serialization KryoTupleSerializer]) - (:import [org.apache.storm.generated StormTopology]) - (:import [org.apache.storm.tuple AddressedTuple Fields]) - (:import [org.apache.storm.task WorkerTopologyContext]) - (:import [org.apache.storm Constants]) - (:import [org.apache.storm.security.auth AuthUtils]) - (:import [org.apache.storm.cluster ClusterStateContext DaemonType]) - (:import [javax.security.auth Subject]) - (:import [java.security PrivilegedExceptionAction]) - (:import [org.apache.logging.log4j LogManager]) - (:import [org.apache.logging.log4j Level]) - (:import [org.apache.logging.log4j.core.config LoggerConfig]) - (:import [org.apache.storm.generated LogConfig LogLevelAction]) - (:gen-class)) - -(defmulti mk-suicide-fn cluster-mode) - -(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port assignment-versions] - (log-message "Reading Assignments.") - (let [assignment (:executor->node+port (.assignment-info 
storm-cluster-state storm-id nil))] - (doall - (concat - [Constants/SYSTEM_EXECUTOR_ID] - (mapcat (fn [[executor loc]] - (if (= loc [assignment-id port]) - [executor] - )) - assignment))))) - -(defnk do-executor-heartbeats [worker :executors nil] - ;; stats is how we know what executors are assigned to this worker - (let [stats (if-not executors - (into {} (map (fn [e] {e nil}) (:executors worker))) - (->> executors - (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)})) - (apply merge))) - zk-hb {:storm-id (:storm-id worker) - :executor-stats stats - :uptime ((:uptime worker)) - :time-secs (current-time-secs) - }] - ;; do the zookeeper heartbeat - (try - (.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb) - (catch Exception exc - (log-error exc "Worker failed to write heartbeats to ZK or Pacemaker...will retry"))))) - -(defn do-heartbeat [worker] - (let [conf (:conf worker) - state (worker-state conf (:worker-id worker))] - ;; do the local-file-system heartbeat. - (ls-worker-heartbeat! state (current-time-secs) (:storm-id worker) (:executors worker) (:port worker)) - (.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up. 
- ; it shouldn't take supervisor 120 seconds between listing dir and reading it - - )) - -(defn worker-outbound-tasks - "Returns seq of task-ids that receive messages from this worker" - [worker] - (let [context (worker-context worker) - components (mapcat - (fn [task-id] - (->> (.getComponentId context (int task-id)) - (.getTargets context) - vals - (map keys) - (apply concat))) - (:task-ids worker))] - (-> worker - :task->component - reverse-map - (select-keys components) - vals - flatten - set ))) - -(defn get-dest - [^AddressedTuple addressed-tuple] - "get the destination for an AddressedTuple" - (.getDest addressed-tuple)) - -(defn mk-transfer-local-fn [worker] - (let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker) - task->short-executor (:task->short-executor worker) - task-getter (comp #(get task->short-executor %) get-dest)] - (fn [tuple-batch] - (let [grouped (fast-group-by task-getter tuple-batch)] - (fast-map-iter [[short-executor pairs] grouped] - (let [q (short-executor-receive-queue-map short-executor)] - (if q - (disruptor/publish q pairs) - (log-warn "Received invalid messages for unknown tasks. Dropping... ") - ))))))) - -(defn- assert-can-serialize [^KryoTupleSerializer serializer tuple-batch] - "Check that all of the tuples can be serialized by serializing them." 
- (fast-list-iter [[task tuple :as pair] tuple-batch] - (.serialize serializer tuple))) - -(defn- mk-backpressure-handler [executors] - "make a handler that checks and updates worker's backpressure flag" - (disruptor/worker-backpressure-handler - (fn [worker] - (let [storm-id (:storm-id worker) - assignment-id (:assignment-id worker) - port (:port worker) - storm-cluster-state (:storm-cluster-state worker) - prev-backpressure-flag @(:backpressure worker) - ;; the backpressure flag is true if at least one of the disruptor queues has throttle-on - curr-backpressure-flag (if executors - (or (.getThrottleOn (:transfer-queue worker)) - (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors))) - prev-backpressure-flag)] - ;; update the worker's backpressure flag to zookeeper only when it has changed - (when (not= prev-backpressure-flag curr-backpressure-flag) - (try - (log-debug "worker backpressure flag changing from " prev-backpressure-flag " to " curr-backpressure-flag) - (.worker-backpressure! storm-cluster-state storm-id assignment-id port curr-backpressure-flag) - ;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception - (reset! (:backpressure worker) curr-backpressure-flag) - (catch Exception exc - (log-error exc "workerBackpressure update failed when connecting to ZK ... 
will retry")))) - )))) - -(defn- mk-disruptor-backpressure-handler [worker] - "make a handler for the worker's send disruptor queue to - check highWaterMark and lowWaterMark for backpressure" - (disruptor/disruptor-backpressure-handler - (fn [] - (log-debug "worker " (:worker-id worker) " transfer-queue is congested, set backpressure flag true") - (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker))) - (fn [] - (log-debug "worker " (:worker-id worker) " transfer-queue is not congested, set backpressure flag false") - (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker))))) - -(defn mk-transfer-fn [worker] - (let [local-tasks (-> worker :task-ids set) - local-transfer (:transfer-local-fn worker) - ^DisruptorQueue transfer-queue (:transfer-queue worker) - task->node+port (:cached-task->node+port worker) - try-serialize-local ((:storm-conf worker) TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE) - - transfer-fn - (fn [^KryoTupleSerializer serializer tuple-batch] - (let [^ArrayList local (ArrayList.) - ^HashMap remoteMap (HashMap.)] - (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch] - (let [task (.getDest addressed-tuple) - tuple (.getTuple addressed-tuple)] - (if (local-tasks task) - (.add local addressed-tuple) - - ;;Using java objects directly to avoid performance issues in java code - (do - (when (not (.get remoteMap task)) - (.put remoteMap task (ArrayList.))) - (let [^ArrayList remote (.get remoteMap task)] - (if (not-nil? task) - (.add remote (TaskMessage. task ^bytes (.serialize serializer tuple))) - (log-warn "Can't transfer tuple - task value is nil. 
tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple))) - ))))) - - (when (not (.isEmpty local)) (local-transfer local)) - (when (not (.isEmpty remoteMap)) (disruptor/publish transfer-queue remoteMap))))] - (if try-serialize-local - (do - (log-warn "WILL TRY TO SERIALIZE ALL TUPLES (Turn off " TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE " for production)") - (fn [^KryoTupleSerializer serializer tuple-batch] - (assert-can-serialize serializer tuple-batch) - (transfer-fn serializer tuple-batch))) - transfer-fn))) - -(defn- mk-receive-queue-map [storm-conf executors] - (->> executors - ;; TODO: this depends on the type of executor - (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e) - (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE) - (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) - :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))])) - (into {}) - )) - -(defn- stream->fields [^StormTopology topology component] - (->> (ThriftTopologyUtils/getComponentCommon topology component) - .get_streams - (map (fn [[s info]] [s (Fields. (.get_output_fields info))])) - (into {}) - (HashMap.))) - -(defn component->stream->fields [^StormTopology topology] - (->> (ThriftTopologyUtils/getComponentIds topology) - (map (fn [c] [c (stream->fields topology c)])) - (into {}) - (HashMap.))) - -(defn- mk-default-resources [worker] - (let [conf (:conf worker) - thread-pool-size (int (conf TOPOLOGY-WORKER-SHARED-THREAD-POOL-SIZE))] - {WorkerTopologyContext/SHARED_EXECUTOR (Executors/newFixedThreadPool thread-pool-size)} - )) - -(defn- mk-user-resources [worker] - ;;TODO: need to invoke a hook provided by the topology, giving it a chance to create user resources. - ;; this would be part of the initialization hook - ;; need to separate workertopologycontext into WorkerContext and WorkerUserContext. - ;; actually just do it via interfaces. 
just need to make sure to hide setResource from tasks - {}) - -(defn mk-halting-timer [timer-name] - (mk-timer :kill-fn (fn [t] - (log-error t "Error when processing event") - (exit-process! 20 "Error when processing an event") - ) - :timer-name timer-name)) - -(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state] - (let [assignment-versions (atom {}) - executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)) - transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) - (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) - :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) - executor-receive-queue-map (mk-receive-queue-map storm-conf executors) - - receive-queue-map (->> executor-receive-queue-map - (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue]))) - (into {})) - - topology (read-supervisor-topology conf storm-id) - mq-context (if mq-context - mq-context - (TransportFactory/makeContext storm-conf))] - - (recursive-map - :conf conf - :mq-context mq-context - :receiver (.bind ^IContext mq-context storm-id port) - :storm-id storm-id - :assignment-id assignment-id - :port port - :worker-id worker-id - :cluster-state cluster-state - :storm-cluster-state storm-cluster-state - ;; when worker bootup, worker will start to setup initial connections to - ;; other workers. When all connection is ready, we will enable this flag - ;; and spout and bolt will be activated. - :worker-active-flag (atom false) - :storm-active-atom (atom false) - :storm-component->debug-atom (atom {}) - :executors executors - :task-ids (->> receive-queue-map keys (map int) sort) - :storm-conf storm-conf - :topology topology - :system-topology (system-topology! 
storm-conf topology) - :heartbeat-timer (mk-halting-timer "heartbeat-timer") - :refresh-load-timer (mk-halting-timer "refresh-load-timer") - :refresh-connections-timer (mk-halting-timer "refresh-connections-timer") - :refresh-credentials-timer (mk-halting-timer "refresh-credentials-timer") - :reset-log-levels-timer (mk-halting-timer "reset-log-levels-timer") - :refresh-active-timer (mk-halting-timer "refresh-active-timer") - :executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer") - :refresh-backpressure-timer (mk-halting-timer "refresh-backpressure-timer") - :user-timer (mk-halting-timer "user-timer") - :task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on - :component->stream->fields (component->stream->fields (:system-topology <>)) - :component->sorted-tasks (->> (:task->component <>) reverse-map (map-val sort)) - :endpoint-socket-lock (mk-rw-lock) - :cached-node+port->socket (atom {}) - :cached-task->node+port (atom {}) - :transfer-queue transfer-queue - :executor-receive-queue-map executor-receive-queue-map - :short-executor-receive-queue-map (map-key first executor-receive-queue-map) - :task->short-executor (->> executors - (mapcat (fn [e] (for [t (executor-id->tasks e)] [t (first e)]))) - (into {}) - (HashMap.)) - :suicide-fn (mk-suicide-fn conf) - :uptime (uptime-computer) - :default-shared-resources (mk-default-resources <>) - :user-shared-resources (mk-user-resources <>) - :transfer-local-fn (mk-transfer-local-fn <>) - :transfer-fn (mk-transfer-fn <>) - :load-mapping (LoadMapping.) - :assignment-versions assignment-versions - :backpressure (atom false) ;; whether this worker is going slow - :backpressure-trigger (Object.) 
;; a trigger for synchronization with executors - :throttle-on (atom false) ;; whether throttle is activated for spouts - ))) - -(defn- endpoint->string [[node port]] - (str port "/" node)) - -(defn string->endpoint [^String s] - (let [[port-str node] (.split s "/" 2)] - [node (Integer/valueOf port-str)] - )) - -(def LOAD-REFRESH-INTERVAL-MS 5000) - -(defn mk-refresh-load [worker] - (let [local-tasks (set (:task-ids worker)) - remote-tasks (set/difference (worker-outbound-tasks worker) local-tasks) - short-executor-receive-queue-map (:short-executor-receive-queue-map worker) - next-update (atom 0)] - (fn this - ([] - (let [^LoadMapping load-mapping (:load-mapping worker) - local-pop (map-val (fn [queue] - (let [q-metrics (.getMetrics queue)] - (/ (double (.population q-metrics)) (.capacity q-metrics)))) - short-executor-receive-queue-map) - remote-load (reduce merge (for [[np conn] @(:cached-node+port->socket worker)] (into {} (.getLoad conn remote-tasks)))) - now (System/currentTimeMillis)] - (.setLocal load-mapping local-pop) - (.setRemote load-mapping remote-load) - (when (> now @next-update) - (.sendLoadMetrics (:receiver worker) local-pop) - (reset! next-update (+ LOAD-REFRESH-INTERVAL-MS now)))))))) - -(defn mk-refresh-connections [worker] - (let [outbound-tasks (worker-outbound-tasks worker) - conf (:conf worker) - storm-cluster-state (:storm-cluster-state worker) - storm-id (:storm-id worker)] - (fn this - ([] - (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this)))) - ([callback] - (let [version (.assignment-version storm-cluster-state storm-id callback) - assignment (if (= version (:version (get @(:assignment-versions worker) storm-id))) - (:data (get @(:assignment-versions worker) storm-id)) - (let [new-assignment (.assignment-info-with-version storm-cluster-state storm-id callback)] - (swap! 
(:assignment-versions worker) assoc storm-id new-assignment) - (:data new-assignment))) - my-assignment (-> assignment - :executor->node+port - to-task->node+port - (select-keys outbound-tasks) - (#(map-val endpoint->string %))) - ;; we dont need a connection for the local tasks anymore - needed-assignment (->> my-assignment - (filter-key (complement (-> worker :task-ids set)))) - needed-connections (-> needed-assignment vals set) - needed-tasks (-> needed-assignment keys) - - current-connections (set (keys @(:cached-node+port->socket worker))) - new-connections (set/difference needed-connections current-connections) - remove-connections (set/difference current-connections needed-connections)] - (swap! (:cached-node+port->socket worker) - #(HashMap. (merge (into {} %1) %2)) - (into {} - (dofor [endpoint-str new-connections - :let [[node port] (string->endpoint endpoint-str)]] - [endpoint-str - (.connect - ^IContext (:mq-context worker) - storm-id - ((:node->host assignment) node) - port) - ] - ))) - (write-locked (:endpoint-socket-lock worker) - (reset! (:cached-task->node+port worker) - (HashMap. my-assignment))) - (doseq [endpoint remove-connections] - (.close (get @(:cached-node+port->socket worker) endpoint))) - (apply swap! - (:cached-node+port->socket worker) - #(HashMap. (apply dissoc (into {} %1) %&)) - remove-connections) - - ))))) - -(defn refresh-storm-active - ([worker] - (refresh-storm-active worker (fn [& ignored] (schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker))))) - ([worker callback] - (let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)] - (reset! - (:storm-active-atom worker) - (and (= :active (-> base :status :type)) @(:worker-active-flag worker))) - (reset! 
(:storm-component->debug-atom worker) (-> base :component->debug)) - (log-debug "Event debug options " @(:storm-component->debug-atom worker))))) - -;; TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues -(defn mk-transfer-tuples-handler [worker] - (let [^DisruptorQueue transfer-queue (:transfer-queue worker) - drainer (TransferDrainer.) - node+port->socket (:cached-node+port->socket worker) - task->node+port (:cached-task->node+port worker) - endpoint-socket-lock (:endpoint-socket-lock worker) - ] - (disruptor/clojure-handler - (fn [packets _ batch-end?] - (.add drainer packets) - - (when batch-end? - (read-locked endpoint-socket-lock - (let [node+port->socket @node+port->socket - task->node+port @task->node+port] - (.send drainer task->node+port node+port->socket))) - (.clear drainer)))))) - -;; Check whether this messaging connection is ready to send data -(defn is-connection-ready [^IConnection connection] - (if (instance? ConnectionWithStatus connection) - (let [^ConnectionWithStatus connection connection - status (.status connection)] - (= status ConnectionWithStatus$Status/Ready)) - true)) - -;; all connections are ready -(defn all-connections-ready [worker] - (let [connections (vals @(:cached-node+port->socket worker))] - (every? is-connection-ready connections))) - -;; we will wait all connections to be ready and then activate the spout/bolt -;; when the worker bootup -(defn activate-worker-when-all-connections-ready - [worker] - (let [timer (:refresh-active-timer worker) - delay-secs 0 - recur-secs 1] - (schedule timer - delay-secs - (fn this [] - (if (all-connections-ready worker) - (do - (log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker) - " with id "(:worker-id worker)) - (reset! 
(:worker-active-flag worker) true)) - (schedule timer recur-secs this :check-active false) - ))))) - -(defn register-callbacks [worker] - (log-message "Registering IConnectionCallbacks for " (:assignment-id worker) ":" (:port worker)) - (msg-loader/register-callback (:transfer-local-fn worker) - (:receiver worker) - (:storm-conf worker) - (worker-context worker))) - -(defn- close-resources [worker] - (let [dr (:default-shared-resources worker)] - (log-message "Shutting down default resources") - (.shutdownNow (get dr WorkerTopologyContext/SHARED_EXECUTOR)) - (log-message "Shut down default resources"))) - -(defn- get-logger-levels [] - (into {} - (let [logger-config (.getConfiguration (LogManager/getContext false))] - (for [[logger-name logger] (.getLoggers logger-config)] - {logger-name (.getLevel logger)})))) - -(defn set-logger-level [logger-context logger-name new-level] - (let [config (.getConfiguration logger-context) - logger-config (.getLoggerConfig config logger-name)] - (if (not (= (.getName logger-config) logger-name)) - ;; create a new config. Make it additive (true) s.t. inherit - ;; parents appenders - (let [new-logger-config (LoggerConfig. 
logger-name new-level true)] - (log-message "Adding config for: " new-logger-config " with level: " new-level) - (.addLogger config logger-name new-logger-config)) - (do - (log-message "Setting " logger-config " log level to: " new-level) - (.setLevel logger-config new-level))))) - -;; function called on timer to reset log levels last set to DEBUG -;; also called from process-log-config-change -(defn reset-log-levels [latest-log-config-atom] - (let [latest-log-config @latest-log-config-atom - logger-context (LogManager/getContext false)] - (doseq [[logger-name logger-setting] (sort latest-log-config)] - (let [timeout (:timeout logger-setting) - target-log-level (:target-log-level logger-setting) - reset-log-level (:reset-log-level logger-setting)] - (when (> (coerce/to-long (time/now)) timeout) - (log-message logger-name ": Resetting level to " reset-log-level) - (set-logger-level logger-context logger-name reset-log-level) - (swap! latest-log-config-atom - (fn [prev] - (dissoc prev logger-name)))))) - (.updateLoggers logger-context))) - -;; when a new log level is received from zookeeper, this function is called -(defn process-log-config-change [latest-log-config original-log-levels log-config] - (when log-config - (log-debug "Processing received log config: " log-config) - ;; merge log configs together - (let [loggers (.get_named_logger_level log-config) - logger-context (LogManager/getContext false)] - (def new-log-configs - (into {} - ;; merge named log levels - (for [[msg-logger-name logger-level] loggers] - (let [logger-name (if (= msg-logger-name "ROOT") - LogManager/ROOT_LOGGER_NAME - msg-logger-name)] - ;; the new-timeouts map now contains logger => timeout - (when (.is_set_reset_log_level_timeout_epoch logger-level) - {logger-name {:action (.get_action logger-level) - :target-log-level (Level/toLevel (.get_target_log_level logger-level)) - :reset-log-level (or (.get @original-log-levels logger-name) (Level/INFO)) - :timeout 
(.get_reset_log_level_timeout_epoch logger-level)}}))))) - - ;; look for deleted log timeouts - (doseq [[logger-name logger-val] (sort @latest-log-config)] - (when (not (contains? new-log-configs logger-name)) - ;; if we had a timeout, but the timeout is no longer active - (set-logger-level - logger-context logger-name (:reset-log-level logger-val)))) - - ;; apply new log settings we just received - ;; the merged configs are only for the reset logic - (doseq [[msg-logger-name logger-level] (sort (into {} (.get_named_logger_level log-config)))] - (let [logger-name (if (= msg-logger-name "ROOT") - LogManager/ROOT_LOGGER_NAME - msg-logger-name) - level (Level/toLevel (.get_target_log_level logger-level)) - action (.get_action logger-level)] - (if (= action LogLevelAction/UPDATE) - (set-logger-level logger-context logger-name level)))) - - (.updateLoggers logger-context) - (reset! latest-log-config new-log-configs) - (log-debug "New merged log config is " @latest-log-config)))) - -(defn run-worker-start-hooks [worker] - (let [topology (:topology worker) - topo-conf (:storm-conf worker) - worker-topology-context (worker-context worker) - hooks (.get_worker_hooks topology)] - (dofor [hook hooks] - (let [hook-bytes (Utils/toByteArray hook) - deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)] - (.start deser-hook topo-conf worker-topology-context))))) - -(defn run-worker-shutdown-hooks [worker] - (let [topology (:topology worker) - hooks (.get_worker_hooks topology)] - (dofor [hook hooks] - (let [hook-bytes (Utils/toByteArray hook) - deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)] - (.shutdown deser-hook))))) - -;; TODO: should worker even take the storm-id as input? this should be -;; deducable from cluster state (by searching through assignments) -;; what about if there's inconsistency in assignments? 
-> but nimbus -;; should guarantee this consistency -(defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id] - (log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id - " and conf " conf) - (if-not (local-mode? conf) - (redirect-stdio-to-slf4j!)) - ;; because in local mode, its not a separate - ;; process. supervisor will register it in this case - (when (= :distributed (cluster-mode conf)) - (let [pid (process-pid)] - (touch (worker-pid-path conf worker-id pid)) - (spit (worker-artifacts-pid-path conf storm-id port) pid))) - - (declare establish-log-setting-callback) - - ;; start out with empty list of timeouts - (def latest-log-config (atom {})) - (def original-log-levels (atom {})) - - (let [storm-conf (read-supervisor-storm-conf conf storm-id) - storm-conf (override-login-config-with-system-property storm-conf) - acls (Utils/getWorkerACL storm-conf) - cluster-state (cluster/mk-distributed-cluster-state conf :auth-conf storm-conf :acls acls :context (ClusterStateContext. 
DaemonType/WORKER)) - storm-cluster-state (cluster/mk-storm-cluster-state cluster-state :acls acls) - initial-credentials (.credentials storm-cluster-state storm-id nil) - auto-creds (AuthUtils/GetAutoCredentials storm-conf) - subject (AuthUtils/populateSubject nil auto-creds initial-credentials)] - (Subject/doAs subject (reify PrivilegedExceptionAction - (run [this] - (let [worker (worker-data conf shared-mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state) - heartbeat-fn #(do-heartbeat worker) - - ;; do this here so that the worker process dies if this fails - ;; it's important that worker heartbeat to supervisor ASAP when launching so that the supervisor knows it's running (and can move on) - _ (heartbeat-fn) - - executors (atom nil) - ;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout - ;; to the supervisor - _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn) - _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors)) - - _ (register-callbacks worker) - - refresh-connections (mk-refresh-connections worker) - refresh-load (mk-refresh-load worker) - - _ (refresh-connections nil) - - _ (activate-worker-when-all-connections-ready worker) - - _ (refresh-storm-active worker nil) - - _ (run-worker-start-hooks worker) - - _ (reset! 
executors (dofor [e (:executors worker)] (executor/mk-executor worker e initial-credentials))) - - transfer-tuples (mk-transfer-tuples-handler worker) - - transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples) - - disruptor-handler (mk-disruptor-backpressure-handler worker) - _ (.registerBackpressureCallback (:transfer-queue worker) disruptor-handler) - _ (-> (.setHighWaterMark (:transfer-queue worker) ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK)) - (.setLowWaterMark ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK)) - (.setEnableBackpressure ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE))) - backpressure-handler (mk-backpressure-handler @executors) - backpressure-thread (WorkerBackpressureThread. (:backpressure-trigger worker) worker backpressure-handler) - _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) - (.start backpressure-thread)) - topology-backpressure-callback (fn cb [& ignored] - (let [throttle-on (.topology-backpressure storm-cluster-state storm-id cb)] - (reset! (:throttle-on worker) throttle-on))) - _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) - (.topology-backpressure storm-cluster-state storm-id topology-backpressure-callback)) - - shutdown* (fn [] - (log-message "Shutting down worker " storm-id " " assignment-id " " port) - (doseq [[_ socket] @(:cached-node+port->socket worker)] - ;; this will do best effort flushing since the linger period - ;; was set on creation - (.close socket)) - (log-message "Terminating messaging context") - (log-message "Shutting down executors") - (doseq [executor @executors] (.shutdown executor)) - (log-message "Shut down executors") - - ;;this is fine because the only time this is shared is when it's a local context, - ;;in which case it's a noop - (.term ^IContext (:mq-context worker)) - (log-message "Shutting down transfer thread") - (disruptor/halt-with-interrupt! 
(:transfer-queue worker)) - - (.interrupt transfer-thread) - (.join transfer-thread) - (log-message "Shut down transfer thread") - (.terminate backpressure-thread) - (log-message "Shut down backpressure thread") - (cancel-timer (:heartbeat-timer worker)) - (cancel-timer (:refresh-connections-timer worker)) - (cancel-timer (:refresh-credentials-timer worker)) - (cancel-timer (:refresh-active-timer worker)) - (cancel-timer (:executor-heartbeat-timer worker)) - (cancel-timer (:user-timer worker)) - (cancel-timer (:refresh-load-timer worker)) - - (close-resources worker) - - (log-message "Trigger any worker shutdown hooks") - (run-worker-shutdown-hooks worker) - - (.remove-worker-heartbeat! (:storm-cluster-state worker) storm-id assignment-id port) - (.remove-worker-backpressure! (:storm-cluster-state worker) storm-id assignment-id port) - - (log-message "Disconnecting from storm cluster state context") - (.disconnect (:storm-cluster-state worker)) - (.close (:cluster-state worker)) - (log-message "Shut down worker " storm-id " " assignment-id " " port)) - ret (reify - Shutdownable - (shutdown - [this] - (shutdown*)) - DaemonCommon - (waiting? [this] - (and - (timer-waiting? (:heartbeat-timer worker)) - (timer-waiting? (:refresh-connections-timer worker)) - (timer-waiting? (:refresh-load-timer worker)) - (timer-waiting? (:refresh-credentials-timer worker)) - (timer-waiting? (:refresh-active-timer worker)) - (timer-waiting? (:executor-heartbeat-timer worker)) - (timer-waiting? (:user-timer worker)) - )) - ) - credentials (atom initial-credentials) - check-credentials-changed (fn [] - (let [new-creds (.credentials (:storm-cluster-state worker) storm-id nil)] - (when-not (= new-creds @credentials) ;;This does not have to be atomic, worst case we update when one is not needed - (AuthUtils/updateSubject subject auto-creds new-creds) - (dofor [e @executors] (.credentials-changed e new-creds)) - (reset! 
credentials new-creds)))) - check-log-config-changed (fn [] - (let [log-config (.topology-log-config (:storm-cluster-state worker) storm-id nil)] - (process-log-config-change latest-log-config original-log-levels log-config) - (establish-log-setting-callback)))] - (reset! original-log-levels (get-logger-levels)) - (log-message "Started with log levels: " @original-log-levels) - - (defn establish-log-setting-callback [] - (.topology-log-config (:storm-cluster-state worker) storm-id (fn [args] (check-log-config-changed)))) - - (establish-log-setting-callback) - (.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed))) - (schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS) - (fn [& args] - (check-credentials-changed))) - - (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) - (schedule-recurring (:refresh-backpressure-timer worker) 0 (conf TASK-BACKPRESSURE-POLL-SECS) topology-backpressure-callback)) - - ;; The jitter allows the clients to get the data at different times, and avoids thundering herd - (when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING) - (schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load)) - (schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections) - (schedule-recurring (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS) (fn [] (reset-log-levels latest-log-config))) - (schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker)) - - (log-message "Worker has topology config " (redact-value (:storm-conf worker) STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)) - (log-message "Worker " worker-id " for storm " storm-id " on " assignment-id ":" port " has finished loading") - ret - )))))) - -(defmethod mk-suicide-fn - :local [conf] - (fn [] (exit-process! 
1 "Worker died"))) - -(defmethod mk-suicide-fn - :distributed [conf] - (fn [] (exit-process! 1 "Worker died"))) - -(defn -main - "Worker process entry point. Reads the storm config, installs the default uncaught-exception handler, validates distributed mode, builds the worker with mk-worker (port-str is parsed as an integer), and registers a shutdown hook that calls .shutdown on the worker." - [storm-id assignment-id port-str worker-id] - (let [conf (read-storm-config)] - (setup-default-uncaught-exception-handler) - (validate-distributed-mode! conf) - (let [worker (mk-worker conf nil storm-id assignment-id (Integer/parseInt port-str) worker-id)] - (add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown worker)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/disruptor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/disruptor.clj b/storm-core/src/clj/backtype/storm/disruptor.clj deleted file mode 100644 index 1546b3f..0000000 --- a/storm-core/src/clj/backtype/storm/disruptor.clj +++ /dev/null @@ -1,89 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns org.apache.storm.disruptor - (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback]) - (:import [com.lmax.disruptor.dsl ProducerType]) - (:require [clojure [string :as str]]) - (:require [clojure [set :as set]]) - (:use [clojure walk]) - (:use [org.apache.storm util log])) - -(def PRODUCER-TYPE - {:multi-threaded ProducerType/MULTI - :single-threaded ProducerType/SINGLE}) - -(defnk disruptor-queue - [^String queue-name buffer-size timeout :producer-type :multi-threaded :batch-size 100 :batch-timeout 1] - (DisruptorQueue. 
queue-name - (PRODUCER-TYPE producer-type) buffer-size - timeout batch-size batch-timeout)) - -(defn clojure-handler - [afn] - (reify com.lmax.disruptor.EventHandler - (onEvent - [this o seq-id batchEnd?] - (afn o seq-id batchEnd?)))) - -(defn disruptor-backpressure-handler - [afn-high-wm afn-low-wm] - (reify DisruptorBackpressureCallback - (highWaterMark - [this] - (afn-high-wm)) - (lowWaterMark - [this] - (afn-low-wm)))) - -(defn worker-backpressure-handler - [afn] - (reify WorkerBackpressureCallback - (onEvent - [this o] - (afn o)))) - -(defmacro handler - [& args] - `(clojure-handler (fn ~@args))) - -(defn publish - [^DisruptorQueue q o] - (.publish q o)) - -(defn consume-batch - [^DisruptorQueue queue handler] - (.consumeBatch queue handler)) - -(defn consume-batch-when-available - [^DisruptorQueue queue handler] - (.consumeBatchWhenAvailable queue handler)) - -(defn halt-with-interrupt! - [^DisruptorQueue queue] - (.haltWithInterrupt queue)) - -(defnk consume-loop* - [^DisruptorQueue queue handler - :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))] - (async-loop - (fn [] (consume-batch-when-available queue handler) 0) - :kill-fn kill-fn - :thread-name (.getName queue))) - -(defmacro consume-loop [queue & handler-args] - `(let [handler# (handler ~@handler-args)] - (consume-loop* ~queue handler#))) http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/event.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/event.clj b/storm-core/src/clj/backtype/storm/event.clj deleted file mode 100644 index edc7616..0000000 --- a/storm-core/src/clj/backtype/storm/event.clj +++ /dev/null @@ -1,71 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. 
The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns org.apache.storm.event - (:use [org.apache.storm log util]) - (:import [org.apache.storm.utils Time Utils]) - (:import [java.io InterruptedIOException]) - (:import [java.util.concurrent LinkedBlockingQueue TimeUnit])) - -(defprotocol EventManager - (add [this event-fn]) - (waiting? [this]) - (shutdown [this])) - -(defn event-manager - "Creates a thread to respond to events. Any error will cause process to halt" - [daemon?] - (let [added (atom 0) - processed (atom 0) - ^LinkedBlockingQueue queue (LinkedBlockingQueue.) - running (atom true) - runner (Thread. - (fn [] - (try-cause - (while @running - (let [r (.take queue)] - (r) - (swap! processed inc))) - (catch InterruptedIOException t - (log-message "Event manager interrupted while doing IO")) - (catch InterruptedException t - (log-message "Event manager interrupted")) - (catch Throwable t - (log-error t "Error when processing event") - (exit-process! 20 "Error when processing an event")))))] - (.setDaemon runner daemon?) - (.start runner) - (reify - EventManager - - (add - [this event-fn] - ;; should keep track of total added and processed to know if this is finished yet - (when-not @running - (throw (RuntimeException. "Cannot add events to a shutdown event manager"))) - (swap! added inc) - (.put queue event-fn)) - - (waiting? - [this] - (or (Time/isThreadWaiting runner) - (= @processed @added))) - - (shutdown - [this] - (reset! 
running false) - (.interrupt runner) - (.join runner))))) http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/local_state.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/local_state.clj b/storm-core/src/clj/backtype/storm/local_state.clj deleted file mode 100644 index a95a85b..0000000 --- a/storm-core/src/clj/backtype/storm/local_state.clj +++ /dev/null @@ -1,131 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.local-state - (:use [org.apache.storm log util]) - (:import [org.apache.storm.generated StormTopology - InvalidTopologyException GlobalStreamId - LSSupervisorId LSApprovedWorkers - LSSupervisorAssignments LocalAssignment - ExecutorInfo LSWorkerHeartbeat - LSTopoHistory LSTopoHistoryList - WorkerResources]) - (:import [org.apache.storm.utils LocalState])) - -(def LS-WORKER-HEARTBEAT "worker-heartbeat") -(def LS-ID "supervisor-id") -(def LS-LOCAL-ASSIGNMENTS "local-assignments") -(def LS-APPROVED-WORKERS "approved-workers") -(def LS-TOPO-HISTORY "topo-hist") - -(defn ->LSTopoHistory - [{topoid :topoid timestamp :timestamp users :users groups :groups}] - (LSTopoHistory. 
topoid timestamp users groups)) - -(defn ->topo-history - [thrift-topo-hist] - { - :topoid (.get_topology_id thrift-topo-hist) - :timestamp (.get_time_stamp thrift-topo-hist) - :users (.get_users thrift-topo-hist) - :groups (.get_groups thrift-topo-hist)}) - -(defn ls-topo-hist! - [^LocalState local-state hist-list] - (.put local-state LS-TOPO-HISTORY - (LSTopoHistoryList. (map ->LSTopoHistory hist-list)))) - -(defn ls-topo-hist - [^LocalState local-state] - (if-let [thrift-hist-list (.get local-state LS-TOPO-HISTORY)] - (map ->topo-history (.get_topo_history thrift-hist-list)))) - -(defn ls-supervisor-id! - [^LocalState local-state ^String id] - (.put local-state LS-ID (LSSupervisorId. id))) - -(defn ls-supervisor-id - [^LocalState local-state] - (if-let [super-id (.get local-state LS-ID)] - (.get_supervisor_id super-id))) - -(defn ls-approved-workers! - [^LocalState local-state workers] - (.put local-state LS-APPROVED-WORKERS (LSApprovedWorkers. workers))) - -(defn ls-approved-workers - [^LocalState local-state] - (if-let [tmp (.get local-state LS-APPROVED-WORKERS)] - (into {} (.get_approved_workers tmp)))) - -(defn ->ExecutorInfo - [[low high]] (ExecutorInfo. low high)) - -(defn ->ExecutorInfo-list - [executors] - (map ->ExecutorInfo executors)) - -(defn ->executor-list - [executors] - (into [] - (for [exec-info executors] - [(.get_task_start exec-info) (.get_task_end exec-info)]))) - -(defn ->LocalAssignment - [{storm-id :storm-id executors :executors resources :resources}] - (let [assignment (LocalAssignment. storm-id (->ExecutorInfo-list executors))] - (if resources (.set_resources assignment - (doto (WorkerResources. 
) - (.set_mem_on_heap (first resources)) - (.set_mem_off_heap (second resources)) - (.set_cpu (last resources))))) - assignment)) - -(defn mk-local-assignment - [storm-id executors resources] - {:storm-id storm-id :executors executors :resources resources}) - -(defn ->local-assignment - [^LocalAssignment thrift-local-assignment] - (mk-local-assignment - (.get_topology_id thrift-local-assignment) - (->executor-list (.get_executors thrift-local-assignment)) - (.get_resources thrift-local-assignment))) - -(defn ls-local-assignments! - [^LocalState local-state assignments] - (let [local-assignment-map (map-val ->LocalAssignment assignments)] - (.put local-state LS-LOCAL-ASSIGNMENTS - (LSSupervisorAssignments. local-assignment-map)))) - -(defn ls-local-assignments - [^LocalState local-state] - (if-let [thrift-local-assignments (.get local-state LS-LOCAL-ASSIGNMENTS)] - (map-val - ->local-assignment - (.get_assignments thrift-local-assignments)))) - -(defn ls-worker-heartbeat! - [^LocalState local-state time-secs storm-id executors port] - (.put local-state LS-WORKER-HEARTBEAT (LSWorkerHeartbeat. time-secs storm-id (->ExecutorInfo-list executors) port) false)) - -(defn ls-worker-heartbeat - [^LocalState local-state] - (if-let [worker-hb (.get local-state LS-WORKER-HEARTBEAT)] - {:time-secs (.get_time_secs worker-hb) - :storm-id (.get_topology_id worker-hb) - :executors (->executor-list (.get_executors worker-hb)) - :port (.get_port worker-hb)})) - http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/log.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/log.clj b/storm-core/src/clj/backtype/storm/log.clj deleted file mode 100644 index 96570e3..0000000 --- a/storm-core/src/clj/backtype/storm/log.clj +++ /dev/null @@ -1,56 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. 
See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns org.apache.storm.log - (:require [clojure.tools.logging :as log]) - (:use [clojure pprint]) - (:import [java.io StringWriter])) - -(defmacro log-message - [& args] - `(log/info (str ~@args))) - -(defmacro log-error - [e & args] - `(log/log :error ~e (str ~@args))) - -(defmacro log-debug - [& args] - `(log/debug (str ~@args))) - -(defmacro log-warn-error - "Logs (str args) at WARN level with throwable e attached. The throwable is passed first so the logging library records it as the cause (with its stack trace) instead of appending its string form to the message." - [e & args] - `(log/warn ~e (str ~@args))) - -(defmacro log-warn - [& args] - `(log/warn (str ~@args))) - -(defn log-capture! - [& args] - (apply log/log-capture!
args)) - -(defn log-stream - [& args] - (apply log/log-stream args)) - -(defmacro log-pprint - [& args] - `(let [^StringWriter writer# (StringWriter.)] - (doall - (for [object# [~@args]] - (pprint object# writer#))) - (log-message "\n" writer#))) http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/messaging/loader.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/messaging/loader.clj b/storm-core/src/clj/backtype/storm/messaging/loader.clj deleted file mode 100644 index b190ab0..0000000 --- a/storm-core/src/clj/backtype/storm/messaging/loader.clj +++ /dev/null @@ -1,34 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.messaging.loader - (:import [org.apache.storm.messaging IConnection DeserializingConnectionCallback]) - (:require [org.apache.storm.messaging [local :as local]])) - -(defn mk-local-context [] - (local/mk-context)) - -(defn- mk-connection-callback - "make an IConnectionCallback" - [transfer-local-fn storm-conf worker-context] - (DeserializingConnectionCallback. 
storm-conf - worker-context - (fn [batch] - (transfer-local-fn batch)))) - -(defn register-callback - "register the local-transfer-fn with the server" - [transfer-local-fn ^IConnection socket storm-conf worker-context] - (.registerRecv socket (mk-connection-callback transfer-local-fn storm-conf worker-context))) http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/messaging/local.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/messaging/local.clj b/storm-core/src/clj/backtype/storm/messaging/local.clj deleted file mode 100644 index 32fbb34..0000000 --- a/storm-core/src/clj/backtype/storm/messaging/local.clj +++ /dev/null @@ -1,23 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
-(ns org.apache.storm.messaging.local - (:import [org.apache.storm.messaging IContext]) - (:import [org.apache.storm.messaging.local Context])) - -(defn mk-context [] - (let [context (Context.)] - (.prepare ^IContext context nil) - context)) http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/metric/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/metric/testing.clj b/storm-core/src/clj/backtype/storm/metric/testing.clj deleted file mode 100644 index a8ec438..0000000 --- a/storm-core/src/clj/backtype/storm/metric/testing.clj +++ /dev/null @@ -1,68 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.metric.testing - "This namespace is for AOT dependent metrics testing code." - (:gen-class)) - -(letfn [(for- [threader arg seq-exprs body] - `(reduce #(%2 %1) - ~arg - (for ~seq-exprs - (fn [arg#] (~threader arg# ~@body)))))] - (defmacro for-> - "Apply a thread expression to a sequence. - eg. 
- (-> 1 - (for-> [x [1 2 3]] - (+ x))) - => 7" - {:indent 1} - [arg seq-exprs & body] - (for- 'clojure.core/-> arg seq-exprs body))) - -(gen-class - :name clojure.storm.metric.testing.FakeMetricConsumer - :implements [org.apache.storm.metric.api.IMetricsConsumer] - :prefix "impl-") - -(def buffer (atom nil)) - -(defn impl-prepare [this conf argument ctx error-reporter] - (reset! buffer {})) - -(defn impl-cleanup [this] - (reset! buffer {})) - -(defn vec-conj [coll x] (if coll - (conj coll x) - [x])) - -(defn expand-complex-datapoint [dp] - (if (or (map? (.value dp)) - (instance? java.util.AbstractMap (.value dp))) - (into [] (for [[k v] (.value dp)] - [(str (.name dp) "/" k) v])) - [[(.name dp) (.value dp)]])) - -(defn impl-handleDataPoints [this task-info data-points] - (swap! buffer - (fn [old] - (-> old - (for-> [dp data-points - [name val] (expand-complex-datapoint dp)] - (update-in [(.srcComponentId task-info) name (.srcTaskId task-info)] vec-conj val)))))) - - http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/pacemaker/pacemaker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/pacemaker/pacemaker.clj b/storm-core/src/clj/backtype/storm/pacemaker/pacemaker.clj deleted file mode 100644 index 70313e4..0000000 --- a/storm-core/src/clj/backtype/storm/pacemaker/pacemaker.clj +++ /dev/null @@ -1,241 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. 
You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns org.apache.storm.pacemaker.pacemaker - (:import [org.apache.storm.pacemaker PacemakerServer IServerMessageHandler] - [java.util.concurrent ConcurrentHashMap] - [java.util.concurrent.atomic AtomicInteger] - [org.apache.storm.generated HBNodes - HBServerMessageType HBMessage HBMessageData HBPulse] - [org.apache.storm.utils VersionInfo]) - (:use [clojure.string :only [replace-first split]] - [org.apache.storm log config util]) - (:require [clojure.java.jmx :as jmx]) - (:gen-class)) - -(def STORM-VERSION (VersionInfo/getVersion)) - -;; Stats Functions - -(def sleep-seconds 60) - - -(defn- check-and-set-loop [stats key new & {:keys [compare new-fn] - :or {compare (fn [new old] true) - new-fn (fn [new old] new)}}] - (loop [] - (let [old (.get (key stats)) - new (new-fn new old)] - (if (compare new old) - (if (.compareAndSet (key stats) old new) - nil - (recur)) - nil)))) - -(defn- set-average [stats size] - (check-and-set-loop - stats - :average-heartbeat-size - size - :new-fn (fn [new old] - (let [count (.get (:send-pulse-count stats))] - ; Weighted average - (/ (+ new (* count old)) (+ count 1)))))) - -(defn- set-largest [stats size] - (check-and-set-loop - stats - :largest-heartbeat-size - size - :compare #'>)) - -(defn- report-stats [heartbeats stats last-five-s] - (loop [] - (let [send-count (.getAndSet (:send-pulse-count stats) 0) - received-size (.getAndSet (:total-received-size stats) 0) - get-count (.getAndSet (:get-pulse-count stats) 0) - sent-size (.getAndSet (:total-sent-size stats) 0) - largest (.getAndSet (:largest-heartbeat-size stats) 
0) - average (.getAndSet (:average-heartbeat-size stats) 0) - total-keys (.size heartbeats)] - (log-debug "\nReceived " send-count " heartbeats totaling " received-size " bytes,\n" - "Sent " get-count " heartbeats totaling " sent-size " bytes,\n" - "The largest heartbeat was " largest " bytes,\n" - "The average heartbeat was " average " bytes,\n" - "Pacemaker contained " total-keys " total keys\n" - "in the last " sleep-seconds " second(s)") - (dosync (ref-set last-five-s - {:send-pulse-count send-count - :total-received-size received-size - :get-pulse-count get-count - :total-sent-size sent-size - :largest-heartbeat-size largest - :average-heartbeat-size average - :total-keys total-keys}))) - (Thread/sleep (* 1000 sleep-seconds)) - (recur))) - -;; JMX stuff -(defn register [last-five-s] - (jmx/register-mbean - (jmx/create-bean - last-five-s) - "org.apache.storm.pacemaker.pacemaker:stats=Stats_Last_5_Seconds")) - - -;; Pacemaker Functions - -(defn hb-data [] - (ConcurrentHashMap.)) - -(defn create-path [^String path heartbeats] - (HBMessage. HBServerMessageType/CREATE_PATH_RESPONSE nil)) - -(defn exists [^String path heartbeats] - (let [it-does (.containsKey heartbeats path)] - (log-debug (str "Checking if path [" path "] exists..." it-does ".")) - (HBMessage. HBServerMessageType/EXISTS_RESPONSE - (HBMessageData/boolval it-does)))) - -(defn send-pulse - "Stores the pulse's details under its id in heartbeats, bumps the send count and received-size counters, updates the largest/average size stats, and returns a SEND_PULSE_RESPONSE message with no data." - [^HBPulse pulse heartbeats pacemaker-stats] - (let [id (.get_id pulse) - details (.get_details pulse)] - (log-debug (str "Saving Pulse for id [" id "] data [" (str details) "].")) - - (.incrementAndGet (:send-pulse-count pacemaker-stats)) - (.addAndGet (:total-received-size pacemaker-stats) (alength details)) - (set-largest pacemaker-stats (alength details)) - (set-average pacemaker-stats (alength details)) - - (.put heartbeats id details) - (HBMessage. HBServerMessageType/SEND_PULSE_RESPONSE nil))) - -(defn get-all-pulse-for-path [^String path heartbeats] - (HBMessage.
HBServerMessageType/GET_ALL_PULSE_FOR_PATH_RESPONSE nil)) - -(defn get-all-nodes-for-path [^String path ^ConcurrentHashMap heartbeats] - (log-debug "List all nodes for path " path) - (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE - (HBMessageData/nodes - (HBNodes. (distinct (for [k (.keySet heartbeats) - :let [trimmed-k (first - (filter #(not (= "" %)) - (split (replace-first k path "") #"/")))] - :when (and - (not (nil? trimmed-k)) - (= (.indexOf k path) 0))] - trimmed-k)))))) - -(defn get-pulse [^String path heartbeats pacemaker-stats] - (let [details (.get heartbeats path)] - (log-debug (str "Getting Pulse for path [" path "]...data " (str details) "].")) - - - (.incrementAndGet (:get-pulse-count pacemaker-stats)) - (if details - (.addAndGet (:total-sent-size pacemaker-stats) (alength details))) - - (HBMessage. HBServerMessageType/GET_PULSE_RESPONSE - (HBMessageData/pulse - (doto (HBPulse. ) (.set_id path) (.set_details details)))))) - -(defn delete-pulse-id [^String path heartbeats] - (log-debug (str "Deleting Pulse for id [" path "].")) - (.remove heartbeats path) - (HBMessage. HBServerMessageType/DELETE_PULSE_ID_RESPONSE nil)) - -(defn delete-path [^String path heartbeats] - (let [prefix (if (= \/ (last path)) path (str path "/"))] - (doseq [k (.keySet heartbeats) - :when (= (.indexOf k prefix) 0)] - (delete-pulse-id k heartbeats))) - (HBMessage. HBServerMessageType/DELETE_PATH_RESPONSE nil)) - -(defn not-authorized [] - (HBMessage. HBServerMessageType/NOT_AUTHORIZED nil)) - -(defn mk-handler [conf] - (let [heartbeats ^ConcurrentHashMap (hb-data) - pacemaker-stats {:send-pulse-count (AtomicInteger.) - :total-received-size (AtomicInteger.) - :get-pulse-count (AtomicInteger.) - :total-sent-size (AtomicInteger.) - :largest-heartbeat-size (AtomicInteger.) 
- :average-heartbeat-size (AtomicInteger.)} - last-five (ref {:send-pulse-count 0 - :total-received-size 0 - :get-pulse-count 0 - :total-sent-size 0 - :largest-heartbeat-size 0 - :average-heartbeat-size 0 - :total-keys 0}) - stats-thread (Thread. (fn [] (report-stats heartbeats pacemaker-stats last-five)))] - (.setDaemon stats-thread true) - (.start stats-thread) - (register last-five) - (reify - IServerMessageHandler - (^HBMessage handleMessage [this ^HBMessage request ^boolean authenticated] - (let [response - (condp = (.get_type request) - HBServerMessageType/CREATE_PATH - (create-path (.get_path (.get_data request)) heartbeats) - - HBServerMessageType/EXISTS - (if authenticated - (exists (.get_path (.get_data request)) heartbeats) - (not-authorized)) - - HBServerMessageType/SEND_PULSE - (send-pulse (.get_pulse (.get_data request)) heartbeats pacemaker-stats) - - HBServerMessageType/GET_ALL_PULSE_FOR_PATH - (if authenticated - (get-all-pulse-for-path (.get_path (.get_data request)) heartbeats) - (not-authorized)) - - HBServerMessageType/GET_ALL_NODES_FOR_PATH - (if authenticated - (get-all-nodes-for-path (.get_path (.get_data request)) heartbeats) - (not-authorized)) - - HBServerMessageType/GET_PULSE - (if authenticated - (get-pulse (.get_path (.get_data request)) heartbeats pacemaker-stats) - (not-authorized)) - - HBServerMessageType/DELETE_PATH - (delete-path (.get_path (.get_data request)) heartbeats) - - HBServerMessageType/DELETE_PULSE_ID - (delete-pulse-id (.get_path (.get_data request)) heartbeats) - - ; Otherwise: only log; log-message yields nil, so response is nil here - (log-message "Got Unexpected Type: " (.get_type request)))] - - ; Guard against the nil response of the unexpected-type branch (previously a NullPointerException) - (when response - (.set_message_id response (.get_message_id request))) - response))))) - -(defn launch-server! [] - (log-message "Starting pacemaker server for storm version '" - STORM-VERSION - "'") - (let [conf (override-login-config-with-system-property (read-storm-config))] - (PacemakerServer. (mk-handler conf) conf))) - -(defn -main [] - (redirect-stdio-to-slf4j!)
- (launch-server!)) http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/pacemaker/pacemaker_state_factory.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/pacemaker/pacemaker_state_factory.clj b/storm-core/src/clj/backtype/storm/pacemaker/pacemaker_state_factory.clj deleted file mode 100644 index cede59e..0000000 --- a/storm-core/src/clj/backtype/storm/pacemaker/pacemaker_state_factory.clj +++ /dev/null @@ -1,125 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
(ns org.apache.storm.pacemaker.pacemaker-state-factory
  (:require [org.apache.storm.pacemaker pacemaker]
            [org.apache.storm.cluster-state [zookeeper-state-factory :as zk-factory]]
            [org.apache.storm
             [config :refer :all]
             [cluster :refer :all]
             [log :refer :all]
             [util :as util]])
  (:import [org.apache.storm.generated
            HBExecutionException HBServerMessageType HBMessage
            HBMessageData HBPulse]
           [org.apache.storm.cluster_state zookeeper_state_factory]
           [org.apache.storm.cluster ClusterState]
           [org.apache.storm.pacemaker PacemakerClient])
  (:gen-class
   :implements [org.apache.storm.cluster.ClusterStateFactory]))

;; Client construction lives in its own function so tests can mock the client.
(defn makeClient
  "Constructs a PacemakerClient from `conf`."
  [conf]
  (PacemakerClient. conf))

(defn makeZKState
  "Creates the ZooKeeper-backed state by calling mkState on a new
  zookeeper_state_factory with the given arguments."
  [conf auth-conf acls context]
  (.mkState (zookeeper_state_factory.) conf auth-conf acls context))

;; First argument handed to util/retry-on-exception for every Pacemaker request.
(def max-retries 10)

(defn- request-with-retry
  "Sends one request to `client` through util/retry-on-exception (using
  max-retries and `task-name`) and validates the response type.

  `make-request` is a zero-argument function returning the HBMessage to send;
  it is invoked inside the retried function, so a fresh message is built on
  every attempt. When the response type equals `expected-type` the result of
  `(handle-response response)` is returned; otherwise an HBExecutionException
  with the message \"Invalid Response Type\" is thrown."
  [client task-name make-request expected-type handle-response]
  (util/retry-on-exception
    max-retries
    task-name
    (fn []
      (let [response (.send client (make-request))]
        (if (= (.get_type response) expected-type)
          (handle-response response)
          (throw (HBExecutionException. "Invalid Response Type")))))))

(defn -mkState
  "Implements ClusterStateFactory.mkState.

  Returns a ClusterState that forwards every non-heartbeat operation to a
  ZooKeeper state (from makeZKState) and sends the worker-heartbeat operations
  to a Pacemaker client (from makeClient).

  Heartbeat methods:
    set_worker_hb          - sends SEND_PULSE with the path and data; returns :ok.
                             The `acls` argument is not used.
    delete_worker_hb       - sends DELETE_PATH; returns :ok.
    get_worker_hb          - sends GET_PULSE; returns the pulse details.
                             The `watch?` argument is not used.
    get_worker_hb_children - sends GET_ALL_NODES_FOR_PATH; returns the pulse ids
                             as a vector. The `watch?` argument is not used.

  Each heartbeat method throws HBExecutionException when the response has an
  unexpected type or (for the two getters) when the payload cannot be read."
  [this conf auth-conf acls context]
  (let [zk-state (makeZKState conf auth-conf acls context)
        pacemaker-client (makeClient conf)]

    (reify
      ClusterState
      ;; Let these pass through to the zk-state. We only want to handle heartbeats.
      (register [this callback] (.register zk-state callback))
      (unregister [this callback] (.unregister zk-state callback))
      (set_ephemeral_node [this path data acls] (.set_ephemeral_node zk-state path data acls))
      (create_sequential [this path data acls] (.create_sequential zk-state path data acls))
      (set_data [this path data acls] (.set_data zk-state path data acls))
      (delete_node [this path] (.delete_node zk-state path))
      (delete_node_blobstore [this path nimbus-host-port-info] (.delete_node_blobstore zk-state path nimbus-host-port-info))
      (get_data [this path watch?] (.get_data zk-state path watch?))
      (get_data_with_version [this path watch?] (.get_data_with_version zk-state path watch?))
      (get_version [this path watch?] (.get_version zk-state path watch?))
      (get_children [this path watch?] (.get_children zk-state path watch?))
      (mkdirs [this path acls] (.mkdirs zk-state path acls))
      (node_exists [this path watch?] (.node_exists zk-state path watch?))
      (add_listener [this listener] (.add_listener zk-state listener))
      (sync_path [this path] (.sync_path zk-state path))

      (set_worker_hb [this path data acls]
        (request-with-retry
          pacemaker-client
          "set_worker_hb"
          (fn []
            (HBMessage. HBServerMessageType/SEND_PULSE
                        (HBMessageData/pulse
                          (doto (HBPulse.)
                            (.set_id path)
                            (.set_details data)))))
          HBServerMessageType/SEND_PULSE_RESPONSE
          (fn [_] :ok)))

      (delete_worker_hb [this path]
        (request-with-retry
          pacemaker-client
          "delete_worker_hb"
          (fn []
            (HBMessage. HBServerMessageType/DELETE_PATH
                        (HBMessageData/path path)))
          HBServerMessageType/DELETE_PATH_RESPONSE
          (fn [_] :ok)))

      (get_worker_hb [this path watch?]
        (request-with-retry
          pacemaker-client
          "get_worker_hb"
          (fn []
            (HBMessage. HBServerMessageType/GET_PULSE
                        (HBMessageData/path path)))
          HBServerMessageType/GET_PULSE_RESPONSE
          (fn [response]
            ;; Any failure while unpacking the payload is reported as an
            ;; HBExecutionException carrying the original exception's string form.
            (try
              (.get_details (.get_pulse (.get_data response)))
              (catch Exception e
                (throw (HBExecutionException. (.toString e))))))))

      (get_worker_hb_children [this path watch?]
        (request-with-retry
          pacemaker-client
          "get_worker_hb_children"
          (fn []
            (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH
                        (HBMessageData/path path)))
          HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE
          (fn [response]
            (try
              (into [] (.get_pulseIds (.get_nodes (.get_data response))))
              (catch Exception e
                (throw (HBExecutionException. (.toString e))))))))

      (close [this]
        ;; `finally` ensures the Pacemaker client is closed even when closing
        ;; the ZooKeeper state throws; previously the client leaked in that case.
        (try
          (.close zk-state)
          (finally
            (.close pacemaker-client)))))))

;; http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/process_simulator.clj
;; ----------------------------------------------------------------------
;; diff --git a/storm-core/src/clj/backtype/storm/process_simulator.clj b/storm-core/src/clj/backtype/storm/process_simulator.clj
;; deleted file mode 100644
;; index 03c3dd9..0000000
;; --- a/storm-core/src/clj/backtype/storm/process_simulator.clj
;; +++ /dev/null
;; @@ -1,51 +0,0 @@
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.

(ns org.apache.storm.process-simulator
  (:use [org.apache.storm log util]))

;; Created with mk-counter from org.apache.storm.util.
;; NOTE(review): presumably the source of simulated pids -- confirm against callers.
(def pid-counter (mk-counter))

;; Registry of simulated processes: pid -> shutdownable object.
(def process-map (atom {}))

;; Monitor held by kill-process while it removes and shuts down a process.
(def kill-lock (Object.))

(defn register-process
  "Stores `shutdownable` in the process registry under `pid`."
  [pid shutdownable]
  (swap! process-map assoc pid shutdownable))

(defn process-handle
  "Returns the object registered under `pid`, or nil when there is none."
  [pid]
  (get @process-map pid))

(defn all-processes
  "Returns every registered shutdownable object."
  []
  (vals @process-map))

(defn kill-process
  "Removes `pid` from the registry and, if something was registered under it,
  calls .shutdown on that object. Uses `locking` in case the cluster shuts
  down while a supervisor is killing a task."
  [pid]
  (locking kill-lock
    (log-message "Killing process " pid)
    (let [handle (process-handle pid)]
      (swap! process-map dissoc pid)
      (when handle
        (.shutdown handle)))))

(defn kill-all-processes
  "Calls kill-process for every pid present in the registry when invoked."
  []
  (doseq [pid (keys @process-map)]
    (kill-process pid)))

;; http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/scheduler/DefaultScheduler.clj
;; ----------------------------------------------------------------------
;; diff --git a/storm-core/src/clj/backtype/storm/scheduler/DefaultScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/DefaultScheduler.clj
;; deleted file mode 100644
;; index f6f89f8..0000000
;; --- a/storm-core/src/clj/backtype/storm/scheduler/DefaultScheduler.clj
;; +++ /dev/null
;; @@ -1,77 +0,0 @@
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.scheduler.DefaultScheduler
  (:use [org.apache.storm util config])
  (:require [org.apache.storm.scheduler.EvenScheduler :as EvenScheduler])
  (:import [org.apache.storm.scheduler IScheduler Topologies
            Cluster TopologyDetails WorkerSlot SchedulerAssignment
            EvenScheduler ExecutorDetails])
  (:gen-class
    :implements [org.apache.storm.scheduler.IScheduler]))

(defn- bad-slots
  "Returns a sequence of WorkerSlots for the entries of `existing-slots`
  (a map of [node port] -> executor list) that are not kept.

  The result of (integer-divided num-executors num-workers) is used as a
  quota looked up by executor count: a slot is kept when the quota for its
  executor count is still positive, and keeping it decrements that quota.
  Every slot that is not kept is returned. Returns an empty list when
  `num-workers` is 0."
  [existing-slots num-executors num-workers]
  (if (= 0 num-workers)
    '()
    (let [initial-quota (integer-divided num-executors num-workers)
          ;; Accumulator is [remaining-quota kept-slot-keys].
          keep-or-skip (fn [[quota kept] [node+port executor-list]]
                         (let [executor-count (count executor-list)]
                           (if (pos? (get quota executor-count 0))
                             [(update-in quota [executor-count] dec)
                              (conj kept node+port)]
                             [quota kept])))
          [_ kept-keys] (reduce keep-or-skip [initial-quota []] existing-slots)
          rejected (apply dissoc existing-slots kept-keys)]
      (map (fn [[node port]]
             (WorkerSlot. node port))
           (keys rejected)))))

(defn slots-can-reassign
  "Filters `slots` ([node port] pairs) down to those whose node is not
  blacklisted in `cluster`, has a supervisor registered under that id, and
  whose port is contained in that supervisor's getAllPorts."
  [^Cluster cluster slots]
  (filter
    (fn [[node port]]
      (when-not (.isBlackListed cluster node)
        (when-let [supervisor (.getSupervisorById cluster node)]
          (.contains (.getAllPorts supervisor) (int port)))))
    slots))

(defn -prepare
  "IScheduler.prepare implementation; intentionally does nothing."
  [this conf]
  )

(defn default-schedule
  "For every topology reported by (.needsSchedulingTopologies cluster topologies):
  frees the slots selected by bad-slots when more slots can be used than are
  currently assigned or when the assigned executors differ from the topology's
  executors, then hands that single topology to
  EvenScheduler/schedule-topologies-evenly."
  [^Topologies topologies ^Cluster cluster]
  (doseq [^TopologyDetails topology (.needsSchedulingTopologies cluster topologies)]
    (let [topology-id (.getId topology)
          available-slots (map (fn [slot] (vector (.getNodeId slot) (.getPort slot)))
                               (.getAvailableSlots cluster))
          all-executors (set (map (fn [executor]
                                    (vector (.getStartTask executor) (.getEndTask executor)))
                                  (.getExecutors topology)))
          alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
          alive-executors (set (apply concat (vals alive-assigned)))
          can-reassign-slots (slots-can-reassign cluster (keys alive-assigned))
          total-slots-to-use (min (.getNumWorkers topology)
                                  (+ (count can-reassign-slots) (count available-slots)))
          reshuffle? (or (> total-slots-to-use (count alive-assigned))
                         (not= alive-executors all-executors))
          slots-to-free (if reshuffle?
                          (bad-slots alive-assigned (count all-executors) total-slots-to-use)
                          [])]
      (.freeSlots cluster slots-to-free)
      (EvenScheduler/schedule-topologies-evenly (Topologies. {topology-id topology}) cluster))))

(defn -schedule
  "IScheduler.schedule implementation; delegates to default-schedule."
  [this ^Topologies topologies ^Cluster cluster]
  (default-schedule topologies cluster))

;; http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
;; ----------------------------------------------------------------------
;; diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
;; deleted file mode 100644
;; index 783da26..0000000
;; --- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
;; +++ /dev/null
;; @@ -1,81 +0,0 @@
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.scheduler.EvenScheduler
  (:use [org.apache.storm util log config])
  (:require [clojure.set :as set])
  (:import [org.apache.storm.scheduler IScheduler Topologies
            Cluster TopologyDetails WorkerSlot ExecutorDetails])
  (:gen-class
    :implements [org.apache.storm.scheduler.IScheduler]))

(defn sort-slots
  "Groups `all-slots` by their first element, orders the groups from largest
  to smallest, and combines them with interleave-all.
  NOTE(review): interleave-all comes from org.apache.storm.util; presumably it
  interleaves without stopping at the shortest group -- confirm."
  [all-slots]
  (->> all-slots
       (group-by first)
       vals
       (sort-by count >)
       (apply interleave-all)))

(defn get-alive-assigned-node+port->executors
  "Returns the current assignment of `topology-id` in `cluster` as the
  reverse-map of executor -> slot, where each executor is written as
  [start-task end-task] and each slot as [node-id port]. When the cluster has
  no assignment for the topology, the executor -> slot map is empty."
  [cluster topology-id]
  (let [assignment (.getAssignmentById cluster topology-id)
        executor->slot (if assignment
                         (.getExecutorToSlot assignment)
                         {})
        executor->node+port (into {}
                                  (for [[^ExecutorDetails executor ^WorkerSlot slot] executor->slot]
                                    [[(.getStartTask executor) (.getEndTask executor)]
                                     [(.getNodeId slot) (.getPort slot)]]))]
    (reverse-map executor->node+port)))

(defn- schedule-topology
  "Computes a map of [start-task end-task] -> [node port] for the executors of
  `topology` that are not part of its current alive assignment.

  The number of slots to use is the smaller of the topology's worker count and
  (available slots + already assigned slots); the additional slots are taken
  from the front of (sort-slots available-slots). Logs the available slots
  when the resulting map is non-empty."
  [^TopologyDetails topology ^Cluster cluster]
  (let [topology-id (.getId topology)
        available-slots (map (fn [slot] (vector (.getNodeId slot) (.getPort slot)))
                             (.getAvailableSlots cluster))
        all-executors (set (map (fn [executor]
                                  (vector (.getStartTask executor) (.getEndTask executor)))
                                (.getExecutors topology)))
        alive-assigned (get-alive-assigned-node+port->executors cluster topology-id)
        total-slots-to-use (min (.getNumWorkers topology)
                                (+ (count available-slots) (count alive-assigned)))
        extra-slot-count (- total-slots-to-use (count alive-assigned))
        reassign-slots (take extra-slot-count (sort-slots available-slots))
        assigned-executors (set (apply concat (vals alive-assigned)))
        reassign-executors (sort (set/difference all-executors assigned-executors))
        ;; for some reason it goes into infinite loop without limiting the repeat-seq
        slot-cycle (repeat-seq (count reassign-executors) reassign-slots)
        reassignment (into {} (map vector reassign-executors slot-cycle))]
    (when (seq reassignment)
      (log-message "Available slots: " (pr-str available-slots)))
    reassignment))

(defn schedule-topologies-evenly
  "For every topology reported by (.needsSchedulingTopologies cluster topologies),
  computes a new assignment with schedule-topology and calls .assign on
  `cluster` once per slot with the ExecutorDetails placed on that slot."
  [^Topologies topologies ^Cluster cluster]
  (doseq [^TopologyDetails topology (.needsSchedulingTopologies cluster topologies)]
    (let [topology-id (.getId topology)
          new-assignment (schedule-topology topology cluster)
          node+port->executors (reverse-map new-assignment)]
      (doseq [[node+port task-ranges] node+port->executors]
        (let [^WorkerSlot slot (WorkerSlot. (first node+port) (last node+port))
              executors (map (fn [[start-task end-task]]
                               (ExecutorDetails. start-task end-task))
                             task-ranges)]
          (.assign cluster slot topology-id executors))))))

(defn -prepare
  "IScheduler.prepare implementation; intentionally does nothing."
  [this conf]
  )

(defn -schedule
  "IScheduler.schedule implementation; delegates to schedule-topologies-evenly."
  [this ^Topologies topologies ^Cluster cluster]
  (schedule-topologies-evenly topologies cluster))