http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj 
b/storm-core/src/clj/backtype/storm/daemon/worker.clj
deleted file mode 100644
index 1326672..0000000
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ /dev/null
@@ -1,768 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.worker
-  (:use [org.apache.storm.daemon common])
-  (:use [org.apache.storm config log util timer local-state])
-  (:require [clj-time.core :as time])
-  (:require [clj-time.coerce :as coerce])
-  (:require [org.apache.storm.daemon [executor :as executor]])
-  (:require [org.apache.storm [disruptor :as disruptor] [cluster :as cluster]])
-  (:require [clojure.set :as set])
-  (:require [org.apache.storm.messaging.loader :as msg-loader])
-  (:import [java.util.concurrent Executors]
-           [org.apache.storm.hooks IWorkerHook BaseWorkerHook])
-  (:import [java.util ArrayList HashMap])
-  (:import [org.apache.storm.utils Utils TransferDrainer ThriftTopologyUtils 
WorkerBackpressureThread DisruptorQueue])
-  (:import [org.apache.storm.grouping LoadMapping])
-  (:import [org.apache.storm.messaging TransportFactory])
-  (:import [org.apache.storm.messaging TaskMessage IContext IConnection 
ConnectionWithStatus ConnectionWithStatus$Status])
-  (:import [org.apache.storm.daemon Shutdownable])
-  (:import [org.apache.storm.serialization KryoTupleSerializer])
-  (:import [org.apache.storm.generated StormTopology])
-  (:import [org.apache.storm.tuple AddressedTuple Fields])
-  (:import [org.apache.storm.task WorkerTopologyContext])
-  (:import [org.apache.storm Constants])
-  (:import [org.apache.storm.security.auth AuthUtils])
-  (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
-  (:import [javax.security.auth Subject])
-  (:import [java.security PrivilegedExceptionAction])
-  (:import [org.apache.logging.log4j LogManager])
-  (:import [org.apache.logging.log4j Level])
-  (:import [org.apache.logging.log4j.core.config LoggerConfig])
-  (:import [org.apache.storm.generated LogConfig LogLevelAction])
-  (:gen-class))
-
-(defmulti mk-suicide-fn cluster-mode)
-
-(defn read-worker-executors [storm-conf storm-cluster-state storm-id 
assignment-id port assignment-versions]
-  (log-message "Reading Assignments.")
-  (let [assignment (:executor->node+port (.assignment-info storm-cluster-state 
storm-id nil))]
-    (doall
-     (concat
-      [Constants/SYSTEM_EXECUTOR_ID]
-      (mapcat (fn [[executor loc]]
-                (if (= loc [assignment-id port])
-                  [executor]
-                  ))
-              assignment)))))
-
-(defnk do-executor-heartbeats [worker :executors nil]
-  ;; stats is how we know what executors are assigned to this worker 
-  (let [stats (if-not executors
-                  (into {} (map (fn [e] {e nil}) (:executors worker)))
-                  (->> executors
-                    (map (fn [e] {(executor/get-executor-id e) 
(executor/render-stats e)}))
-                    (apply merge)))
-        zk-hb {:storm-id (:storm-id worker)
-               :executor-stats stats
-               :uptime ((:uptime worker))
-               :time-secs (current-time-secs)
-               }]
-    ;; do the zookeeper heartbeat
-    (try
-      (.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) 
(:assignment-id worker) (:port worker) zk-hb)
-      (catch Exception exc
-        (log-error exc "Worker failed to write heartbeats to ZK or 
Pacemaker...will retry")))))
-
-(defn do-heartbeat [worker]
-  (let [conf (:conf worker)
-        state (worker-state conf (:worker-id worker))]
-    ;; do the local-file-system heartbeat.
-    (ls-worker-heartbeat! state (current-time-secs) (:storm-id worker) 
(:executors worker) (:port worker))
-    (.cleanup state 60) ; this is just in case supervisor is down so that disk 
doesn't fill up.
-                         ; it shouldn't take supervisor 120 seconds between 
listing dir and reading it
-
-    ))
-
-(defn worker-outbound-tasks
-  "Returns seq of task-ids that receive messages from this worker"
-  [worker]
-  (let [context (worker-context worker)
-        components (mapcat
-                     (fn [task-id]
-                       (->> (.getComponentId context (int task-id))
-                            (.getTargets context)
-                            vals
-                            (map keys)
-                            (apply concat)))
-                     (:task-ids worker))]
-    (-> worker
-        :task->component
-        reverse-map
-        (select-keys components)
-        vals
-        flatten
-        set )))
-
-(defn get-dest
-  [^AddressedTuple addressed-tuple]
-  "get the destination for an AddressedTuple"
-  (.getDest addressed-tuple))
-
-(defn mk-transfer-local-fn [worker]
-  (let [short-executor-receive-queue-map (:short-executor-receive-queue-map 
worker)
-        task->short-executor (:task->short-executor worker)
-        task-getter (comp #(get task->short-executor %) get-dest)]
-    (fn [tuple-batch]
-      (let [grouped (fast-group-by task-getter tuple-batch)]
-        (fast-map-iter [[short-executor pairs] grouped]
-          (let [q (short-executor-receive-queue-map short-executor)]
-            (if q
-              (disruptor/publish q pairs)
-              (log-warn "Received invalid messages for unknown tasks. 
Dropping... ")
-              )))))))
-
-(defn- assert-can-serialize [^KryoTupleSerializer serializer tuple-batch]
-  "Check that all of the tuples can be serialized by serializing them."
-  (fast-list-iter [[task tuple :as pair] tuple-batch]
-    (.serialize serializer tuple)))
-
-(defn- mk-backpressure-handler [executors]
-  "make a handler that checks and updates worker's backpressure flag"
-  (disruptor/worker-backpressure-handler
-    (fn [worker]
-      (let [storm-id (:storm-id worker)
-            assignment-id (:assignment-id worker)
-            port (:port worker)
-            storm-cluster-state (:storm-cluster-state worker)
-            prev-backpressure-flag @(:backpressure worker)
-            ;; the backpressure flag is true if at least one of the disruptor 
queues has throttle-on
-            curr-backpressure-flag (if executors
-                                     (or (.getThrottleOn (:transfer-queue 
worker))
-                                       (reduce #(or %1 %2) (map 
#(.get-backpressure-flag %1) executors)))
-                                     prev-backpressure-flag)]
-        ;; update the worker's backpressure flag to zookeeper only when it has 
changed
-        (when (not= prev-backpressure-flag curr-backpressure-flag)
-          (try
-            (log-debug "worker backpressure flag changing from " 
prev-backpressure-flag " to " curr-backpressure-flag)
-            (.worker-backpressure! storm-cluster-state storm-id assignment-id 
port curr-backpressure-flag)
-            ;; doing the local reset after the zk update succeeds is very 
important to avoid a bad state upon zk exception
-            (reset! (:backpressure worker) curr-backpressure-flag)
-            (catch Exception exc
-              (log-error exc "workerBackpressure update failed when connecting 
to ZK ... will retry"))))
-        ))))
-
-(defn- mk-disruptor-backpressure-handler [worker]
-  "make a handler for the worker's send disruptor queue to
-  check highWaterMark and lowWaterMark for backpressure"
-  (disruptor/disruptor-backpressure-handler
-    (fn []
-      (log-debug "worker " (:worker-id worker) " transfer-queue is congested, 
set backpressure flag true")
-      (WorkerBackpressureThread/notifyBackpressureChecker 
(:backpressure-trigger worker)))
-    (fn []
-      (log-debug "worker " (:worker-id worker) " transfer-queue is not 
congested, set backpressure flag false")
-      (WorkerBackpressureThread/notifyBackpressureChecker 
(:backpressure-trigger worker)))))
-
-(defn mk-transfer-fn [worker]
-  (let [local-tasks (-> worker :task-ids set)
-        local-transfer (:transfer-local-fn worker)
-        ^DisruptorQueue transfer-queue (:transfer-queue worker)
-        task->node+port (:cached-task->node+port worker)
-        try-serialize-local ((:storm-conf worker) 
TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE)
-
-        transfer-fn
-          (fn [^KryoTupleSerializer serializer tuple-batch]
-            (let [^ArrayList local (ArrayList.)
-                  ^HashMap remoteMap (HashMap.)]
-              (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
-                (let [task (.getDest addressed-tuple)
-                      tuple (.getTuple addressed-tuple)]
-                  (if (local-tasks task)
-                    (.add local addressed-tuple)
-
-                    ;;Using java objects directly to avoid performance issues 
in java code
-                    (do
-                      (when (not (.get remoteMap task))
-                        (.put remoteMap task (ArrayList.)))
-                      (let [^ArrayList remote (.get remoteMap task)]
-                        (if (not-nil? task)
-                          (.add remote (TaskMessage. task ^bytes (.serialize 
serializer tuple)))
-                          (log-warn "Can't transfer tuple - task value is nil. 
tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
-                       )))))
-
-              (when (not (.isEmpty local)) (local-transfer local))
-              (when (not (.isEmpty remoteMap)) (disruptor/publish 
transfer-queue remoteMap))))]
-    (if try-serialize-local
-      (do
-        (log-warn "WILL TRY TO SERIALIZE ALL TUPLES (Turn off " 
TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE " for production)")
-        (fn [^KryoTupleSerializer serializer tuple-batch]
-          (assert-can-serialize serializer tuple-batch)
-          (transfer-fn serializer tuple-batch)))
-      transfer-fn)))
-
-(defn- mk-receive-queue-map [storm-conf executors]
-  (->> executors
-       ;; TODO: this depends on the type of executor
-       (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
-                                                  (storm-conf 
TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
-                                                  (storm-conf 
TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-                                                  :batch-size (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-SIZE)
-                                                  :batch-timeout (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))]))
-       (into {})
-       ))
-
-(defn- stream->fields [^StormTopology topology component]
-  (->> (ThriftTopologyUtils/getComponentCommon topology component)
-       .get_streams
-       (map (fn [[s info]] [s (Fields. (.get_output_fields info))]))
-       (into {})
-       (HashMap.)))
-
-(defn component->stream->fields [^StormTopology topology]
-  (->> (ThriftTopologyUtils/getComponentIds topology)
-       (map (fn [c] [c (stream->fields topology c)]))
-       (into {})
-       (HashMap.)))
-
-(defn- mk-default-resources [worker]
-  (let [conf (:conf worker)
-        thread-pool-size (int (conf TOPOLOGY-WORKER-SHARED-THREAD-POOL-SIZE))]
-    {WorkerTopologyContext/SHARED_EXECUTOR (Executors/newFixedThreadPool 
thread-pool-size)}
-    ))
-
-(defn- mk-user-resources [worker]
-  ;;TODO: need to invoke a hook provided by the topology, giving it a chance 
to create user resources.
-  ;; this would be part of the initialization hook
-  ;; need to separate workertopologycontext into WorkerContext and 
WorkerUserContext.
-  ;; actually just do it via interfaces. just need to make sure to hide 
setResource from tasks
-  {})
-
-(defn mk-halting-timer [timer-name]
-  (mk-timer :kill-fn (fn [t]
-                       (log-error t "Error when processing event")
-                       (exit-process! 20 "Error when processing an event")
-                       )
-            :timer-name timer-name))
-
-(defn worker-data [conf mq-context storm-id assignment-id port worker-id 
storm-conf cluster-state storm-cluster-state]
-  (let [assignment-versions (atom {})
-        executors (set (read-worker-executors storm-conf storm-cluster-state 
storm-id assignment-id port assignment-versions))
-        transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" 
(storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
-                                                  (storm-conf 
TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-                                                  :batch-size (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-SIZE)
-                                                  :batch-timeout (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
-        executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
-
-        receive-queue-map (->> executor-receive-queue-map
-                               (mapcat (fn [[e queue]] (for [t 
(executor-id->tasks e)] [t queue])))
-                               (into {}))
-
-        topology (read-supervisor-topology conf storm-id)
-        mq-context  (if mq-context
-                      mq-context
-                      (TransportFactory/makeContext storm-conf))]
-
-    (recursive-map
-      :conf conf
-      :mq-context mq-context
-      :receiver (.bind ^IContext mq-context storm-id port)
-      :storm-id storm-id
-      :assignment-id assignment-id
-      :port port
-      :worker-id worker-id
-      :cluster-state cluster-state
-      :storm-cluster-state storm-cluster-state
-      ;; when worker bootup, worker will start to setup initial connections to
-      ;; other workers. When all connection is ready, we will enable this flag
-      ;; and spout and bolt will be activated.
-      :worker-active-flag (atom false)
-      :storm-active-atom (atom false)
-      :storm-component->debug-atom (atom {})
-      :executors executors
-      :task-ids (->> receive-queue-map keys (map int) sort)
-      :storm-conf storm-conf
-      :topology topology
-      :system-topology (system-topology! storm-conf topology)
-      :heartbeat-timer (mk-halting-timer "heartbeat-timer")
-      :refresh-load-timer (mk-halting-timer "refresh-load-timer")
-      :refresh-connections-timer (mk-halting-timer "refresh-connections-timer")
-      :refresh-credentials-timer (mk-halting-timer "refresh-credentials-timer")
-      :reset-log-levels-timer (mk-halting-timer "reset-log-levels-timer")
-      :refresh-active-timer (mk-halting-timer "refresh-active-timer")
-      :executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
-      :refresh-backpressure-timer (mk-halting-timer 
"refresh-backpressure-timer")
-      :user-timer (mk-halting-timer "user-timer")
-      :task->component (HashMap. (storm-task-info topology storm-conf)) ; for 
optimized access when used in tasks later on
-      :component->stream->fields (component->stream->fields (:system-topology 
<>))
-      :component->sorted-tasks (->> (:task->component <>) reverse-map (map-val 
sort))
-      :endpoint-socket-lock (mk-rw-lock)
-      :cached-node+port->socket (atom {})
-      :cached-task->node+port (atom {})
-      :transfer-queue transfer-queue
-      :executor-receive-queue-map executor-receive-queue-map
-      :short-executor-receive-queue-map (map-key first 
executor-receive-queue-map)
-      :task->short-executor (->> executors
-                                 (mapcat (fn [e] (for [t (executor-id->tasks 
e)] [t (first e)])))
-                                 (into {})
-                                 (HashMap.))
-      :suicide-fn (mk-suicide-fn conf)
-      :uptime (uptime-computer)
-      :default-shared-resources (mk-default-resources <>)
-      :user-shared-resources (mk-user-resources <>)
-      :transfer-local-fn (mk-transfer-local-fn <>)
-      :transfer-fn (mk-transfer-fn <>)
-      :load-mapping (LoadMapping.)
-      :assignment-versions assignment-versions
-      :backpressure (atom false) ;; whether this worker is going slow
-      :backpressure-trigger (Object.) ;; a trigger for synchronization with 
executors
-      :throttle-on (atom false) ;; whether throttle is activated for spouts
-      )))
-
-(defn- endpoint->string [[node port]]
-  (str port "/" node))
-
-(defn string->endpoint [^String s]
-  (let [[port-str node] (.split s "/" 2)]
-    [node (Integer/valueOf port-str)]
-    ))
-
-(def LOAD-REFRESH-INTERVAL-MS 5000)
-
-(defn mk-refresh-load [worker]
-  (let [local-tasks (set (:task-ids worker))
-        remote-tasks (set/difference (worker-outbound-tasks worker) 
local-tasks)
-        short-executor-receive-queue-map (:short-executor-receive-queue-map 
worker)
-        next-update (atom 0)]
-    (fn this
-      ([]
-        (let [^LoadMapping load-mapping (:load-mapping worker)
-              local-pop (map-val (fn [queue]
-                                   (let [q-metrics (.getMetrics queue)]
-                                     (/ (double (.population q-metrics)) 
(.capacity q-metrics))))
-                                 short-executor-receive-queue-map)
-              remote-load (reduce merge (for [[np conn] 
@(:cached-node+port->socket worker)] (into {} (.getLoad conn remote-tasks))))
-              now (System/currentTimeMillis)]
-          (.setLocal load-mapping local-pop)
-          (.setRemote load-mapping remote-load)
-          (when (> now @next-update)
-            (.sendLoadMetrics (:receiver worker) local-pop)
-            (reset! next-update (+ LOAD-REFRESH-INTERVAL-MS now))))))))
-
-(defn mk-refresh-connections [worker]
-  (let [outbound-tasks (worker-outbound-tasks worker)
-        conf (:conf worker)
-        storm-cluster-state (:storm-cluster-state worker)
-        storm-id (:storm-id worker)]
-    (fn this
-      ([]
-        (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 
this))))
-      ([callback]
-         (let [version (.assignment-version storm-cluster-state storm-id 
callback)
-               assignment (if (= version (:version (get @(:assignment-versions 
worker) storm-id)))
-                            (:data (get @(:assignment-versions worker) 
storm-id))
-                            (let [new-assignment 
(.assignment-info-with-version storm-cluster-state storm-id callback)]
-                              (swap! (:assignment-versions worker) assoc 
storm-id new-assignment)
-                              (:data new-assignment)))
-              my-assignment (-> assignment
-                                :executor->node+port
-                                to-task->node+port
-                                (select-keys outbound-tasks)
-                                (#(map-val endpoint->string %)))
-              ;; we dont need a connection for the local tasks anymore
-              needed-assignment (->> my-assignment
-                                      (filter-key (complement (-> worker 
:task-ids set))))
-              needed-connections (-> needed-assignment vals set)
-              needed-tasks (-> needed-assignment keys)
-
-              current-connections (set (keys @(:cached-node+port->socket 
worker)))
-              new-connections (set/difference needed-connections 
current-connections)
-              remove-connections (set/difference current-connections 
needed-connections)]
-              (swap! (:cached-node+port->socket worker)
-                     #(HashMap. (merge (into {} %1) %2))
-                     (into {}
-                       (dofor [endpoint-str new-connections
-                               :let [[node port] (string->endpoint 
endpoint-str)]]
-                         [endpoint-str
-                          (.connect
-                           ^IContext (:mq-context worker)
-                           storm-id
-                           ((:node->host assignment) node)
-                           port)
-                          ]
-                         )))
-              (write-locked (:endpoint-socket-lock worker)
-                (reset! (:cached-task->node+port worker)
-                        (HashMap. my-assignment)))
-              (doseq [endpoint remove-connections]
-                (.close (get @(:cached-node+port->socket worker) endpoint)))
-              (apply swap!
-                     (:cached-node+port->socket worker)
-                     #(HashMap. (apply dissoc (into {} %1) %&))
-                     remove-connections)
-
-           )))))
-
-(defn refresh-storm-active
-  ([worker]
-    (refresh-storm-active worker (fn [& ignored] (schedule 
(:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
-  ([worker callback]
-    (let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) 
callback)]
-      (reset!
-        (:storm-active-atom worker)
-        (and (= :active (-> base :status :type)) @(:worker-active-flag 
worker)))
-      (reset! (:storm-component->debug-atom worker) (-> base 
:component->debug))
-      (log-debug "Event debug options " @(:storm-component->debug-atom 
worker)))))
-
-;; TODO: consider having a max batch size besides what disruptor does 
automagically to prevent latency issues
-(defn mk-transfer-tuples-handler [worker]
-  (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
-        drainer (TransferDrainer.)
-        node+port->socket (:cached-node+port->socket worker)
-        task->node+port (:cached-task->node+port worker)
-        endpoint-socket-lock (:endpoint-socket-lock worker)
-        ]
-    (disruptor/clojure-handler
-      (fn [packets _ batch-end?]
-        (.add drainer packets)
-
-        (when batch-end?
-          (read-locked endpoint-socket-lock
-             (let [node+port->socket @node+port->socket
-                   task->node+port @task->node+port]
-               (.send drainer task->node+port node+port->socket)))
-          (.clear drainer))))))
-
-;; Check whether this messaging connection is ready to send data
-(defn is-connection-ready [^IConnection connection]
-  (if (instance?  ConnectionWithStatus connection)
-    (let [^ConnectionWithStatus connection connection
-          status (.status connection)]
-      (= status ConnectionWithStatus$Status/Ready))
-    true))
-
-;; all connections are ready
-(defn all-connections-ready [worker]
-    (let [connections (vals @(:cached-node+port->socket worker))]
-      (every? is-connection-ready connections)))
-
-;; we will wait all connections to be ready and then activate the spout/bolt
-;; when the worker bootup
-(defn activate-worker-when-all-connections-ready
-  [worker]
-  (let [timer (:refresh-active-timer worker)
-        delay-secs 0
-        recur-secs 1]
-    (schedule timer
-      delay-secs
-      (fn this []
-        (if (all-connections-ready worker)
-          (do
-            (log-message "All connections are ready for worker " 
(:assignment-id worker) ":" (:port worker)
-              " with id "(:worker-id worker))
-            (reset! (:worker-active-flag worker) true))
-          (schedule timer recur-secs this :check-active false)
-            )))))
-
-(defn register-callbacks [worker]
-  (log-message "Registering IConnectionCallbacks for " (:assignment-id worker) 
":" (:port worker))
-  (msg-loader/register-callback (:transfer-local-fn worker)
-                                (:receiver worker)
-                                (:storm-conf worker)
-                                (worker-context worker)))
-
-(defn- close-resources [worker]
-  (let [dr (:default-shared-resources worker)]
-    (log-message "Shutting down default resources")
-    (.shutdownNow (get dr WorkerTopologyContext/SHARED_EXECUTOR))
-    (log-message "Shut down default resources")))
-
-(defn- get-logger-levels []
-  (into {}
-    (let [logger-config (.getConfiguration (LogManager/getContext false))]
-      (for [[logger-name logger] (.getLoggers logger-config)]
-        {logger-name (.getLevel logger)}))))
-
-(defn set-logger-level [logger-context logger-name new-level]
-  (let [config (.getConfiguration logger-context)
-        logger-config (.getLoggerConfig config logger-name)]
-    (if (not (= (.getName logger-config) logger-name))
-      ;; create a new config. Make it additive (true) s.t. inherit
-      ;; parents appenders
-      (let [new-logger-config (LoggerConfig. logger-name new-level true)]
-        (log-message "Adding config for: " new-logger-config " with level: " 
new-level)
-        (.addLogger config logger-name new-logger-config))
-      (do
-        (log-message "Setting " logger-config " log level to: " new-level)
-        (.setLevel logger-config new-level)))))
-
-;; function called on timer to reset log levels last set to DEBUG
-;; also called from process-log-config-change
-(defn reset-log-levels [latest-log-config-atom]
-  (let [latest-log-config @latest-log-config-atom
-        logger-context (LogManager/getContext false)]
-    (doseq [[logger-name logger-setting] (sort latest-log-config)]
-      (let [timeout (:timeout logger-setting)
-            target-log-level (:target-log-level logger-setting)
-            reset-log-level (:reset-log-level logger-setting)]
-        (when (> (coerce/to-long (time/now)) timeout)
-          (log-message logger-name ": Resetting level to " reset-log-level) 
-          (set-logger-level logger-context logger-name reset-log-level)
-          (swap! latest-log-config-atom
-            (fn [prev]
-              (dissoc prev logger-name))))))
-    (.updateLoggers logger-context)))
-
-;; when a new log level is received from zookeeper, this function is called
-(defn process-log-config-change [latest-log-config original-log-levels 
log-config]
-  (when log-config
-    (log-debug "Processing received log config: " log-config)
-    ;; merge log configs together
-    (let [loggers (.get_named_logger_level log-config)
-          logger-context (LogManager/getContext false)]
-      (def new-log-configs
-        (into {}
-         ;; merge named log levels
-         (for [[msg-logger-name logger-level] loggers]
-           (let [logger-name (if (= msg-logger-name "ROOT")
-                                LogManager/ROOT_LOGGER_NAME
-                                msg-logger-name)]
-             ;; the new-timeouts map now contains logger => timeout 
-             (when (.is_set_reset_log_level_timeout_epoch logger-level)
-               {logger-name {:action (.get_action logger-level)
-                             :target-log-level (Level/toLevel 
(.get_target_log_level logger-level))
-                             :reset-log-level (or (.get @original-log-levels 
logger-name) (Level/INFO))
-                             :timeout (.get_reset_log_level_timeout_epoch 
logger-level)}})))))
-
-      ;; look for deleted log timeouts
-      (doseq [[logger-name logger-val] (sort @latest-log-config)]
-        (when (not (contains? new-log-configs logger-name))
-          ;; if we had a timeout, but the timeout is no longer active
-          (set-logger-level
-            logger-context logger-name (:reset-log-level logger-val))))
-
-      ;; apply new log settings we just received
-      ;; the merged configs are only for the reset logic
-      (doseq [[msg-logger-name logger-level] (sort (into {} 
(.get_named_logger_level log-config)))]
-        (let [logger-name (if (= msg-logger-name "ROOT")
-                                LogManager/ROOT_LOGGER_NAME
-                                msg-logger-name)
-              level (Level/toLevel (.get_target_log_level logger-level))
-              action (.get_action logger-level)]
-          (if (= action LogLevelAction/UPDATE)
-            (set-logger-level logger-context logger-name level))))
-   
-      (.updateLoggers logger-context)
-      (reset! latest-log-config new-log-configs)
-      (log-debug "New merged log config is " @latest-log-config))))
-
-(defn run-worker-start-hooks [worker]
-  (let [topology (:topology worker)
-        topo-conf (:storm-conf worker)
-        worker-topology-context (worker-context worker)
-        hooks (.get_worker_hooks topology)]
-    (dofor [hook hooks]
-      (let [hook-bytes (Utils/toByteArray hook)
-            deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)]
-        (.start deser-hook topo-conf worker-topology-context)))))
-
-(defn run-worker-shutdown-hooks [worker]
-  (let [topology (:topology worker)
-        hooks (.get_worker_hooks topology)]
-    (dofor [hook hooks]
-      (let [hook-bytes (Utils/toByteArray hook)
-            deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)]
-        (.shutdown deser-hook)))))
-
-;; TODO: should worker even take the storm-id as input? this should be
-;; deducable from cluster state (by searching through assignments)
-;; what about if there's inconsistency in assignments? -> but nimbus
-;; should guarantee this consistency
-(defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port 
worker-id]
-  (log-message "Launching worker for " storm-id " on " assignment-id ":" port 
" with id " worker-id
-               " and conf " conf)
-  (if-not (local-mode? conf)
-    (redirect-stdio-to-slf4j!))
-  ;; because in local mode, its not a separate
-  ;; process. supervisor will register it in this case
-  (when (= :distributed (cluster-mode conf))
-    (let [pid (process-pid)]
-      (touch (worker-pid-path conf worker-id pid))
-      (spit (worker-artifacts-pid-path conf storm-id port) pid)))
-
-  (declare establish-log-setting-callback)
-
-  ;; start out with empty list of timeouts 
-  (def latest-log-config (atom {}))
-  (def original-log-levels (atom {}))
-
-  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
-        storm-conf (override-login-config-with-system-property storm-conf)
-        acls (Utils/getWorkerACL storm-conf)
-        cluster-state (cluster/mk-distributed-cluster-state conf :auth-conf 
storm-conf :acls acls :context (ClusterStateContext. DaemonType/WORKER))
-        storm-cluster-state (cluster/mk-storm-cluster-state cluster-state 
:acls acls)
-        initial-credentials (.credentials storm-cluster-state storm-id nil)
-        auto-creds (AuthUtils/GetAutoCredentials storm-conf)
-        subject (AuthUtils/populateSubject nil auto-creds initial-credentials)]
-      (Subject/doAs subject (reify PrivilegedExceptionAction
-        (run [this]
-          (let [worker (worker-data conf shared-mq-context storm-id 
assignment-id port worker-id storm-conf cluster-state storm-cluster-state)
-        heartbeat-fn #(do-heartbeat worker)
-
-        ;; do this here so that the worker process dies if this fails
-        ;; it's important that worker heartbeat to supervisor ASAP when 
launching so that the supervisor knows it's running (and can move on)
-        _ (heartbeat-fn)
-
-        executors (atom nil)
-        ;; launch heartbeat threads immediately so that slow-loading tasks 
don't cause the worker to timeout
-        ;; to the supervisor
-        _ (schedule-recurring (:heartbeat-timer worker) 0 (conf 
WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
-        _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf 
TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors 
@executors))
-
-        _ (register-callbacks worker)
-
-        refresh-connections (mk-refresh-connections worker)
-        refresh-load (mk-refresh-load worker)
-
-        _ (refresh-connections nil)
-
-        _ (activate-worker-when-all-connections-ready worker)
-
-        _ (refresh-storm-active worker nil)
-
-        _ (run-worker-start-hooks worker)
-
-        _ (reset! executors (dofor [e (:executors worker)] 
(executor/mk-executor worker e initial-credentials)))
-
-        transfer-tuples (mk-transfer-tuples-handler worker)
-        
-        transfer-thread (disruptor/consume-loop* (:transfer-queue worker) 
transfer-tuples)               
-
-        disruptor-handler (mk-disruptor-backpressure-handler worker)
-        _ (.registerBackpressureCallback (:transfer-queue worker) 
disruptor-handler)
-        _ (-> (.setHighWaterMark (:transfer-queue worker) ((:storm-conf 
worker) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
-              (.setLowWaterMark ((:storm-conf worker) 
BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
-              (.setEnableBackpressure ((:storm-conf worker) 
TOPOLOGY-BACKPRESSURE-ENABLE)))
-        backpressure-handler (mk-backpressure-handler @executors)        
-        backpressure-thread (WorkerBackpressureThread. (:backpressure-trigger 
worker) worker backpressure-handler)
-        _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) 
-            (.start backpressure-thread))
-        topology-backpressure-callback (fn cb [& ignored]
-                   (let [throttle-on (.topology-backpressure 
storm-cluster-state storm-id cb)]
-                     (reset! (:throttle-on worker) throttle-on)))
-        _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-            (.topology-backpressure storm-cluster-state storm-id 
topology-backpressure-callback))
-
-        shutdown* (fn []
-                    (log-message "Shutting down worker " storm-id " " 
assignment-id " " port)
-                    (doseq [[_ socket] @(:cached-node+port->socket worker)]
-                      ;; this will do best effort flushing since the linger 
period
-                      ;; was set on creation
-                      (.close socket))
-                    (log-message "Terminating messaging context")
-                    (log-message "Shutting down executors")
-                    (doseq [executor @executors] (.shutdown executor))
-                    (log-message "Shut down executors")
-
-                    ;;this is fine because the only time this is shared is 
when it's a local context,
-                    ;;in which case it's a noop
-                    (.term ^IContext (:mq-context worker))
-                    (log-message "Shutting down transfer thread")
-                    (disruptor/halt-with-interrupt! (:transfer-queue worker))
-
-                    (.interrupt transfer-thread)
-                    (.join transfer-thread)
-                    (log-message "Shut down transfer thread")
-                    (.terminate backpressure-thread)
-                    (log-message "Shut down backpressure thread")
-                    (cancel-timer (:heartbeat-timer worker))
-                    (cancel-timer (:refresh-connections-timer worker))
-                    (cancel-timer (:refresh-credentials-timer worker))
-                    (cancel-timer (:refresh-active-timer worker))
-                    (cancel-timer (:executor-heartbeat-timer worker))
-                    (cancel-timer (:user-timer worker))
-                    (cancel-timer (:refresh-load-timer worker))
-
-                    (close-resources worker)
-
-                    (log-message "Trigger any worker shutdown hooks")
-                    (run-worker-shutdown-hooks worker)
-
-                    (.remove-worker-heartbeat! (:storm-cluster-state worker) 
storm-id assignment-id port)
-                    (.remove-worker-backpressure! (:storm-cluster-state 
worker) storm-id assignment-id port)
-
-                    (log-message "Disconnecting from storm cluster state 
context")
-                    (.disconnect (:storm-cluster-state worker))
-                    (.close (:cluster-state worker))
-                    (log-message "Shut down worker " storm-id " " 
assignment-id " " port))
-        ret (reify
-             Shutdownable
-             (shutdown
-              [this]
-              (shutdown*))
-             DaemonCommon
-             (waiting? [this]
-               (and
-                 (timer-waiting? (:heartbeat-timer worker))
-                 (timer-waiting? (:refresh-connections-timer worker))
-                 (timer-waiting? (:refresh-load-timer worker))
-                 (timer-waiting? (:refresh-credentials-timer worker))
-                 (timer-waiting? (:refresh-active-timer worker))
-                 (timer-waiting? (:executor-heartbeat-timer worker))
-                 (timer-waiting? (:user-timer worker))
-                 ))
-             )
-        credentials (atom initial-credentials)
-        check-credentials-changed (fn []
-                                    (let [new-creds (.credentials 
(:storm-cluster-state worker) storm-id nil)]
-                                      (when-not (= new-creds @credentials) 
;;This does not have to be atomic, worst case we update when one is not needed
-                                        (AuthUtils/updateSubject subject 
auto-creds new-creds)
-                                        (dofor [e @executors] 
(.credentials-changed e new-creds))
-                                        (reset! credentials new-creds))))
-        check-log-config-changed (fn []
-                                  (let [log-config (.topology-log-config 
(:storm-cluster-state worker) storm-id nil)]
-                                    (process-log-config-change 
latest-log-config original-log-levels log-config)
-                                    (establish-log-setting-callback)))]
-    (reset! original-log-levels (get-logger-levels))
-    (log-message "Started with log levels: " @original-log-levels)
-  
-    (defn establish-log-setting-callback []
-      (.topology-log-config (:storm-cluster-state worker) storm-id (fn [args] 
(check-log-config-changed))))
-
-    (establish-log-setting-callback)
-    (.credentials (:storm-cluster-state worker) storm-id (fn [args] 
(check-credentials-changed)))
-    (schedule-recurring (:refresh-credentials-timer worker) 0 (conf 
TASK-CREDENTIALS-POLL-SECS)
-                        (fn [& args]
-                          (check-credentials-changed)))
-
-    (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-      (schedule-recurring (:refresh-backpressure-timer worker) 0 (conf 
TASK-BACKPRESSURE-POLL-SECS) topology-backpressure-callback))
-
-    ;; The jitter allows the clients to get the data at different times, and 
avoids thundering herd
-    (when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
-      (schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 
refresh-load))
-    (schedule-recurring (:refresh-connections-timer worker) 0 (conf 
TASK-REFRESH-POLL-SECS) refresh-connections)
-    (schedule-recurring (:reset-log-levels-timer worker) 0 (conf 
WORKER-LOG-LEVEL-RESET-POLL-SECS) (fn [] (reset-log-levels latest-log-config)))
-    (schedule-recurring (:refresh-active-timer worker) 0 (conf 
TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))
-
-    (log-message "Worker has topology config " (redact-value (:storm-conf 
worker) STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
-    (log-message "Worker " worker-id " for storm " storm-id " on " 
assignment-id ":" port " has finished loading")
-    ret
-    ))))))
-
-(defmethod mk-suicide-fn
-  :local [conf]
-  (fn [] (exit-process! 1 "Worker died")))
-
-(defmethod mk-suicide-fn
-  :distributed [conf]
-  (fn [] (exit-process! 1 "Worker died")))
-
-(defn -main [storm-id assignment-id port-str worker-id]
-  (let [conf (read-storm-config)]
-    (setup-default-uncaught-exception-handler)
-    (validate-distributed-mode! conf)
-    (let [worker (mk-worker conf nil storm-id assignment-id (Integer/parseInt 
port-str) worker-id)]
-      (add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown worker)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/disruptor.clj 
b/storm-core/src/clj/backtype/storm/disruptor.clj
deleted file mode 100644
index 1546b3f..0000000
--- a/storm-core/src/clj/backtype/storm/disruptor.clj
+++ /dev/null
@@ -1,89 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.disruptor
-  (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback 
DisruptorBackpressureCallback])
-  (:import [com.lmax.disruptor.dsl ProducerType])
-  (:require [clojure [string :as str]])
-  (:require [clojure [set :as set]])
-  (:use [clojure walk])
-  (:use [org.apache.storm util log]))
-
-(def PRODUCER-TYPE
-  {:multi-threaded ProducerType/MULTI
-   :single-threaded ProducerType/SINGLE})
-
-(defnk disruptor-queue
-  [^String queue-name buffer-size timeout :producer-type :multi-threaded 
:batch-size 100 :batch-timeout 1]
-  (DisruptorQueue. queue-name
-                   (PRODUCER-TYPE producer-type) buffer-size
-                   timeout batch-size batch-timeout))
-
-(defn clojure-handler
-  [afn]
-  (reify com.lmax.disruptor.EventHandler
-    (onEvent
-      [this o seq-id batchEnd?]
-      (afn o seq-id batchEnd?))))
-
-(defn disruptor-backpressure-handler
-  [afn-high-wm afn-low-wm]
-  (reify DisruptorBackpressureCallback
-    (highWaterMark
-      [this]
-      (afn-high-wm))
-    (lowWaterMark
-      [this]
-      (afn-low-wm))))
-
-(defn worker-backpressure-handler
-  [afn]
-  (reify WorkerBackpressureCallback
-    (onEvent
-      [this o]
-      (afn o))))
-
-(defmacro handler
-  [& args]
-  `(clojure-handler (fn ~@args)))
-
-(defn publish
-  [^DisruptorQueue q o]
-  (.publish q o))
-
-(defn consume-batch
-  [^DisruptorQueue queue handler]
-  (.consumeBatch queue handler))
-
-(defn consume-batch-when-available
-  [^DisruptorQueue queue handler]
-  (.consumeBatchWhenAvailable queue handler))
-
-(defn halt-with-interrupt!
-  [^DisruptorQueue queue]
-  (.haltWithInterrupt queue))
-
-(defnk consume-loop*
-  [^DisruptorQueue queue handler
-   :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))]
-  (async-loop
-          (fn [] (consume-batch-when-available queue handler) 0)
-          :kill-fn kill-fn
-          :thread-name (.getName queue)))
-
-(defmacro consume-loop [queue & handler-args]
-  `(let [handler# (handler ~@handler-args)]
-     (consume-loop* ~queue handler#)))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/event.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/event.clj 
b/storm-core/src/clj/backtype/storm/event.clj
deleted file mode 100644
index edc7616..0000000
--- a/storm-core/src/clj/backtype/storm/event.clj
+++ /dev/null
@@ -1,71 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.event
-  (:use [org.apache.storm log util])
-  (:import [org.apache.storm.utils Time Utils])
-  (:import [java.io InterruptedIOException])
-  (:import [java.util.concurrent LinkedBlockingQueue TimeUnit]))
-
-(defprotocol EventManager
-  (add [this event-fn])
-  (waiting? [this])
-  (shutdown [this]))
-
-(defn event-manager
-  "Creates a thread to respond to events. Any error will cause process to halt"
-  [daemon?]
-  (let [added (atom 0)
-        processed (atom 0)
-        ^LinkedBlockingQueue queue (LinkedBlockingQueue.)
-        running (atom true)
-        runner (Thread.
-                 (fn []
-                   (try-cause
-                     (while @running
-                       (let [r (.take queue)]
-                         (r)
-                         (swap! processed inc)))
-                     (catch InterruptedIOException t
-                       (log-message "Event manager interrupted while doing 
IO"))
-                     (catch InterruptedException t
-                       (log-message "Event manager interrupted"))
-                     (catch Throwable t
-                       (log-error t "Error when processing event")
-                       (exit-process! 20 "Error when processing an event")))))]
-    (.setDaemon runner daemon?)
-    (.start runner)
-    (reify
-      EventManager
-
-      (add
-        [this event-fn]
-        ;; should keep track of total added and processed to know if this is 
finished yet
-        (when-not @running
-          (throw (RuntimeException. "Cannot add events to a shutdown event 
manager")))
-        (swap! added inc)
-        (.put queue event-fn))
-
-      (waiting?
-        [this]
-        (or (Time/isThreadWaiting runner)
-            (= @processed @added)))
-
-      (shutdown
-        [this]
-        (reset! running false)
-        (.interrupt runner)
-        (.join runner)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/local_state.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/local_state.clj 
b/storm-core/src/clj/backtype/storm/local_state.clj
deleted file mode 100644
index a95a85b..0000000
--- a/storm-core/src/clj/backtype/storm/local_state.clj
+++ /dev/null
@@ -1,131 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.local-state
-  (:use [org.apache.storm log util])
-  (:import [org.apache.storm.generated StormTopology
-            InvalidTopologyException GlobalStreamId
-            LSSupervisorId LSApprovedWorkers
-            LSSupervisorAssignments LocalAssignment
-            ExecutorInfo LSWorkerHeartbeat
-            LSTopoHistory LSTopoHistoryList
-            WorkerResources])
-  (:import [org.apache.storm.utils LocalState]))
-
-(def LS-WORKER-HEARTBEAT "worker-heartbeat")
-(def LS-ID "supervisor-id")
-(def LS-LOCAL-ASSIGNMENTS "local-assignments")
-(def LS-APPROVED-WORKERS "approved-workers")
-(def LS-TOPO-HISTORY "topo-hist")
-
-(defn ->LSTopoHistory
-  [{topoid :topoid timestamp :timestamp users :users groups :groups}]
-  (LSTopoHistory. topoid timestamp users groups))
-
-(defn ->topo-history
-  [thrift-topo-hist]
-  {
-    :topoid (.get_topology_id thrift-topo-hist)
-    :timestamp (.get_time_stamp thrift-topo-hist)
-    :users (.get_users thrift-topo-hist)
-    :groups (.get_groups thrift-topo-hist)})
-
-(defn ls-topo-hist!
-  [^LocalState local-state hist-list]
-  (.put local-state LS-TOPO-HISTORY
-    (LSTopoHistoryList. (map ->LSTopoHistory hist-list))))
-
-(defn ls-topo-hist
-  [^LocalState local-state]
-  (if-let [thrift-hist-list (.get local-state LS-TOPO-HISTORY)]
-    (map ->topo-history (.get_topo_history thrift-hist-list))))
-
-(defn ls-supervisor-id!
-  [^LocalState local-state ^String id]
-    (.put local-state LS-ID (LSSupervisorId. id)))
-
-(defn ls-supervisor-id
-  [^LocalState local-state]
-  (if-let [super-id (.get local-state LS-ID)]
-    (.get_supervisor_id super-id)))
-
-(defn ls-approved-workers!
-  [^LocalState local-state workers]
-    (.put local-state LS-APPROVED-WORKERS (LSApprovedWorkers. workers)))
-
-(defn ls-approved-workers
-  [^LocalState local-state]
-  (if-let [tmp (.get local-state LS-APPROVED-WORKERS)]
-    (into {} (.get_approved_workers tmp))))
-
-(defn ->ExecutorInfo
-  [[low high]] (ExecutorInfo. low high))
-
-(defn ->ExecutorInfo-list
-  [executors]
-  (map ->ExecutorInfo executors))
-
-(defn ->executor-list
-  [executors]
-  (into [] 
-    (for [exec-info executors] 
-      [(.get_task_start exec-info) (.get_task_end exec-info)])))
-
-(defn ->LocalAssignment
-  [{storm-id :storm-id executors :executors resources :resources}]
-  (let [assignment (LocalAssignment. storm-id (->ExecutorInfo-list executors))]
-    (if resources (.set_resources assignment
-                                  (doto (WorkerResources. )
-                                    (.set_mem_on_heap (first resources))
-                                    (.set_mem_off_heap (second resources))
-                                    (.set_cpu (last resources)))))
-    assignment))
-
-(defn mk-local-assignment
-  [storm-id executors resources]
-  {:storm-id storm-id :executors executors :resources resources})
-
-(defn ->local-assignment
-  [^LocalAssignment thrift-local-assignment]
-    (mk-local-assignment
-      (.get_topology_id thrift-local-assignment)
-      (->executor-list (.get_executors thrift-local-assignment))
-      (.get_resources thrift-local-assignment)))
-
-(defn ls-local-assignments!
-  [^LocalState local-state assignments]
-    (let [local-assignment-map (map-val ->LocalAssignment assignments)]
-    (.put local-state LS-LOCAL-ASSIGNMENTS 
-          (LSSupervisorAssignments. local-assignment-map))))
-
-(defn ls-local-assignments
-  [^LocalState local-state]
-    (if-let [thrift-local-assignments (.get local-state LS-LOCAL-ASSIGNMENTS)]
-      (map-val
-        ->local-assignment
-        (.get_assignments thrift-local-assignments))))
-
-(defn ls-worker-heartbeat!
-  [^LocalState local-state time-secs storm-id executors port]
-  (.put local-state LS-WORKER-HEARTBEAT (LSWorkerHeartbeat. time-secs storm-id 
(->ExecutorInfo-list executors) port) false))
-
-(defn ls-worker-heartbeat 
-  [^LocalState local-state]
-  (if-let [worker-hb (.get local-state LS-WORKER-HEARTBEAT)]
-    {:time-secs (.get_time_secs worker-hb)
-     :storm-id (.get_topology_id worker-hb)
-     :executors (->executor-list (.get_executors worker-hb))
-     :port (.get_port worker-hb)}))
-

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/log.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/log.clj 
b/storm-core/src/clj/backtype/storm/log.clj
deleted file mode 100644
index 96570e3..0000000
--- a/storm-core/src/clj/backtype/storm/log.clj
+++ /dev/null
@@ -1,56 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.log
-  (:require [clojure.tools.logging :as log])
-  (:use [clojure pprint])
-  (:import [java.io StringWriter]))
-
-(defmacro log-message
-  [& args]
-  `(log/info (str ~@args)))
-
-(defmacro log-error
-  [e & args]
-  `(log/log :error ~e (str ~@args)))
-
-(defmacro log-debug
-  [& args]
-  `(log/debug (str ~@args)))
-
-(defmacro log-warn-error
-  [e & args]
-  `(log/warn (str ~@args) ~e))
-
-(defmacro log-warn
-  [& args]
-  `(log/warn (str ~@args)))
-
-(defn log-capture!
-  [& args]
-  (apply log/log-capture! args))
-
-(defn log-stream
-  [& args]
-  (apply log/log-stream args))
-
-(defmacro log-pprint
-  [& args]
-  `(let [^StringWriter writer# (StringWriter.)]
-     (doall
-       (for [object# [~@args]]
-         (pprint object# writer#)))
-     (log-message "\n" writer#)))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/messaging/loader.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/loader.clj 
b/storm-core/src/clj/backtype/storm/messaging/loader.clj
deleted file mode 100644
index b190ab0..0000000
--- a/storm-core/src/clj/backtype/storm/messaging/loader.clj
+++ /dev/null
@@ -1,34 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.messaging.loader
-  (:import [org.apache.storm.messaging IConnection 
DeserializingConnectionCallback])
-  (:require [org.apache.storm.messaging [local :as local]]))
-
-(defn mk-local-context []
-  (local/mk-context))
-
-(defn- mk-connection-callback
-  "make an IConnectionCallback"
-  [transfer-local-fn storm-conf worker-context]
-  (DeserializingConnectionCallback. storm-conf
-                                    worker-context
-                                    (fn [batch]
-                                      (transfer-local-fn batch))))
-
-(defn register-callback
-  "register the local-transfer-fn with the server"
-  [transfer-local-fn ^IConnection socket storm-conf worker-context]
-  (.registerRecv socket (mk-connection-callback transfer-local-fn storm-conf 
worker-context)))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/messaging/local.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/local.clj 
b/storm-core/src/clj/backtype/storm/messaging/local.clj
deleted file mode 100644
index 32fbb34..0000000
--- a/storm-core/src/clj/backtype/storm/messaging/local.clj
+++ /dev/null
@@ -1,23 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.messaging.local
-  (:import [org.apache.storm.messaging IContext])
-  (:import [org.apache.storm.messaging.local Context]))
-
-(defn mk-context [] 
-  (let [context  (Context.)]
-    (.prepare ^IContext context nil)
-    context))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/metric/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/metric/testing.clj 
b/storm-core/src/clj/backtype/storm/metric/testing.clj
deleted file mode 100644
index a8ec438..0000000
--- a/storm-core/src/clj/backtype/storm/metric/testing.clj
+++ /dev/null
@@ -1,68 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.metric.testing
-  "This namespace is for AOT dependent metrics testing code."
-  (:gen-class))
-
-(letfn [(for- [threader arg seq-exprs body]
-          `(reduce #(%2 %1)
-                   ~arg
-                   (for ~seq-exprs
-                     (fn [arg#] (~threader arg# ~@body)))))]
-  (defmacro for->
-    "Apply a thread expression to a sequence.
-   eg.
-      (-> 1
-        (for-> [x [1 2 3]]
-          (+ x)))
-   => 7"
-    {:indent 1}
-    [arg seq-exprs & body]
-    (for- 'clojure.core/-> arg seq-exprs body)))
-
-(gen-class
- :name clojure.storm.metric.testing.FakeMetricConsumer
- :implements [org.apache.storm.metric.api.IMetricsConsumer]
- :prefix "impl-")
-
-(def buffer (atom nil))
-
-(defn impl-prepare [this conf argument ctx error-reporter]
-  (reset! buffer {}))
-
-(defn impl-cleanup [this]
-  (reset! buffer {}))
-
-(defn vec-conj [coll x] (if coll
-                          (conj coll x)
-                          [x]))
-
-(defn expand-complex-datapoint [dp]
-  (if (or (map? (.value dp))
-          (instance? java.util.AbstractMap (.value dp)))
-    (into [] (for [[k v] (.value dp)]
-               [(str (.name dp) "/" k) v]))
-    [[(.name dp) (.value dp)]]))
-
-(defn impl-handleDataPoints [this task-info data-points]  
-  (swap! buffer
-         (fn [old]
-           (-> old
-            (for-> [dp data-points
-                    [name val] (expand-complex-datapoint dp)]
-                   (update-in [(.srcComponentId task-info) name (.srcTaskId 
task-info)] vec-conj val))))))
- 
-

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/pacemaker/pacemaker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/pacemaker/pacemaker.clj 
b/storm-core/src/clj/backtype/storm/pacemaker/pacemaker.clj
deleted file mode 100644
index 70313e4..0000000
--- a/storm-core/src/clj/backtype/storm/pacemaker/pacemaker.clj
+++ /dev/null
@@ -1,241 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.pacemaker.pacemaker
-  (:import [org.apache.storm.pacemaker PacemakerServer IServerMessageHandler]
-           [java.util.concurrent ConcurrentHashMap]
-           [java.util.concurrent.atomic AtomicInteger]
-           [org.apache.storm.generated HBNodes
-                                     HBServerMessageType HBMessage 
HBMessageData HBPulse]
-           [org.apache.storm.utils VersionInfo])
-  (:use [clojure.string :only [replace-first split]]
-        [org.apache.storm log config util])
-  (:require [clojure.java.jmx :as jmx])
-  (:gen-class))
-
-(def STORM-VERSION (VersionInfo/getVersion))
-
-;; Stats Functions
-
-(def sleep-seconds 60)
-
-
-(defn- check-and-set-loop [stats key new & {:keys [compare new-fn]
-                                            :or {compare (fn [new old] true)
-                                                 new-fn (fn [new old] new)}}]
-  (loop []
-    (let [old (.get (key stats))
-          new (new-fn new old)]
-      (if (compare new old)
-        (if (.compareAndSet (key stats) old new)
-          nil
-          (recur))
-        nil))))
-
-(defn- set-average [stats size]
-  (check-and-set-loop
-   stats
-   :average-heartbeat-size
-   size
-   :new-fn (fn [new old]
-            (let [count (.get (:send-pulse-count stats))]
-                                        ; Weighted average
-              (/ (+ new (* count old)) (+ count 1))))))
-
-(defn- set-largest [stats size]
-  (check-and-set-loop
-   stats
-   :largest-heartbeat-size
-   size
-   :compare #'>))
-
-(defn- report-stats [heartbeats stats last-five-s]
-  (loop []
-    (let [send-count (.getAndSet (:send-pulse-count stats) 0)
-          received-size (.getAndSet (:total-received-size stats) 0)
-          get-count (.getAndSet (:get-pulse-count stats) 0)
-          sent-size (.getAndSet (:total-sent-size stats) 0)
-          largest (.getAndSet (:largest-heartbeat-size stats) 0)
-          average (.getAndSet (:average-heartbeat-size stats) 0)
-          total-keys (.size heartbeats)]
-      (log-debug "\nReceived " send-count " heartbeats totaling " 
received-size " bytes,\n"
-                 "Sent " get-count " heartbeats totaling " sent-size " 
bytes,\n"
-                 "The largest heartbeat was " largest " bytes,\n"
-                 "The average heartbeat was " average " bytes,\n"
-                 "Pacemaker contained " total-keys " total keys\n"
-                 "in the last " sleep-seconds " second(s)")
-      (dosync (ref-set last-five-s
-                       {:send-pulse-count send-count
-                        :total-received-size received-size
-                        :get-pulse-count get-count
-                        :total-sent-size sent-size
-                        :largest-heartbeat-size largest
-                        :average-heartbeat-size average
-                        :total-keys total-keys})))
-    (Thread/sleep (* 1000 sleep-seconds))
-    (recur)))
-
-;; JMX stuff
-(defn register [last-five-s]
-  (jmx/register-mbean
-    (jmx/create-bean
-      last-five-s)
-    "org.apache.storm.pacemaker.pacemaker:stats=Stats_Last_5_Seconds"))
-
-
-;; Pacemaker Functions
-
-(defn hb-data []
-  (ConcurrentHashMap.))
-
-(defn create-path [^String path heartbeats]
-  (HBMessage. HBServerMessageType/CREATE_PATH_RESPONSE nil))
-
-(defn exists [^String path heartbeats]
-  (let [it-does (.containsKey heartbeats path)]
-    (log-debug (str "Checking if path [" path "] exists..." it-does "."))
-    (HBMessage. HBServerMessageType/EXISTS_RESPONSE
-                (HBMessageData/boolval it-does))))
-
-(defn send-pulse [^HBPulse pulse heartbeats pacemaker-stats]
-  (let [id (.get_id pulse)
-        details (.get_details pulse)]
-    (log-debug (str "Saving Pulse for id [" id "] data [" + (str details) 
"]."))
-
-    (.incrementAndGet (:send-pulse-count pacemaker-stats))
-    (.addAndGet (:total-received-size pacemaker-stats) (alength details))
-    (set-largest pacemaker-stats (alength details))
-    (set-average pacemaker-stats (alength details))
-
-    (.put heartbeats id details)
-    (HBMessage. HBServerMessageType/SEND_PULSE_RESPONSE nil)))
-
-(defn get-all-pulse-for-path [^String path heartbeats]
-  (HBMessage. HBServerMessageType/GET_ALL_PULSE_FOR_PATH_RESPONSE nil))
-
-(defn get-all-nodes-for-path [^String path ^ConcurrentHashMap heartbeats]
-  (log-debug "List all nodes for path " path)
-  (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE
-              (HBMessageData/nodes
-               (HBNodes. (distinct (for [k (.keySet heartbeats)
-                                         :let [trimmed-k (first
-                                                          (filter #(not (= "" 
%))
-                                                                  (split 
(replace-first k path "") #"/")))]
-                                         :when (and
-                                                (not (nil? trimmed-k))
-                                                (= (.indexOf k path) 0))]
-                                     trimmed-k))))))
-
-(defn get-pulse [^String path heartbeats pacemaker-stats]
-  (let [details (.get heartbeats path)]
-    (log-debug (str "Getting Pulse for path [" path "]...data " (str details) 
"]."))
-
-
-    (.incrementAndGet (:get-pulse-count pacemaker-stats))
-    (if details
-      (.addAndGet (:total-sent-size pacemaker-stats) (alength details)))
-
-    (HBMessage. HBServerMessageType/GET_PULSE_RESPONSE
-                (HBMessageData/pulse
-                 (doto (HBPulse. ) (.set_id path) (.set_details details))))))
-
-(defn delete-pulse-id [^String path heartbeats]
-  (log-debug (str "Deleting Pulse for id [" path "]."))
-  (.remove heartbeats path)
-  (HBMessage. HBServerMessageType/DELETE_PULSE_ID_RESPONSE nil))
-
-(defn delete-path [^String path heartbeats]
-  (let [prefix (if (= \/ (last path)) path (str path "/"))]
-    (doseq [k (.keySet heartbeats)
-            :when (= (.indexOf k prefix) 0)]
-      (delete-pulse-id k heartbeats)))
-  (HBMessage. HBServerMessageType/DELETE_PATH_RESPONSE nil))
-
-(defn not-authorized []
-  (HBMessage. HBServerMessageType/NOT_AUTHORIZED nil))
-
-(defn mk-handler [conf]
-  (let [heartbeats ^ConcurrentHashMap (hb-data)
-        pacemaker-stats {:send-pulse-count (AtomicInteger.)
-                         :total-received-size (AtomicInteger.)
-                         :get-pulse-count (AtomicInteger.)
-                         :total-sent-size (AtomicInteger.)
-                         :largest-heartbeat-size (AtomicInteger.)
-                         :average-heartbeat-size (AtomicInteger.)}
-        last-five (ref {:send-pulse-count 0
-                        :total-received-size 0
-                        :get-pulse-count 0
-                        :total-sent-size 0
-                        :largest-heartbeat-size 0
-                        :average-heartbeat-size 0
-                        :total-keys 0})
-        stats-thread (Thread. (fn [] (report-stats heartbeats pacemaker-stats 
last-five)))]
-    (.setDaemon stats-thread true)
-    (.start stats-thread)
-    (register last-five)
-    (reify
-      IServerMessageHandler
-      (^HBMessage handleMessage [this ^HBMessage request ^boolean 
authenticated]
-        (let [response
-              (condp = (.get_type request)
-                HBServerMessageType/CREATE_PATH
-                (create-path (.get_path (.get_data request)) heartbeats)
-
-                HBServerMessageType/EXISTS
-                (if authenticated
-                  (exists (.get_path (.get_data request)) heartbeats)
-                  (not-authorized))
-
-                HBServerMessageType/SEND_PULSE
-                (send-pulse (.get_pulse (.get_data request)) heartbeats 
pacemaker-stats)
-
-                HBServerMessageType/GET_ALL_PULSE_FOR_PATH
-                (if authenticated
-                  (get-all-pulse-for-path (.get_path (.get_data request)) 
heartbeats)
-                  (not-authorized))
-
-                HBServerMessageType/GET_ALL_NODES_FOR_PATH
-                (if authenticated
-                  (get-all-nodes-for-path (.get_path (.get_data request)) 
heartbeats)
-                  (not-authorized))
-
-                HBServerMessageType/GET_PULSE
-                (if authenticated
-                  (get-pulse (.get_path (.get_data request)) heartbeats 
pacemaker-stats)
-                  (not-authorized))
-
-                HBServerMessageType/DELETE_PATH
-                (delete-path (.get_path (.get_data request)) heartbeats)
-
-                HBServerMessageType/DELETE_PULSE_ID
-                (delete-pulse-id (.get_path (.get_data request)) heartbeats)
-
-                ; Otherwise
-                (log-message "Got Unexpected Type: " (.get_type request)))]
-
-          (.set_message_id response (.get_message_id request))
-          response)))))
-
-(defn launch-server! []
-  (log-message "Starting pacemaker server for storm version '"
-               STORM-VERSION
-               "'")
-  (let [conf (override-login-config-with-system-property (read-storm-config))]
-    (PacemakerServer. (mk-handler conf) conf)))
-
-(defn -main []
-  (redirect-stdio-to-slf4j!)
-  (launch-server!))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/pacemaker/pacemaker_state_factory.clj
----------------------------------------------------------------------
diff --git 
a/storm-core/src/clj/backtype/storm/pacemaker/pacemaker_state_factory.clj 
b/storm-core/src/clj/backtype/storm/pacemaker/pacemaker_state_factory.clj
deleted file mode 100644
index cede59e..0000000
--- a/storm-core/src/clj/backtype/storm/pacemaker/pacemaker_state_factory.clj
+++ /dev/null
@@ -1,125 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.pacemaker.pacemaker-state-factory
-  (:require [org.apache.storm.pacemaker pacemaker]
-            [org.apache.storm.cluster-state [zookeeper-state-factory :as 
zk-factory]]
-            [org.apache.storm
-             [config :refer :all]
-             [cluster :refer :all]
-             [log :refer :all]
-             [util :as util]])
-  (:import [org.apache.storm.generated
-            HBExecutionException HBServerMessageType HBMessage
-            HBMessageData HBPulse]
-           [org.apache.storm.cluster_state zookeeper_state_factory]
-           [org.apache.storm.cluster ClusterState]
-           [org.apache.storm.pacemaker PacemakerClient])
-  (:gen-class
-   :implements [org.apache.storm.cluster.ClusterStateFactory]))
-
-;; So we can mock the client for testing
-(defn makeClient [conf]
-  (PacemakerClient. conf))
-
-(defn makeZKState [conf auth-conf acls context]
-  (.mkState (zookeeper_state_factory.) conf auth-conf acls context))
-
-(def max-retries 10)
-
-(defn -mkState [this conf auth-conf acls context]
-  (let [zk-state (makeZKState conf auth-conf acls context)
-        pacemaker-client (makeClient conf)]
-
-    (reify
-      ClusterState
-      ;; Let these pass through to the zk-state. We only want to handle 
heartbeats.
-      (register [this callback] (.register zk-state callback))
-      (unregister [this callback] (.unregister zk-state callback))
-      (set_ephemeral_node [this path data acls] (.set_ephemeral_node zk-state 
path data acls))
-      (create_sequential [this path data acls] (.create_sequential zk-state 
path data acls))
-      (set_data [this path data acls] (.set_data zk-state path data acls))
-      (delete_node [this path] (.delete_node zk-state path))
-      (delete_node_blobstore [this path nimbus-host-port-info] 
(.delete_node_blobstore zk-state path nimbus-host-port-info))
-      (get_data [this path watch?] (.get_data zk-state path watch?))
-      (get_data_with_version [this path watch?] (.get_data_with_version 
zk-state path watch?))
-      (get_version [this path watch?] (.get_version zk-state path watch?))
-      (get_children [this path watch?] (.get_children zk-state path watch?))
-      (mkdirs [this path acls] (.mkdirs zk-state path acls))
-      (node_exists [this path watch?] (.node_exists zk-state path watch?))
-      (add_listener [this listener] (.add_listener zk-state listener))
-      (sync_path [this path] (.sync_path zk-state path))
-      
-      (set_worker_hb [this path data acls]
-        (util/retry-on-exception
-         max-retries
-         "set_worker_hb"
-         #(let [response
-                (.send pacemaker-client
-                       (HBMessage. HBServerMessageType/SEND_PULSE
-                                   (HBMessageData/pulse
-                                    (doto (HBPulse.)
-                                      (.set_id path)
-                                      (.set_details data)))))]
-            (if (= (.get_type response) 
HBServerMessageType/SEND_PULSE_RESPONSE)
-              :ok
-              (throw (HBExecutionException. "Invalid Response Type"))))))
-
-      (delete_worker_hb [this path]
-        (util/retry-on-exception
-         max-retries
-         "delete_worker_hb"
-         #(let [response
-                (.send pacemaker-client
-                       (HBMessage. HBServerMessageType/DELETE_PATH
-                                   (HBMessageData/path path)))]
-            (if (= (.get_type response) 
HBServerMessageType/DELETE_PATH_RESPONSE)
-              :ok
-              (throw (HBExecutionException. "Invalid Response Type"))))))
-      
-      (get_worker_hb [this path watch?]
-        (util/retry-on-exception
-         max-retries
-         "get_worker_hb"
-         #(let [response
-                (.send pacemaker-client
-                       (HBMessage. HBServerMessageType/GET_PULSE
-                                   (HBMessageData/path path)))]
-            (if (= (.get_type response) HBServerMessageType/GET_PULSE_RESPONSE)
-              (try 
-                (.get_details (.get_pulse (.get_data response)))
-                (catch Exception e
-                  (throw (HBExecutionException. (.toString e)))))
-              (throw (HBExecutionException. "Invalid Response Type"))))))
-      
-      (get_worker_hb_children [this path watch?]
-        (util/retry-on-exception
-         max-retries
-         "get_worker_hb_children"
-         #(let [response
-                (.send pacemaker-client
-                       (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH
-                                   (HBMessageData/path path)))]
-            (if (= (.get_type response) 
HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE)
-              (try
-                (into [] (.get_pulseIds (.get_nodes (.get_data response))))
-                (catch Exception e
-                  (throw (HBExecutionException. (.toString e)))))
-              (throw (HBExecutionException. "Invalid Response Type"))))))
-      
-      (close [this]
-        (.close zk-state)
-        (.close pacemaker-client)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/process_simulator.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/process_simulator.clj 
b/storm-core/src/clj/backtype/storm/process_simulator.clj
deleted file mode 100644
index 03c3dd9..0000000
--- a/storm-core/src/clj/backtype/storm/process_simulator.clj
+++ /dev/null
@@ -1,51 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.process-simulator
-  (:use [org.apache.storm log util]))
-
-(def pid-counter (mk-counter))
-
-(def process-map (atom {}))
-
-(def kill-lock (Object.))
-
-(defn register-process [pid shutdownable]
-  (swap! process-map assoc pid shutdownable))
-
-(defn process-handle
-  [pid]
-  (@process-map pid))
-
-(defn all-processes
-  []
-  (vals @process-map))
-
-(defn kill-process
-  "Uses `locking` in case cluster shuts down while supervisor is
-  killing a task"
-  [pid]
-  (locking kill-lock
-    (log-message "Killing process " pid)
-    (let [shutdownable (process-handle pid)]
-      (swap! process-map dissoc pid)
-      (when shutdownable
-        (.shutdown shutdownable)))))
-
-(defn kill-all-processes
-  []
-  (doseq [pid (keys @process-map)]
-    (kill-process pid)))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/scheduler/DefaultScheduler.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/scheduler/DefaultScheduler.clj 
b/storm-core/src/clj/backtype/storm/scheduler/DefaultScheduler.clj
deleted file mode 100644
index f6f89f8..0000000
--- a/storm-core/src/clj/backtype/storm/scheduler/DefaultScheduler.clj
+++ /dev/null
@@ -1,77 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.scheduler.DefaultScheduler
-  (:use [org.apache.storm util config])
-  (:require [org.apache.storm.scheduler.EvenScheduler :as EvenScheduler])
-  (:import [org.apache.storm.scheduler IScheduler Topologies
-            Cluster TopologyDetails WorkerSlot SchedulerAssignment
-            EvenScheduler ExecutorDetails])
-  (:gen-class
-    :implements [org.apache.storm.scheduler.IScheduler]))
-
-(defn- bad-slots [existing-slots num-executors num-workers]
-  (if (= 0 num-workers)
-    '()
-    (let [distribution (atom (integer-divided num-executors num-workers))
-          keepers (atom {})]
-      (doseq [[node+port executor-list] existing-slots :let [executor-count 
(count executor-list)]]
-        (when (pos? (get @distribution executor-count 0))
-          (swap! keepers assoc node+port executor-list)
-          (swap! distribution update-in [executor-count] dec)
-          ))
-      (->> @keepers
-           keys
-           (apply dissoc existing-slots)
-           keys
-           (map (fn [[node port]]
-                  (WorkerSlot. node port)))))))
-
-(defn slots-can-reassign [^Cluster cluster slots]
-  (->> slots
-      (filter
-        (fn [[node port]]
-          (if-not (.isBlackListed cluster node)
-            (if-let [supervisor (.getSupervisorById cluster node)]
-              (.contains (.getAllPorts supervisor) (int port))
-              ))))))
-
-(defn -prepare [this conf]
-  )
-
-(defn default-schedule [^Topologies topologies ^Cluster cluster]
-  (let [needs-scheduling-topologies (.needsSchedulingTopologies cluster 
topologies)]
-    (doseq [^TopologyDetails topology needs-scheduling-topologies
-            :let [topology-id (.getId topology)
-                  available-slots (->> (.getAvailableSlots cluster)
-                                       (map #(vector (.getNodeId %) (.getPort 
%))))
-                  all-executors (->> topology
-                                     .getExecutors
-                                     (map #(vector (.getStartTask %) 
(.getEndTask %)))
-                                     set)
-                  alive-assigned 
(EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
-                  alive-executors (->> alive-assigned vals (apply concat) set)
-                  can-reassign-slots (slots-can-reassign cluster (keys 
alive-assigned))
-                  total-slots-to-use (min (.getNumWorkers topology)
-                                          (+ (count can-reassign-slots) (count 
available-slots)))
-                  bad-slots (if (or (> total-slots-to-use (count 
alive-assigned)) 
-                                    (not= alive-executors all-executors))
-                                (bad-slots alive-assigned (count 
all-executors) total-slots-to-use)
-                                [])]]
-      (.freeSlots cluster bad-slots)
-      (EvenScheduler/schedule-topologies-evenly (Topologies. {topology-id 
topology}) cluster))))
-
-(defn -schedule [this ^Topologies topologies ^Cluster cluster]
-  (default-schedule topologies cluster))

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj 
b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
deleted file mode 100644
index 783da26..0000000
--- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
+++ /dev/null
@@ -1,81 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.scheduler.EvenScheduler
-  (:use [org.apache.storm util log config])
-  (:require [clojure.set :as set])
-  (:import [org.apache.storm.scheduler IScheduler Topologies
-            Cluster TopologyDetails WorkerSlot ExecutorDetails])
-  (:gen-class
-    :implements [org.apache.storm.scheduler.IScheduler]))
-
-(defn sort-slots [all-slots]
-  (let [split-up (sort-by count > (vals (group-by first all-slots)))]
-    (apply interleave-all split-up)
-    ))
-
-(defn get-alive-assigned-node+port->executors [cluster topology-id]
-  (let [existing-assignment (.getAssignmentById cluster topology-id)
-        executor->slot (if existing-assignment
-                         (.getExecutorToSlot existing-assignment)
-                         {}) 
-        executor->node+port (into {} (for [[^ExecutorDetails executor 
^WorkerSlot slot] executor->slot
-                                           :let [executor [(.getStartTask 
executor) (.getEndTask executor)]
-                                                 node+port [(.getNodeId slot) 
(.getPort slot)]]]
-                                       {executor node+port}))
-        alive-assigned (reverse-map executor->node+port)]
-    alive-assigned))
-
-(defn- schedule-topology [^TopologyDetails topology ^Cluster cluster]
-  (let [topology-id (.getId topology)
-        available-slots (->> (.getAvailableSlots cluster)
-                             (map #(vector (.getNodeId %) (.getPort %))))
-        all-executors (->> topology
-                          .getExecutors
-                          (map #(vector (.getStartTask %) (.getEndTask %)))
-                          set)
-        alive-assigned (get-alive-assigned-node+port->executors cluster 
topology-id)
-        total-slots-to-use (min (.getNumWorkers topology)
-                                (+ (count available-slots) (count 
alive-assigned)))
-        reassign-slots (take (- total-slots-to-use (count alive-assigned))
-                             (sort-slots available-slots))
-        reassign-executors (sort (set/difference all-executors (set (apply 
concat (vals alive-assigned)))))
-        reassignment (into {}
-                           (map vector
-                                reassign-executors
-                                ;; for some reason it goes into infinite loop 
without limiting the repeat-seq
-                                (repeat-seq (count reassign-executors) 
reassign-slots)))]
-    (when-not (empty? reassignment)
-      (log-message "Available slots: " (pr-str available-slots))
-      )
-    reassignment))
-
-(defn schedule-topologies-evenly [^Topologies topologies ^Cluster cluster]
-  (let [needs-scheduling-topologies (.needsSchedulingTopologies cluster 
topologies)]
-    (doseq [^TopologyDetails topology needs-scheduling-topologies
-            :let [topology-id (.getId topology)
-                  new-assignment (schedule-topology topology cluster)
-                  node+port->executors (reverse-map new-assignment)]]
-      (doseq [[node+port executors] node+port->executors
-              :let [^WorkerSlot slot (WorkerSlot. (first node+port) (last 
node+port))
-                    executors (for [[start-task end-task] executors]
-                                (ExecutorDetails. start-task end-task))]]
-        (.assign cluster slot topology-id executors)))))
-
-(defn -prepare [this conf]
-  )
-
-(defn -schedule [this ^Topologies topologies ^Cluster cluster]
-  (schedule-topologies-evenly topologies cluster))

Reply via email to