Repository: storm
Updated Branches:
  refs/heads/master 8c3fc5590 -> b3bc585a5
Adding separate ZK client for read in Nimbus ZK State Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0d8a2fa5 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0d8a2fa5 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0d8a2fa5 Branch: refs/heads/master Commit: 0d8a2fa5caa7a78418001fcf67e442cd136c50f0 Parents: f75fdde Author: Kishor Patil <kpa...@yahoo-inc.com> Authored: Thu Oct 22 13:51:53 2015 -0500 Committer: Kishor Patil <kpa...@yahoo-inc.com> Committed: Thu Oct 22 14:34:38 2015 -0500 ---------------------------------------------------------------------- storm-core/src/clj/backtype/storm/cluster.clj | 65 ++++++++++++++-------- 1 file changed, 42 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/0d8a2fa5/storm-core/src/clj/backtype/storm/cluster.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index e471e53..9057267 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -54,6 +54,10 @@ (when (Utils/isZkAuthenticationConfiguredTopology topo-conf) [(first ZooDefs$Ids/CREATOR_ALL_ACL) (ACL. ZooDefs$Perms/READ (Id. "digest" (DigestAuthenticationProvider/generateDigest payload)))]))) + +(defn is-nimbus? 
+ [] + (= (System/getProperty "daemon.name") "nimbus")) (defnk mk-distributed-cluster-state [conf :auth-conf nil :acls nil] @@ -62,7 +66,7 @@ (.close zk)) (let [callbacks (atom {}) active (atom true) - zk (zk/mk-client conf + zk-writer (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf auth-conf @@ -70,10 +74,24 @@ :watcher (fn [state type path] (when @active (when-not (= :connected state) - (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper.")) + (log-warn "Received event " state ":" type ":" path " with disconnected Writer Zookeeper.")) (when-not (= :none type) (doseq [callback (vals @callbacks)] - (callback type path))))))] + (callback type path)))))) + zk-reader (if (is-nimbus?) + (zk/mk-client conf + (conf STORM-ZOOKEEPER-SERVERS) + (conf STORM-ZOOKEEPER-PORT) + :auth-conf auth-conf + :root (conf STORM-ZOOKEEPER-ROOT) + :watcher (fn [state type path] + (when @active + (when-not (= :connected state) + (log-warn "Received event " state ":" type ":" path " with disconnected Reader Zookeeper.")) + (when-not (= :none type) + (doseq [callback (vals @callbacks)] + (callback type path)))))) + zk-writer)] (reify ClusterState @@ -89,68 +107,69 @@ (set-ephemeral-node [this path data acls] - (zk/mkdirs zk (parent-path path) acls) - (if (zk/exists zk path false) + (zk/mkdirs zk-writer (parent-path path) acls) + (if (zk/exists zk-writer path false) (try-cause - (zk/set-data zk path data) ; should verify that it's ephemeral + (zk/set-data zk-writer path data) ; should verify that it's ephemeral (catch KeeperException$NoNodeException e (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data") - (zk/create-node zk path data :ephemeral acls))) - (zk/create-node zk path data :ephemeral acls))) + (zk/create-node zk-writer path data :ephemeral acls))) + (zk/create-node zk-writer path data :ephemeral acls))) (create-sequential [this path data acls] - (zk/create-node zk path 
data :sequential acls)) + (zk/create-node zk-writer path data :sequential acls)) (set-data [this path data acls] ;; note: this does not turn off any existing watches - (if (zk/exists zk path false) - (zk/set-data zk path data) + (if (zk/exists zk-writer path false) + (zk/set-data zk-writer path data) (do - (zk/mkdirs zk (parent-path path) acls) - (zk/create-node zk path data :persistent acls)))) + (zk/mkdirs zk-writer (parent-path path) acls) + (zk/create-node zk-writer path data :persistent acls)))) (delete-node [this path] - (zk/delete-node zk path)) + (zk/delete-node zk-writer path)) (get-data [this path watch?] - (zk/get-data zk path watch?)) + (zk/get-data zk-reader path watch?)) (get-data-with-version [this path watch?] - (zk/get-data-with-version zk path watch?)) + (zk/get-data-with-version zk-reader path watch?)) (get-version [this path watch?] - (zk/get-version zk path watch?)) + (zk/get-version zk-reader path watch?)) (get-children [this path watch?] - (zk/get-children zk path watch?)) + (zk/get-children zk-reader path watch?)) (mkdirs [this path acls] - (zk/mkdirs zk path acls)) + (zk/mkdirs zk-writer path acls)) (exists-node? [this path watch?] - (zk/exists-node? zk path watch?)) + (zk/exists-node? zk-reader path watch?)) (close [this] (reset! active false) - (.close zk)) + (.close zk-writer) + (if (is-nimbus?) (.close zk-reader))) (add-listener [this listener] - (zk/add-listener zk listener)) + (zk/add-listener zk-reader listener)) (sync-path [this path] - (zk/sync-path zk path)) + (zk/sync-path zk-writer path)) ))) (defprotocol StormClusterState