Repository: storm
Updated Branches:
  refs/heads/master 8c3fc5590 -> b3bc585a5


Adding separate ZK client for read in Nimbus ZK State


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0d8a2fa5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0d8a2fa5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0d8a2fa5

Branch: refs/heads/master
Commit: 0d8a2fa5caa7a78418001fcf67e442cd136c50f0
Parents: f75fdde
Author: Kishor Patil <kpa...@yahoo-inc.com>
Authored: Thu Oct 22 13:51:53 2015 -0500
Committer: Kishor Patil <kpa...@yahoo-inc.com>
Committed: Thu Oct 22 14:34:38 2015 -0500

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/cluster.clj | 65 ++++++++++++++--------
 1 file changed, 42 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0d8a2fa5/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj 
b/storm-core/src/clj/backtype/storm/cluster.clj
index e471e53..9057267 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -54,6 +54,10 @@
     (when (Utils/isZkAuthenticationConfiguredTopology topo-conf)
       [(first ZooDefs$Ids/CREATOR_ALL_ACL)
        (ACL. ZooDefs$Perms/READ (Id. "digest" 
(DigestAuthenticationProvider/generateDigest payload)))])))
+ 
+(defn is-nimbus?
+  []
+  (= (System/getProperty "daemon.name") "nimbus"))
 
 (defnk mk-distributed-cluster-state
   [conf :auth-conf nil :acls nil]
@@ -62,7 +66,7 @@
     (.close zk))
   (let [callbacks (atom {})
         active (atom true)
-        zk (zk/mk-client conf
+        zk-writer (zk/mk-client conf
                          (conf STORM-ZOOKEEPER-SERVERS)
                          (conf STORM-ZOOKEEPER-PORT)
                          :auth-conf auth-conf
@@ -70,10 +74,24 @@
                          :watcher (fn [state type path]
                                     (when @active
                                       (when-not (= :connected state)
-                                        (log-warn "Received event " state ":" 
type ":" path " with disconnected Zookeeper."))
+                                        (log-warn "Received event " state ":" 
type ":" path " with disconnected Writer Zookeeper."))
                                       (when-not (= :none type)
                                         (doseq [callback (vals @callbacks)]
-                                          (callback type path))))))]
+                                          (callback type path))))))
+        zk-reader (if (is-nimbus?)
+                    (zk/mk-client conf
+                         (conf STORM-ZOOKEEPER-SERVERS)
+                         (conf STORM-ZOOKEEPER-PORT)
+                         :auth-conf auth-conf
+                         :root (conf STORM-ZOOKEEPER-ROOT)
+                         :watcher (fn [state type path]
+                                    (when @active
+                                      (when-not (= :connected state)
+                                        (log-warn "Received event " state ":" 
type ":" path " with disconnected Reader Zookeeper."))
+                                      (when-not (= :none type)
+                                        (doseq [callback (vals @callbacks)]
+                                          (callback type path))))))
+                    zk-writer)]
     (reify
      ClusterState
 
@@ -89,68 +107,69 @@
 
      (set-ephemeral-node
        [this path data acls]
-       (zk/mkdirs zk (parent-path path) acls)
-       (if (zk/exists zk path false)
+       (zk/mkdirs zk-writer (parent-path path) acls)
+       (if (zk/exists zk-writer path false)
          (try-cause
-           (zk/set-data zk path data) ; should verify that it's ephemeral
+           (zk/set-data zk-writer path data) ; should verify that it's 
ephemeral
            (catch KeeperException$NoNodeException e
              (log-warn-error e "Ephemeral node disappeared between checking 
for existing and setting data")
-             (zk/create-node zk path data :ephemeral acls)))
-         (zk/create-node zk path data :ephemeral acls)))
+             (zk/create-node zk-writer path data :ephemeral acls)))
+         (zk/create-node zk-writer path data :ephemeral acls)))
 
      (create-sequential
        [this path data acls]
-       (zk/create-node zk path data :sequential acls))
+       (zk/create-node zk-writer path data :sequential acls))
 
      (set-data
        [this path data acls]
        ;; note: this does not turn off any existing watches
-       (if (zk/exists zk path false)
-         (zk/set-data zk path data)
+       (if (zk/exists zk-writer path false)
+         (zk/set-data zk-writer path data)
          (do
-           (zk/mkdirs zk (parent-path path) acls)
-           (zk/create-node zk path data :persistent acls))))
+           (zk/mkdirs zk-writer (parent-path path) acls)
+           (zk/create-node zk-writer path data :persistent acls))))
 
      (delete-node
        [this path]
-       (zk/delete-node zk path))
+       (zk/delete-node zk-writer path))
 
      (get-data
        [this path watch?]
-       (zk/get-data zk path watch?))
+       (zk/get-data zk-reader path watch?))
 
      (get-data-with-version
        [this path watch?]
-       (zk/get-data-with-version zk path watch?))
+       (zk/get-data-with-version zk-reader path watch?))
 
      (get-version 
        [this path watch?]
-       (zk/get-version zk path watch?))
+       (zk/get-version zk-reader path watch?))
 
      (get-children
        [this path watch?]
-       (zk/get-children zk path watch?))
+       (zk/get-children zk-reader path watch?))
 
      (mkdirs
        [this path acls]
-       (zk/mkdirs zk path acls))
+       (zk/mkdirs zk-writer path acls))
 
      (exists-node?
        [this path watch?]
-       (zk/exists-node? zk path watch?))
+       (zk/exists-node? zk-reader path watch?))
 
      (close
        [this]
        (reset! active false)
-       (.close zk))
+       (.close zk-writer)
+       (if (is-nimbus?) (.close zk-reader)))
 
       (add-listener
         [this listener]
-        (zk/add-listener zk listener))
+        (zk/add-listener zk-reader listener))
 
       (sync-path
         [this path]
-        (zk/sync-path zk path))
+        (zk/sync-path zk-writer path))
       )))
 
 (defprotocol StormClusterState

Reply via email to