Repository: storm
Updated Branches:
  refs/heads/master 4721c3617 -> dc4011cd6


STORM-2958 - Use new wait strategy model for Spout as well


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/51997889
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/51997889
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/51997889

Branch: refs/heads/master
Commit: 51997889492ab9ebe68f495a232c239c33b31d9f
Parents: 4721c36
Author: Roshan Naik <ros...@hw13642.hsd1.ca.comcast.net>
Authored: Mon Feb 19 16:18:13 2018 -0800
Committer: Roshan Naik <ros...@hw13642.hsd1.ca.comcast.net>
Committed: Mon Feb 19 16:18:31 2018 -0800

----------------------------------------------------------------------
 conf/defaults.yaml                              |  8 ++-
 docs/Performance.md                             | 27 ++++-----
 .../src/jvm/org/apache/storm/Config.java        | 58 +++++++++++++++-----
 .../storm/executor/bolt/BoltExecutor.java       |  1 -
 .../storm/executor/spout/SpoutExecutor.java     | 35 ++++++------
 .../org/apache/storm/policy/IWaitStrategy.java  |  2 +-
 .../apache/storm/policy/WaitStrategyPark.java   |  4 +-
 .../storm/policy/WaitStrategyProgressive.java   |  8 ++-
 .../apache/storm/spout/ISpoutWaitStrategy.java  | 34 ------------
 .../storm/spout/NothingEmptyEmitStrategy.java   | 31 -----------
 .../storm/spout/SleepSpoutWaitStrategy.java     | 41 --------------
 11 files changed, 92 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 6269282..d64623e 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -242,8 +242,12 @@ topology.tick.tuple.freq.secs: null
 topology.worker.shared.thread.pool.size: 4
 
 # Spout Wait Strategy - employed when there is no data to produce
-topology.spout.wait.strategy: "org.apache.storm.spout.SleepSpoutWaitStrategy"
-topology.sleep.spout.wait.strategy.time.ms: 1
+topology.spout.wait.strategy: "org.apache.storm.policy.WaitStrategyProgressive"
+topology.spout.wait.park.microsec : 100          # park time for 
org.apache.storm.policy.WaitStrategyPark. Busy spins if set to 0.
+
+topology.spout.wait.progressive.level1.count: 0          # number of 
iterations to spend in level 1 [no sleep] of WaitStrategyProgressive, before 
progressing to level 2
+topology.spout.wait.progressive.level2.count: 0          # number of 
iterations to spend in level 2 [parkNanos(1)] of WaitStrategyProgressive, 
before progressing to level 3
+topology.spout.wait.progressive.level3.sleep.millis: 1   # sleep duration for 
idling iterations in level 3 of WaitStrategyProgressive
 
 # Bolt Wait Strategy - employed when there is no data in its receive buffer to 
process
 topology.bolt.wait.strategy : "org.apache.storm.policy.WaitStrategyProgressive"

http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/docs/Performance.md
----------------------------------------------------------------------
diff --git a/docs/Performance.md b/docs/Performance.md
index 6d7d33e..20acb32 100644
--- a/docs/Performance.md
+++ b/docs/Performance.md
@@ -90,8 +90,10 @@ values.
 ## 4. Wait Strategy
 Wait strategies are used to conserve CPU usage by trading off some latency and 
throughput. They are applied for the following situations:
 
-4.1 **Spout Wait:**  In low/no traffic situations, Spout's nextTuple() may not 
produce any new emits. To prevent invoking the Spout's nextTuple,
-this wait strategy is used between nextTuple() calls to allow the spout's 
executor thread to idle and conserve CPU. Select a strategy using 
`topology.spout.wait.strategy`.
+4.1 **Spout Wait:**  In low/no traffic situations, Spout's nextTuple() may not 
produce any new emits. To prevent invoking the Spout's nextTuple too often,
+this wait strategy is used between nextTuple() calls, allowing the spout's 
executor thread to idle and conserve CPU. Spout wait strategy is also used
+when the `topology.max.spout.pending` limit has been reached when ACKers are 
enabled. Select a strategy using `topology.spout.wait.strategy`. Configure the
+chosen wait strategy using one of the `topology.spout.wait.*` settings.
 
 4.2 **Bolt Wait:** : When a bolt polls it's receive queue for new messages to 
process, it is possible that the queue is empty. This typically happens
 in case of low/no traffic situations or when the upstream spout/bolt is 
inherently slower. This wait strategy is used in such cases. It avoids high CPU 
usage
@@ -104,24 +106,23 @@ conserving CPU. The chosen strategy can be further 
configured using the `topolog
 
 
 #### Built-in wait strategies:
-
-- **SleepSpoutWaitStrategy** : This is the only built-in strategy available 
for Spout Wait. It cannot be applied to other Wait situations. It is a simple 
static strategy that
-calls Thread.sleep() each time. Set `topology.spout.wait.strategy` to 
`org.apache.storm.spout.SleepSpoutWaitStrategy` for using this. 
`topology.sleep.spout.wait.strategy.time.ms`
-configures the sleep time.
+These wait strategies are availabe for use with all of the above mentioned 
wait situations.
 
 - **ProgressiveWaitStrategy** : This strategy can be used for Bolt Wait or 
Backpressure Wait situations. Set the strategy to 
'org.apache.storm.policy.WaitStrategyProgressive' to
 select this wait strategy. This is a dynamic wait strategy that enters into 
progressively deeper states of CPU conservation if the Backpressure Wait or 
Bolt Wait situations persist.
 It has 3 levels of idling and allows configuring how long to stay at each 
level :
 
-  1. No Waiting - The first few times it will return immediately. This does 
not conserve any CPU. The number of times it remains in this state is 
configured using
-  `topology.bolt.wait.progressive.level1.count` or 
`topology.backpressure.wait.progressive.level1.count` depending which situation 
it is being used.
+  1. Level1 / No Waiting - The first few times it will return immediately. 
This does not conserve any CPU. The number of times it remains in this state is 
configured using
+  `topology.spout.wait.progressive.level1.count` or 
`topology.bolt.wait.progressive.level1.count` or 
`topology.backpressure.wait.progressive.level1.count` depending which
+  situation it is being used.
 
-  2. Park Nanos - In this state it disables the current thread for thread 
scheduling purposes, for 1 nano second using LockSupport.parkNanos(). This puts 
the CPU in a minimal
-  conservation state. It remains in this state for 
`topology.bolt.wait.progressive.level2.count` or 
`topology.backpressure.wait.progressive.level2.count` iterations.
+  2. Level 2 / Park Nanos - In this state it disables the current thread for 
thread scheduling purposes, for 1 nano second using LockSupport.parkNanos(). 
This puts the CPU in a minimal
+  conservation state. It remains in this state for 
`topology.spout.wait.progressive.level2.count` or 
`topology.bolt.wait.progressive.level2.count` or
+  `topology.backpressure.wait.progressive.level2.count` iterations.
 
-  3. Thread.sleep() - In this state it calls Thread.sleep() with the value 
specified in `topology.backpressure.wait.progressive.level3.sleep.millis` or in
-  `topology.bolt.wait.progressive.level3.sleep.millis` based on the Wait 
situation it is used in. This is the most CPU conserving level it remains in 
this level for
-  the remaining iterations.
+  3. Level 3 / Thread.sleep() - In this level it calls Thread.sleep() with the 
value specified in `topology.spout.wait.progressive.level3.sleep.millis` or
+  `topology.bolt.wait.progressive.level3.sleep.millis` or 
`topology.backpressure.wait.progressive.level3.sleep.millis`. This is the most 
CPU conserving level and it remains in
+  this level for the remaining iterations.
 
 
 - **ParkWaitStrategy** : This strategy can be used for Bolt Wait or 
Backpressure Wait situations. Set the strategy to 
`org.apache.storm.policy.WaitStrategyPark` to use this.

http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java 
b/storm-client/src/jvm/org/apache/storm/Config.java
index 84825cf..ab427d2 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -509,16 +509,6 @@ public class Config extends HashMap<String, Object> {
     public static final String 
TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
 
     /**
-     * A class that implements a strategy for what to do when a spout needs to 
wait. Waiting is
-     * triggered in one of two conditions:
-     *
-     * 1. nextTuple emits no tuples
-     * 2. The spout has hit maxSpoutPending and can't emit any more tuples
-     */
-    @isString
-    public static final String 
TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy";
-
-    /**
      * The amount of milliseconds the SleepEmptyEmitStrategy should sleep for.
      */
     @isInteger
@@ -872,6 +862,46 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_ISOLATED_MACHINES = 
"topology.isolate.machines";
 
     /**
+     * A class that implements a wait strategy for spout. Waiting is triggered 
in one of two conditions:
+     *
+     * 1. nextTuple emits no tuples
+     * 2. The spout has hit maxSpoutPending and can't emit any more tuples
+     */
+    @isString
+    @isDerivedFrom(baseType = IWaitStrategy.class)
+    public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY = 
"topology.spout.wait.strategy";
+
+    /**
+     * Configures park time for WaitStrategyPark for spout.  If set to 0, 
returns immediately (i.e busy wait).
+     */
+    @NotNull
+    @isPositiveNumber(includeZero = true)
+    public static final String TOPOLOGY_SPOUT_WAIT_PARK_MICROSEC = 
"topology.spout.wait.park.microsec";
+
+    /**
+     * Configures number of iterations to spend in level 1 of 
WaitStrategyProgressive, before progressing to level 2
+     */
+    @NotNull
+    @isInteger
+    @isPositiveNumber(includeZero = true)
+    public static final String TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL1_COUNT =  
"topology.spout.wait.progressive.level1.count";
+
+    /**
+     * Configures number of iterations to spend in level 2 of 
WaitStrategyProgressive, before progressing to level 3
+     */
+    @NotNull
+    @isInteger
+    @isPositiveNumber(includeZero = true)
+    public static final String TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL2_COUNT =  
"topology.spout.wait.progressive.level2.count";
+
+    /**
+     * Configures sleep time for WaitStrategyProgressive.
+     */
+    @NotNull
+    @isPositiveNumber(includeZero = true)
+    public static final String 
TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS = 
"topology.spout.wait.progressive.level3.sleep.millis";
+
+    /**
      * Selects the Bolt's Wait Strategy to use when there are no incoming 
msgs. Used to trade off latency vs CPU usage.
      */
     @isString
@@ -890,7 +920,7 @@ public class Config extends HashMap<String, Object> {
      */
     @NotNull
     @isInteger
-    @isPositiveNumber
+    @isPositiveNumber(includeZero = true)
     public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT =  
"topology.bolt.wait.progressive.level1.count";
 
     /**
@@ -898,7 +928,7 @@ public class Config extends HashMap<String, Object> {
      */
     @NotNull
     @isInteger
-    @isPositiveNumber
+    @isPositiveNumber(includeZero = true)
     public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT =  
"topology.bolt.wait.progressive.level2.count";
 
     /**
@@ -939,7 +969,7 @@ public class Config extends HashMap<String, Object> {
      */
     @NotNull
     @isInteger
-    @isPositiveNumber
+    @isPositiveNumber(includeZero = true)
     public static final String 
TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL1_COUNT = 
"topology.backpressure.wait.progressive.level1.count";
 
     /**
@@ -947,7 +977,7 @@ public class Config extends HashMap<String, Object> {
      */
     @NotNull
     @isInteger
-    @isPositiveNumber
+    @isPositiveNumber(includeZero = true)
     public static final String 
TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL2_COUNT = 
"topology.backpressure.wait.progressive.level2.count";
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java 
b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 63ac1a5..61e6488 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -56,7 +56,6 @@ import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ReflectionUtils;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.Time;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java 
b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index 5f6773c..e204150 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -39,7 +39,6 @@ import org.apache.storm.hooks.info.SpoutFailInfo;
 import org.apache.storm.policy.IWaitStrategy;
 import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION;
 import org.apache.storm.spout.ISpout;
-import org.apache.storm.spout.ISpoutWaitStrategy;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.stats.SpoutExecutorStats;
 import org.apache.storm.stats.StatsUtil;
@@ -65,7 +64,7 @@ public class SpoutExecutor extends Executor {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SpoutExecutor.class);
 
-    private final ISpoutWaitStrategy spoutWaitStrategy;
+    private final IWaitStrategy spoutWaitStrategy;
     private final IWaitStrategy backPressureWaitStrategy;
     private Integer maxSpoutPending;
     private final AtomicBoolean lastActive;
@@ -84,7 +83,7 @@ public class SpoutExecutor extends Executor {
     public SpoutExecutor(final WorkerState workerData, final List<Long> 
executorId, Map<String, String> credentials) {
         super(workerData, executorId, credentials, StatsUtil.SPOUT);
         this.spoutWaitStrategy = ReflectionUtils.newInstance((String) 
topoConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
-        this.spoutWaitStrategy.prepare(topoConf);
+        this.spoutWaitStrategy.prepare(topoConf, WAIT_SITUATION.SPOUT_WAIT);
         this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) 
topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
         this.backPressureWaitStrategy.prepare(topoConf, 
WAIT_SITUATION.BACK_PRESSURE_WAIT);
 
@@ -164,7 +163,8 @@ public class SpoutExecutor extends Executor {
         return new Callable<Long>() {
             int recvqCheckSkips = 0;
             final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
-            int bpIdleCount = 0;
+            int swIdleCount = 0; // counter for spout wait strategy
+            int bpIdleCount = 0; // counter for back pressure wait strategy
             int rmspCount = 0;
             @Override
             public Long call() throws Exception {
@@ -225,6 +225,7 @@ public class SpoutExecutor extends Executor {
                     spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
                     return 0L;
                 }
+                swIdleCount = 0;
                 return 0L;
             }
 
@@ -237,6 +238,19 @@ public class SpoutExecutor extends Executor {
                 
spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
             }
 
+            private void spoutWaitStrategy(boolean reachedMaxSpoutPending, 
long emptyStretch) throws InterruptedException {
+                emptyEmitStreak.increment();
+                long start = Time.currentTimeMillis();
+                swIdleCount = spoutWaitStrategy.idle(swIdleCount);
+                if (reachedMaxSpoutPending) {
+                    
spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
+                } else {
+                    if (emptyStretch > 0) {
+                        LOG.debug("Ending Spout Wait Stretch of {}", 
emptyStretch);
+                    }
+                }
+            }
+
             // returns true if pendingEmits is empty
             private boolean tryFlushPendingEmits() {
                 for (AddressedTuple t = pendingEmits.peek(); t != null; t = 
pendingEmits.peek()) {
@@ -251,19 +265,6 @@ public class SpoutExecutor extends Executor {
         };
     }
 
-    private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long 
emptyStretch) {
-        emptyEmitStreak.increment();
-        long start = Time.currentTimeMillis();
-        spoutWaitStrategy.emptyEmit(emptyEmitStreak.get());
-        if (reachedMaxSpoutPending) {
-            spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() 
- start);
-        } else {
-            if (emptyStretch > 0) {
-                LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
-            }
-        }
-    }
-
     private void activateSpouts() {
         LOG.info("Activating spout {}:{}", componentId, taskIds);
         for (ISpout spout : spouts) {

http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java 
b/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java
index f41492e..907591b 100644
--- a/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java
+++ b/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java
@@ -25,7 +25,7 @@ import java.util.Map;
 
 
 public interface IWaitStrategy {
-    enum WAIT_SITUATION {BOLT_WAIT, BACK_PRESSURE_WAIT}
+    enum WAIT_SITUATION {SPOUT_WAIT, BOLT_WAIT, BACK_PRESSURE_WAIT}
 
     void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java 
b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java
index 0406fd2..717a02e 100644
--- a/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java
+++ b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java
@@ -29,7 +29,9 @@ public class WaitStrategyPark implements IWaitStrategy {
 
     @Override
     public void prepare(Map<String, Object> conf, WAIT_SITUATION 
waitSituation) {
-        if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
+        if (waitSituation == WAIT_SITUATION.SPOUT_WAIT) {
+            parkTimeNanoSec = 1_000 * 
ObjectReader.getLong(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PARK_MICROSEC));
+        } else if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
             parkTimeNanoSec = 1_000 * 
ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC));
         } else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) {
             parkTimeNanoSec = 1_000 * 
ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PARK_MICROSEC));

http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java 
b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java
index 067ca71..76ad4f9 100644
--- a/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java
+++ b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java
@@ -44,7 +44,11 @@ public class WaitStrategyProgressive implements 
IWaitStrategy {
 
     @Override
     public void prepare(Map<String, Object> conf, WAIT_SITUATION 
waitSituation) {
-        if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
+        if (waitSituation == WAIT_SITUATION.SPOUT_WAIT) {
+            level1Count   = 
ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL1_COUNT));
+            level2Count   = 
ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL2_COUNT));
+            level3SleepMs = 
ObjectReader.getLong(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
+        } else if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
             level1Count   = 
ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT));
             level2Count   = 
ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT));
             level3SleepMs = 
ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
@@ -61,7 +65,7 @@ public class WaitStrategyProgressive implements IWaitStrategy 
{
     public int idle(int idleCounter) throws InterruptedException {
         if (idleCounter < level1Count) {                     // level 1 - no 
waiting
             ++idleCounter;
-        } else if (idleCounter < level1Count * level2Count) { // level 2 - 
parkNanos(1L)
+        } else if (idleCounter < level1Count + level2Count) { // level 2 - 
parkNanos(1L)
             ++idleCounter;
             LockSupport.parkNanos(1L);
         } else {                                      // level 3 - longer 
idling with Thread.sleep()

http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/spout/ISpoutWaitStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/spout/ISpoutWaitStrategy.java 
b/storm-client/src/jvm/org/apache/storm/spout/ISpoutWaitStrategy.java
deleted file mode 100644
index 3726f12..0000000
--- a/storm-client/src/jvm/org/apache/storm/spout/ISpoutWaitStrategy.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.spout;
-
-import java.util.Map;
-
-/**
- * The strategy a spout needs to use when its waiting. Waiting is
- * triggered in one of two conditions:
- * 
- * 1. `nextTuple()` emits no tuples
- * 2. The spout has hit maxSpoutPending and can't emit any more tuples
- * 
- * The default strategy sleeps for one millisecond.
- */
-public interface ISpoutWaitStrategy {
-    void prepare(Map<String, Object> conf);
-    void emptyEmit(long streak);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/spout/NothingEmptyEmitStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/spout/NothingEmptyEmitStrategy.java 
b/storm-client/src/jvm/org/apache/storm/spout/NothingEmptyEmitStrategy.java
deleted file mode 100644
index cc1f1eb..0000000
--- a/storm-client/src/jvm/org/apache/storm/spout/NothingEmptyEmitStrategy.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.spout;
-
-import java.util.Map;
-
-public class NothingEmptyEmitStrategy implements ISpoutWaitStrategy {
-    @Override
-    public void emptyEmit(long streak) {        
-    }
-
-    @Override
-    public void prepare(Map<String, Object> conf) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/spout/SleepSpoutWaitStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/spout/SleepSpoutWaitStrategy.java 
b/storm-client/src/jvm/org/apache/storm/spout/SleepSpoutWaitStrategy.java
deleted file mode 100644
index c7826f9..0000000
--- a/storm-client/src/jvm/org/apache/storm/spout/SleepSpoutWaitStrategy.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.spout;
-
-import org.apache.storm.Config;
-import java.util.Map;
-
-
-public class SleepSpoutWaitStrategy implements ISpoutWaitStrategy {
-
-    long sleepMillis;
-    
-    @Override
-    public void prepare(Map<String, Object> conf) {
-        sleepMillis = ((Number) 
conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue();
-    }
-
-    @Override
-    public void emptyEmit(long streak) {
-        try {
-            Thread.sleep(sleepMillis);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}

Reply via email to