Lowering test time with rebalance/sleep.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f9c41e31 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f9c41e31 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f9c41e31 Branch: refs/heads/master Commit: f9c41e31cdd7a04f982bdfb1f25d330fa1340d47 Parents: 1593a37 Author: Kishor Patil <kpa...@yahoo-inc.com> Authored: Wed Oct 21 22:32:53 2015 +0000 Committer: Kishor Patil <kpa...@yahoo-inc.com> Committed: Wed Oct 21 23:04:22 2015 +0000 ---------------------------------------------------------------------- .../clj/backtype/storm/integration_test.clj | 20 +++++++++++----- .../test/clj/backtype/storm/nimbus_test.clj | 24 +++++++++++++++----- .../test/clj/backtype/storm/supervisor_test.clj | 4 ++++ .../test/clj/backtype/storm/testing4j_test.clj | 5 ++++ .../clj/backtype/storm/transactional_test.clj | 12 ++++++++-- 5 files changed, 51 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/f9c41e31/storm-core/test/clj/backtype/storm/integration_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/integration_test.clj b/storm-core/test/clj/backtype/storm/integration_test.clj index 3380536..4a8b946 100644 --- a/storm-core/test/clj/backtype/storm/integration_test.clj +++ b/storm-core/test/clj/backtype/storm/integration_test.clj @@ -17,7 +17,7 @@ (:use [clojure test]) (:import [backtype.storm Config]) (:import [backtype.storm.topology TopologyBuilder]) - (:import [backtype.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus]) + (:import [backtype.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions]) (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout]) 
(:import [backtype.storm.tuple Fields]) @@ -237,7 +237,9 @@ "acking-test1" {} (:topology tracked)) - (Thread/sleep 11000) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. + (.rebalance (:nimbus cluster) + "acking-test1" (doto (RebalanceOptions.) (.set_wait_secs 0))) (.feed feeder1 [1]) (tracked-wait tracked 1) (checker1 0) @@ -280,7 +282,9 @@ "test-acking2" {} (:topology tracked)) - (Thread/sleep 11000) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. + (.rebalance (:nimbus cluster) + "test-acking2" (doto (RebalanceOptions.) (.set_wait_secs 0))) (.feed feeder [1]) (tracked-wait tracked 1) (checker 0) @@ -326,7 +330,7 @@ {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10} topology (SubmitOptions. TopologyInitialStatus/INACTIVE)) - (Thread/sleep 11000) + (advance-cluster-time cluster 11) (.feed feeder ["a"] 1) (advance-cluster-time cluster 9) (is (not @bolt-prepared?)) @@ -351,7 +355,9 @@ "test" {} (:topology tracked)) - (Thread/sleep 11000) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. + (.rebalance (:nimbus cluster) + "test" (doto (RebalanceOptions.) (.set_wait_secs 0))) (.feed feeder [1]) (tracked-wait tracked 1) (checker 1) @@ -573,7 +579,9 @@ TOPOLOGY-DEBUG true } (:topology tracked)) - _ (advance-cluster-time cluster 11) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. + _ (.rebalance (:nimbus cluster) + "test-errors" (doto (RebalanceOptions.) 
(.set_wait_secs 0))) storm-id (get-storm-id state "test-errors") errors-count (fn [] (count (.errors state storm-id "2")))] http://git-wip-us.apache.org/repos/asf/storm/blob/f9c41e31/storm-core/test/clj/backtype/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/nimbus_test.clj b/storm-core/test/clj/backtype/storm/nimbus_test.clj index f84bc9f..0b36e51 100644 --- a/storm-core/test/clj/backtype/storm/nimbus_test.clj +++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj @@ -206,7 +206,9 @@ "4" (thrift/mk-bolt-spec {"1" :global "2" :none} (TestPlannerBolt.) :parallelism-hint 4)} ) _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology) - _ (Thread/sleep 11000) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. + _ (.rebalance nimbus "mystorm" (doto (RebalanceOptions.) (.set_wait_secs 0))) + _ (Thread/sleep 1000) task-info (storm-component->task-info cluster "mystorm")] (check-consistency cluster "mystorm") ;; 3 should be assigned once (if it were optimized, we'd have @@ -217,7 +219,9 @@ (is (= 1 (count (task-info "3")))) (is (= 4 (storm-num-workers state "mystorm"))) (submit-local-topology nimbus "storm2" {TOPOLOGY-WORKERS 20} topology2) - (Thread/sleep 11000) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. + (.rebalance nimbus "storm2" (doto (RebalanceOptions.) (.set_wait_secs 0))) + (Thread/sleep 1000) (check-consistency cluster "storm2") (is (= 2 (count (.assignments state nil)))) (let [task-info (storm-component->task-info cluster "storm2")] @@ -342,7 +346,9 @@ {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 1 :conf {TOPOLOGY-TASKS 2}) "3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.) 
:conf {TOPOLOGY-TASKS 5})}) _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology) - _ (Thread/sleep 11000) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. + _ (.rebalance nimbus "mystorm" (doto (RebalanceOptions.) (.set_wait_secs 0))) + _ (Thread/sleep 1000) task-info (storm-component->task-info cluster "mystorm")] (check-consistency cluster "mystorm") (is (= 0 (count (task-info "1")))) @@ -359,7 +365,9 @@ {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 8 :conf {TOPOLOGY-TASKS 2}) "3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.) :parallelism-hint 3)}) _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology) - _ (Thread/sleep 11000) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. + _ (.rebalance nimbus "mystorm" (doto (RebalanceOptions.) (.set_wait_secs 0))) + _ (Thread/sleep 1000) task-info (storm-component->task-info cluster "mystorm") executor-info (->> (storm-component->executor-info cluster "mystorm") (map-val #(map executor-id->tasks %)))] @@ -386,7 +394,9 @@ "4" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 10)} ) _ (submit-local-topology nimbus "test" {TOPOLOGY-WORKERS 7} topology) - _ (Thread/sleep 11000) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. + _ (.rebalance nimbus "test" (doto (RebalanceOptions.) (.set_wait_secs 0))) + _ (Thread/sleep 1000) task-info (storm-component->task-info cluster "test")] (check-consistency cluster "test") (is (= 21 (count (task-info "1")))) @@ -1038,7 +1048,9 @@ ;first we verify that the master nimbus can perform all actions, even with another nimbus present. (submit-local-topology nimbus "t1" {} topology) - (Thread/sleep 11000) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. 
+ (.rebalance nimbus "t1" (doto (RebalanceOptions.) (.set_wait_secs 0))) + (Thread/sleep 1000) (.deactivate nimbus "t1") (.activate nimbus "t1") (.rebalance nimbus "t1" (RebalanceOptions.)) http://git-wip-us.apache.org/repos/asf/storm/blob/f9c41e31/storm-core/test/clj/backtype/storm/supervisor_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj index 885517f..57d4cc2 100644 --- a/storm-core/test/clj/backtype/storm/supervisor_test.clj +++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj @@ -103,6 +103,7 @@ ["sup1" 2] [0.0 0.0 0.0] ["sup1" 3] [0.0 0.0 0.0] }) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.) (.set_wait_secs 0))) (advance-cluster-time cluster 2) (heartbeat-workers cluster "sup1" [1 2 3]) @@ -159,6 +160,7 @@ ["sup1" 2] [0.0 0.0 0.0] ["sup2" 1] [0.0 0.0 0.0] }) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.) (.set_wait_secs 0))) (advance-cluster-time cluster 2) (heartbeat-workers cluster "sup1" [1 2]) @@ -183,6 +185,7 @@ {["sup1" 3] [0.0 0.0 0.0] ["sup2" 2] [0.0 0.0 0.0] }) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. (.rebalance (:nimbus cluster) "test2" (doto (RebalanceOptions.) (.set_wait_secs 0))) (advance-cluster-time cluster 2) (heartbeat-workers cluster "sup1" [3]) @@ -696,6 +699,7 @@ {["sup1" 1] [0.0 0.0 0.0] ["sup1" 2] [0.0 0.0 0.0] }) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0))) )) (is (empty? 
(:launched changed))) http://git-wip-us.apache.org/repos/asf/storm/blob/f9c41e31/storm-core/test/clj/backtype/storm/testing4j_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/testing4j_test.clj b/storm-core/test/clj/backtype/storm/testing4j_test.clj index 36f3df6..f2da026 100644 --- a/storm-core/test/clj/backtype/storm/testing4j_test.clj +++ b/storm-core/test/clj/backtype/storm/testing4j_test.clj @@ -20,6 +20,7 @@ (:require [backtype.storm.thrift :as thrift]) (:import [backtype.storm Testing Config ILocalCluster]) (:import [backtype.storm.tuple Values Tuple]) + (:import [backtype.storm.generated RebalanceOptions]) (:import [backtype.storm.utils Time Utils]) (:import [backtype.storm.testing MkClusterParam TestJob MockedSources TestWordSpout TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam @@ -118,6 +119,10 @@ "test-acking2" (Config.) (.getTopology tracked)) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. + (.rebalance cluster + "test-acking2" (doto (RebalanceOptions.) 
(.set_wait_secs 0))) + (Thread/sleep 1000) (.feed feeder [1]) (Testing/trackedWait tracked (int 1)) (checker 0) http://git-wip-us.apache.org/repos/asf/storm/blob/f9c41e31/storm-core/test/clj/backtype/storm/transactional_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/transactional_test.clj b/storm-core/test/clj/backtype/storm/transactional_test.clj index a1164cd..0acaf2f 100644 --- a/storm-core/test/clj/backtype/storm/transactional_test.clj +++ b/storm-core/test/clj/backtype/storm/transactional_test.clj @@ -20,6 +20,7 @@ (:import [backtype.storm.transactional TransactionalSpoutCoordinator ITransactionalSpout ITransactionalSpout$Coordinator TransactionAttempt TransactionalTopologyBuilder]) (:import [backtype.storm.transactional.state TransactionalState TestTransactionalState RotatingTransactionalState RotatingTransactionalState$StateInitializer]) + (:import [backtype.storm.generated RebalanceOptions]) (:import [backtype.storm.spout SpoutOutputCollector ISpoutOutputCollector]) (:import [backtype.storm.task OutputCollector IOutputCollector]) (:import [backtype.storm.coordination BatchBoltExecutor]) @@ -411,7 +412,11 @@ "transactional-test" {TOPOLOGY-MAX-SPOUT-PENDING 2} (:topology topo-info)) - (Thread/sleep 11000) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. + (.rebalance (:nimbus cluster) + "test-acking2" + (doto (RebalanceOptions.) (.set_wait_secs 0))) + (bind ack-tx! (fn [txid] (let [[to-ack not-to-ack] (separate @@ -667,7 +672,10 @@ {TOPOLOGY-MAX-SPOUT-PENDING 2 } (:topology topo-info)) - (Thread/sleep 11000) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. + (.rebalance (:nimbus cluster) + "test-acking2" + (doto (RebalanceOptions.) (.set_wait_secs 0))) (bind ack-tx! (fn [txid] (let [[to-ack not-to-ack] (separate