Lowering test time with rebalance/sleep .

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f9c41e31
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f9c41e31
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f9c41e31

Branch: refs/heads/master
Commit: f9c41e31cdd7a04f982bdfb1f25d330fa1340d47
Parents: 1593a37
Author: Kishor Patil <kpa...@yahoo-inc.com>
Authored: Wed Oct 21 22:32:53 2015 +0000
Committer: Kishor Patil <kpa...@yahoo-inc.com>
Committed: Wed Oct 21 23:04:22 2015 +0000

----------------------------------------------------------------------
 .../clj/backtype/storm/integration_test.clj     | 20 +++++++++++-----
 .../test/clj/backtype/storm/nimbus_test.clj     | 24 +++++++++++++++-----
 .../test/clj/backtype/storm/supervisor_test.clj |  4 ++++
 .../test/clj/backtype/storm/testing4j_test.clj  |  5 ++++
 .../clj/backtype/storm/transactional_test.clj   | 12 ++++++++--
 5 files changed, 51 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f9c41e31/storm-core/test/clj/backtype/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/integration_test.clj 
b/storm-core/test/clj/backtype/storm/integration_test.clj
index 3380536..4a8b946 100644
--- a/storm-core/test/clj/backtype/storm/integration_test.clj
+++ b/storm-core/test/clj/backtype/storm/integration_test.clj
@@ -17,7 +17,7 @@
   (:use [clojure test])
   (:import [backtype.storm Config])
   (:import [backtype.storm.topology TopologyBuilder])
-  (:import [backtype.storm.generated InvalidTopologyException SubmitOptions 
TopologyInitialStatus])
+  (:import [backtype.storm.generated InvalidTopologyException SubmitOptions 
TopologyInitialStatus RebalanceOptions])
   (:import [backtype.storm.testing TestWordCounter TestWordSpout 
TestGlobalCount
             TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker 
TestPlannerSpout])
   (:import [backtype.storm.tuple Fields])
@@ -237,7 +237,9 @@
                              "acking-test1"
                              {}
                              (:topology tracked))
-      (Thread/sleep 11000)
+      ;; Instead of sleeping until topology is scheduled, rebalance topology 
so mk-assignments is called.
+      (.rebalance (:nimbus cluster)
+                             "acking-test1" (doto (RebalanceOptions.) 
(.set_wait_secs 0)))
       (.feed feeder1 [1])
       (tracked-wait tracked 1)
       (checker1 0)
@@ -280,7 +282,9 @@
                              "test-acking2"
                              {}
                              (:topology tracked))
-      (Thread/sleep 11000)
+      ;; Instead of sleeping until topology is scheduled, rebalance topology 
so mk-assignments is called.
+      (.rebalance (:nimbus cluster)
+                             "test-acking2" (doto (RebalanceOptions.) 
(.set_wait_secs 0)))
       (.feed feeder [1])
       (tracked-wait tracked 1)
       (checker 0)
@@ -326,7 +330,7 @@
         {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
         topology
         (SubmitOptions. TopologyInitialStatus/INACTIVE))
-      (Thread/sleep 11000)
+      (advance-cluster-time cluster 11)
       (.feed feeder ["a"] 1)
       (advance-cluster-time cluster 9)
       (is (not @bolt-prepared?))
@@ -351,7 +355,9 @@
                              "test"
                              {}
                              (:topology tracked))
-      (Thread/sleep 11000)
+      ;; Instead of sleeping until topology is scheduled, rebalance topology 
so mk-assignments is called.
+      (.rebalance (:nimbus cluster)
+                             "test" (doto (RebalanceOptions.) (.set_wait_secs 
0)))
       (.feed feeder [1])
       (tracked-wait tracked 1)
       (checker 1)
@@ -573,7 +579,9 @@
                                               TOPOLOGY-DEBUG true
                                               }
                                              (:topology tracked))
-            _ (advance-cluster-time cluster 11)
+            ;; Instead of sleeping until topology is scheduled, rebalance 
topology so mk-assignments is called.
+            _ (.rebalance (:nimbus cluster)
+                                             "test-errors" (doto 
(RebalanceOptions.) (.set_wait_secs 0)))
             storm-id (get-storm-id state "test-errors")
             errors-count (fn [] (count (.errors state storm-id "2")))]
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f9c41e31/storm-core/test/clj/backtype/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/nimbus_test.clj 
b/storm-core/test/clj/backtype/storm/nimbus_test.clj
index f84bc9f..0b36e51 100644
--- a/storm-core/test/clj/backtype/storm/nimbus_test.clj
+++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj
@@ -206,7 +206,9 @@
                       "4" (thrift/mk-bolt-spec {"1" :global "2" :none} 
(TestPlannerBolt.) :parallelism-hint 4)}
                      )
           _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} 
topology)
-          _ (Thread/sleep 11000)
+          ;; Instead of sleeping until topology is scheduled, rebalance 
topology so mk-assignments is called.
+          _ (.rebalance nimbus "mystorm" (doto (RebalanceOptions.) 
(.set_wait_secs 0)))
+          _ (Thread/sleep 1000)
           task-info (storm-component->task-info cluster "mystorm")]
       (check-consistency cluster "mystorm")
       ;; 3 should be assigned once (if it were optimized, we'd have
@@ -217,7 +219,9 @@
       (is (= 1 (count (task-info "3"))))
       (is (= 4 (storm-num-workers state "mystorm")))
       (submit-local-topology nimbus "storm2" {TOPOLOGY-WORKERS 20} topology2)
-      (Thread/sleep 11000)
+      ;; Instead of sleeping until topology is scheduled, rebalance topology 
so mk-assignments is called.
+      (.rebalance nimbus "storm2" (doto (RebalanceOptions.) (.set_wait_secs 
0)))
+      (Thread/sleep 1000)
       (check-consistency cluster "storm2")
       (is (= 2 (count (.assignments state nil))))
       (let [task-info (storm-component->task-info cluster "storm2")]
@@ -342,7 +346,9 @@
                     {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) 
:parallelism-hint 1 :conf {TOPOLOGY-TASKS 2})
                      "3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.) 
:conf {TOPOLOGY-TASKS 5})})
           _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} 
topology)
-          _ (Thread/sleep 11000)
+          ;; Instead of sleeping until topology is scheduled, rebalance 
topology so mk-assignments is called.
+          _ (.rebalance nimbus "mystorm" (doto (RebalanceOptions.) 
(.set_wait_secs 0)))
+          _ (Thread/sleep 1000)
           task-info (storm-component->task-info cluster "mystorm")]
       (check-consistency cluster "mystorm")
       (is (= 0 (count (task-info "1"))))
@@ -359,7 +365,9 @@
                     {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) 
:parallelism-hint 8 :conf {TOPOLOGY-TASKS 2})
                      "3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.) 
:parallelism-hint 3)})
           _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} 
topology)
-          _ (Thread/sleep 11000)
+          ;; Instead of sleeping until topology is scheduled, rebalance 
topology so mk-assignments is called.
+          _ (.rebalance nimbus "mystorm" (doto (RebalanceOptions.) 
(.set_wait_secs 0)))
+          _ (Thread/sleep 1000)
           task-info (storm-component->task-info cluster "mystorm")
           executor-info (->> (storm-component->executor-info cluster "mystorm")
                              (map-val #(map executor-id->tasks %)))]
@@ -386,7 +394,9 @@
                       "4" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) 
:parallelism-hint 10)}
                      )
           _ (submit-local-topology nimbus "test" {TOPOLOGY-WORKERS 7} topology)
-          _ (Thread/sleep 11000)
+          ;; Instead of sleeping until topology is scheduled, rebalance 
topology so mk-assignments is called.
+          _ (.rebalance nimbus "test" (doto (RebalanceOptions.) 
(.set_wait_secs 0)))
+          _ (Thread/sleep 1000)
           task-info (storm-component->task-info cluster "test")]
       (check-consistency cluster "test")
       (is (= 21 (count (task-info "1"))))
@@ -1038,7 +1048,9 @@
 
               ;first we verify that the master nimbus can perform all actions, 
even with another nimbus present.
               (submit-local-topology nimbus "t1" {} topology)
-              (Thread/sleep 11000)
+              ;; Instead of sleeping until topology is scheduled, rebalance 
topology so mk-assignments is called.
+              (.rebalance nimbus "t1" (doto (RebalanceOptions.) 
(.set_wait_secs 0)))
+              (Thread/sleep 1000)
               (.deactivate nimbus "t1")
               (.activate nimbus "t1")
               (.rebalance nimbus "t1" (RebalanceOptions.))

http://git-wip-us.apache.org/repos/asf/storm/blob/f9c41e31/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj 
b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index 885517f..57d4cc2 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -103,6 +103,7 @@
                          ["sup1" 2] [0.0 0.0 0.0]
                          ["sup1" 3] [0.0 0.0 0.0]
                          })
+                      ;; Instead of sleeping until topology is scheduled, 
rebalance topology so mk-assignments is called.
                       (.rebalance (:nimbus cluster) "test" (doto 
(RebalanceOptions.) (.set_wait_secs 0)))
                       (advance-cluster-time cluster 2)
                       (heartbeat-workers cluster "sup1" [1 2 3])
@@ -159,6 +160,7 @@
                          ["sup1" 2] [0.0 0.0 0.0]
                          ["sup2" 1] [0.0 0.0 0.0]
                          })
+                      ;; Instead of sleeping until topology is scheduled, 
rebalance topology so mk-assignments is called.
                       (.rebalance (:nimbus cluster) "test" (doto 
(RebalanceOptions.) (.set_wait_secs 0)))
                       (advance-cluster-time cluster 2)
                       (heartbeat-workers cluster "sup1" [1 2])
@@ -183,6 +185,7 @@
                         {["sup1" 3] [0.0 0.0 0.0]
                          ["sup2" 2] [0.0 0.0 0.0]
                          })
+                      ;; Instead of sleeping until topology is scheduled, 
rebalance topology so mk-assignments is called.
                       (.rebalance (:nimbus cluster) "test2" (doto 
(RebalanceOptions.) (.set_wait_secs 0)))
                       (advance-cluster-time cluster 2)
                       (heartbeat-workers cluster "sup1" [3])
@@ -696,6 +699,7 @@
                      {["sup1" 1] [0.0 0.0 0.0]
                       ["sup1" 2] [0.0 0.0 0.0]
                       })
+                    ;; Instead of sleeping until topology is scheduled, 
rebalance topology so mk-assignments is called.
                     (.rebalance (:nimbus cluster) "topology1" (doto 
(RebalanceOptions.) (.set_wait_secs 0)))
                     ))
      (is (empty? (:launched changed)))

http://git-wip-us.apache.org/repos/asf/storm/blob/f9c41e31/storm-core/test/clj/backtype/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/testing4j_test.clj 
b/storm-core/test/clj/backtype/storm/testing4j_test.clj
index 36f3df6..f2da026 100644
--- a/storm-core/test/clj/backtype/storm/testing4j_test.clj
+++ b/storm-core/test/clj/backtype/storm/testing4j_test.clj
@@ -20,6 +20,7 @@
   (:require [backtype.storm.thrift :as thrift])
   (:import [backtype.storm Testing Config ILocalCluster])
   (:import [backtype.storm.tuple Values Tuple])
+  (:import [backtype.storm.generated RebalanceOptions])
   (:import [backtype.storm.utils Time Utils])
   (:import [backtype.storm.testing MkClusterParam TestJob MockedSources 
TestWordSpout
             TestWordCounter TestGlobalCount TestAggregatesCounter 
CompleteTopologyParam
@@ -118,6 +119,10 @@
                           "test-acking2"
                           (Config.)
                           (.getTopology tracked))
+         ;; Instead of sleeping until topology is scheduled, rebalance 
topology so mk-assignments is called.
+         (.rebalance cluster
+                          "test-acking2" (doto (RebalanceOptions.) 
(.set_wait_secs 0)))
+         (Thread/sleep 1000)
          (.feed feeder [1])
          (Testing/trackedWait tracked (int 1))
          (checker 0)

http://git-wip-us.apache.org/repos/asf/storm/blob/f9c41e31/storm-core/test/clj/backtype/storm/transactional_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/transactional_test.clj 
b/storm-core/test/clj/backtype/storm/transactional_test.clj
index a1164cd..0acaf2f 100644
--- a/storm-core/test/clj/backtype/storm/transactional_test.clj
+++ b/storm-core/test/clj/backtype/storm/transactional_test.clj
@@ -20,6 +20,7 @@
   (:import [backtype.storm.transactional TransactionalSpoutCoordinator 
ITransactionalSpout ITransactionalSpout$Coordinator TransactionAttempt
             TransactionalTopologyBuilder])
   (:import [backtype.storm.transactional.state TransactionalState 
TestTransactionalState RotatingTransactionalState 
RotatingTransactionalState$StateInitializer])
+  (:import [backtype.storm.generated RebalanceOptions])
   (:import [backtype.storm.spout SpoutOutputCollector ISpoutOutputCollector])
   (:import [backtype.storm.task OutputCollector IOutputCollector])
   (:import [backtype.storm.coordination BatchBoltExecutor])
@@ -411,7 +412,11 @@
                               "transactional-test"
                               {TOPOLOGY-MAX-SPOUT-PENDING 2}
                               (:topology topo-info))
-       (Thread/sleep 11000)
+       ;; Instead of sleeping until topology is scheduled, rebalance topology 
so mk-assignments is called.
+       (.rebalance (:nimbus cluster)
+                   "test-acking2" 
+                   (doto (RebalanceOptions.) (.set_wait_secs 0)))
+
 
        (bind ack-tx! (fn [txid]
                        (let [[to-ack not-to-ack] (separate
@@ -667,7 +672,10 @@
                               {TOPOLOGY-MAX-SPOUT-PENDING 2
                                }
                               (:topology topo-info))
-       (Thread/sleep 11000)
+       ;; Instead of sleeping until topology is scheduled, rebalance topology 
so mk-assignments is called.
+       (.rebalance (:nimbus cluster)
+                   "test-acking2" 
+                   (doto (RebalanceOptions.) (.set_wait_secs 0)))
 
        (bind ack-tx! (fn [txid]
                        (let [[to-ack not-to-ack] (separate

Reply via email to