Repository: storm
Updated Branches:
  refs/heads/master 4721c3617 -> dc4011cd6
STORM-2958 - Use new wait strategy model for Spout as well

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/51997889
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/51997889
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/51997889

Branch: refs/heads/master
Commit: 51997889492ab9ebe68f495a232c239c33b31d9f
Parents: 4721c36
Author: Roshan Naik <ros...@hw13642.hsd1.ca.comcast.net>
Authored: Mon Feb 19 16:18:13 2018 -0800
Committer: Roshan Naik <ros...@hw13642.hsd1.ca.comcast.net>
Committed: Mon Feb 19 16:18:31 2018 -0800

----------------------------------------------------------------------
 conf/defaults.yaml                              |  8 ++-
 docs/Performance.md                             | 27 ++++-----
 .../src/jvm/org/apache/storm/Config.java        | 58 +++++++++++++++-----
 .../storm/executor/bolt/BoltExecutor.java       |  1 -
 .../storm/executor/spout/SpoutExecutor.java     | 35 ++++++------
 .../org/apache/storm/policy/IWaitStrategy.java  |  2 +-
 .../apache/storm/policy/WaitStrategyPark.java   |  4 +-
 .../storm/policy/WaitStrategyProgressive.java   |  8 ++-
 .../apache/storm/spout/ISpoutWaitStrategy.java  | 34 ------------
 .../storm/spout/NothingEmptyEmitStrategy.java   | 31 -----------
 .../storm/spout/SleepSpoutWaitStrategy.java     | 41 --------------
 11 files changed, 92 insertions(+), 157 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 6269282..d64623e 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -242,8 +242,12 @@ topology.tick.tuple.freq.secs: null
 topology.worker.shared.thread.pool.size: 4
 
 # Spout Wait Strategy - employed when there is no data to produce
-topology.spout.wait.strategy: "org.apache.storm.spout.SleepSpoutWaitStrategy"
-topology.sleep.spout.wait.strategy.time.ms: 1
+topology.spout.wait.strategy: "org.apache.storm.policy.WaitStrategyProgressive"
+topology.spout.wait.park.microsec : 100 # park time for org.apache.storm.policy.WaitStrategyPark. Busy spins if set to 0.
+
+topology.spout.wait.progressive.level1.count: 0 # number of iterations to spend in level 1 [no sleep] of WaitStrategyProgressive, before progressing to level 2
+topology.spout.wait.progressive.level2.count: 0 # number of iterations to spend in level 2 [parkNanos(1)] of WaitStrategyProgressive, before progressing to level 3
+topology.spout.wait.progressive.level3.sleep.millis: 1 # sleep duration for idling iterations in level 3 of WaitStrategyProgressive
 
 # Bolt Wait Strategy - employed when there is no data in its receive buffer to process
 topology.bolt.wait.strategy : "org.apache.storm.policy.WaitStrategyProgressive"

http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/docs/Performance.md
----------------------------------------------------------------------
diff --git a/docs/Performance.md b/docs/Performance.md
index 6d7d33e..20acb32 100644
--- a/docs/Performance.md
+++ b/docs/Performance.md
@@ -90,8 +90,10 @@ values.
 ## 4. Wait Strategy
 Wait strategies are used to conserve CPU usage by trading off some latency and throughput. They are applied for the following situations:
 
-4.1 **Spout Wait:** In low/no traffic situations, Spout's nextTuple() may not produce any new emits. To prevent invoking the Spout's nextTuple,
-this wait strategy is used between nextTuple() calls to allow the spout's executor thread to idle and conserve CPU. Select a strategy using `topology.spout.wait.strategy`.
+4.1 **Spout Wait:** In low/no traffic situations, Spout's nextTuple() may not produce any new emits. To prevent invoking the Spout's nextTuple too often,
+this wait strategy is used between nextTuple() calls, allowing the spout's executor thread to idle and conserve CPU.
Spout wait strategy is also used +when the `topology.max.spout.pending` limit has been reached when ACKers are enabled. Select a strategy using `topology.spout.wait.strategy`. Configure the +chosen wait strategy using one of the `topology.spout.wait.*` settings. 4.2 **Bolt Wait:** : When a bolt polls it's receive queue for new messages to process, it is possible that the queue is empty. This typically happens in case of low/no traffic situations or when the upstream spout/bolt is inherently slower. This wait strategy is used in such cases. It avoids high CPU usage @@ -104,24 +106,23 @@ conserving CPU. The chosen strategy can be further configured using the `topolog #### Built-in wait strategies: - -- **SleepSpoutWaitStrategy** : This is the only built-in strategy available for Spout Wait. It cannot be applied to other Wait situations. It is a simple static strategy that -calls Thread.sleep() each time. Set `topology.spout.wait.strategy` to `org.apache.storm.spout.SleepSpoutWaitStrategy` for using this. `topology.sleep.spout.wait.strategy.time.ms` -configures the sleep time. +These wait strategies are availabe for use with all of the above mentioned wait situations. - **ProgressiveWaitStrategy** : This strategy can be used for Bolt Wait or Backpressure Wait situations. Set the strategy to 'org.apache.storm.policy.WaitStrategyProgressive' to select this wait strategy. This is a dynamic wait strategy that enters into progressively deeper states of CPU conservation if the Backpressure Wait or Bolt Wait situations persist. It has 3 levels of idling and allows configuring how long to stay at each level : - 1. No Waiting - The first few times it will return immediately. This does not conserve any CPU. The number of times it remains in this state is configured using - `topology.bolt.wait.progressive.level1.count` or `topology.backpressure.wait.progressive.level1.count` depending which situation it is being used. + 1. 
Level1 / No Waiting - The first few times it will return immediately. This does not conserve any CPU. The number of times it remains in this state is configured using + `topology.spout.wait.progressive.level1.count` or `topology.bolt.wait.progressive.level1.count` or `topology.backpressure.wait.progressive.level1.count` depending which + situation it is being used. - 2. Park Nanos - In this state it disables the current thread for thread scheduling purposes, for 1 nano second using LockSupport.parkNanos(). This puts the CPU in a minimal - conservation state. It remains in this state for `topology.bolt.wait.progressive.level2.count` or `topology.backpressure.wait.progressive.level2.count` iterations. + 2. Level 2 / Park Nanos - In this state it disables the current thread for thread scheduling purposes, for 1 nano second using LockSupport.parkNanos(). This puts the CPU in a minimal + conservation state. It remains in this state for `topology.spout.wait.progressive.level2.count` or `topology.bolt.wait.progressive.level2.count` or + `topology.backpressure.wait.progressive.level2.count` iterations. - 3. Thread.sleep() - In this state it calls Thread.sleep() with the value specified in `topology.backpressure.wait.progressive.level3.sleep.millis` or in - `topology.bolt.wait.progressive.level3.sleep.millis` based on the Wait situation it is used in. This is the most CPU conserving level it remains in this level for - the remaining iterations. + 3. Level 3 / Thread.sleep() - In this level it calls Thread.sleep() with the value specified in `topology.spout.wait.progressive.level3.sleep.millis` or + `topology.bolt.wait.progressive.level3.sleep.millis` or `topology.backpressure.wait.progressive.level3.sleep.millis`. This is the most CPU conserving level and it remains in + this level for the remaining iterations. - **ParkWaitStrategy** : This strategy can be used for Bolt Wait or Backpressure Wait situations. 
Set the strategy to `org.apache.storm.policy.WaitStrategyPark` to use this. http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index 84825cf..ab427d2 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ b/storm-client/src/jvm/org/apache/storm/Config.java @@ -509,16 +509,6 @@ public class Config extends HashMap<String, Object> { public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending"; /** - * A class that implements a strategy for what to do when a spout needs to wait. Waiting is - * triggered in one of two conditions: - * - * 1. nextTuple emits no tuples - * 2. The spout has hit maxSpoutPending and can't emit any more tuples - */ - @isString - public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy"; - - /** * The amount of milliseconds the SleepEmptyEmitStrategy should sleep for. */ @isInteger @@ -872,6 +862,46 @@ public class Config extends HashMap<String, Object> { public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines"; /** + * A class that implements a wait strategy for spout. Waiting is triggered in one of two conditions: + * + * 1. nextTuple emits no tuples + * 2. The spout has hit maxSpoutPending and can't emit any more tuples + */ + @isString + @isDerivedFrom(baseType = IWaitStrategy.class) + public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY = "topology.spout.wait.strategy"; + + /** + * Configures park time for WaitStrategyPark for spout. If set to 0, returns immediately (i.e busy wait). 
+ */ + @NotNull + @isPositiveNumber(includeZero = true) + public static final String TOPOLOGY_SPOUT_WAIT_PARK_MICROSEC = "topology.spout.wait.park.microsec"; + + /** + * Configures number of iterations to spend in level 1 of WaitStrategyProgressive, before progressing to level 2 + */ + @NotNull + @isInteger + @isPositiveNumber(includeZero = true) + public static final String TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL1_COUNT = "topology.spout.wait.progressive.level1.count"; + + /** + * Configures number of iterations to spend in level 2 of WaitStrategyProgressive, before progressing to level 3 + */ + @NotNull + @isInteger + @isPositiveNumber(includeZero = true) + public static final String TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL2_COUNT = "topology.spout.wait.progressive.level2.count"; + + /** + * Configures sleep time for WaitStrategyProgressive. + */ + @NotNull + @isPositiveNumber(includeZero = true) + public static final String TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS = "topology.spout.wait.progressive.level3.sleep.millis"; + + /** * Selects the Bolt's Wait Strategy to use when there are no incoming msgs. Used to trade off latency vs CPU usage. 
*/ @isString @@ -890,7 +920,7 @@ public class Config extends HashMap<String, Object> { */ @NotNull @isInteger - @isPositiveNumber + @isPositiveNumber(includeZero = true) public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT = "topology.bolt.wait.progressive.level1.count"; /** @@ -898,7 +928,7 @@ public class Config extends HashMap<String, Object> { */ @NotNull @isInteger - @isPositiveNumber + @isPositiveNumber(includeZero = true) public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT = "topology.bolt.wait.progressive.level2.count"; /** @@ -939,7 +969,7 @@ public class Config extends HashMap<String, Object> { */ @NotNull @isInteger - @isPositiveNumber + @isPositiveNumber(includeZero = true) public static final String TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL1_COUNT = "topology.backpressure.wait.progressive.level1.count"; /** @@ -947,7 +977,7 @@ public class Config extends HashMap<String, Object> { */ @NotNull @isInteger - @isPositiveNumber + @isPositiveNumber(includeZero = true) public static final String TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL2_COUNT = "topology.backpressure.wait.progressive.level2.count"; http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java index 63ac1a5..61e6488 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java @@ -56,7 +56,6 @@ import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.ReflectionUtils; import org.apache.storm.utils.Utils; import org.apache.storm.utils.Time; -import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java index 5f6773c..e204150 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java @@ -39,7 +39,6 @@ import org.apache.storm.hooks.info.SpoutFailInfo; import org.apache.storm.policy.IWaitStrategy; import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION; import org.apache.storm.spout.ISpout; -import org.apache.storm.spout.ISpoutWaitStrategy; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.stats.SpoutExecutorStats; import org.apache.storm.stats.StatsUtil; @@ -65,7 +64,7 @@ public class SpoutExecutor extends Executor { private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); - private final ISpoutWaitStrategy spoutWaitStrategy; + private final IWaitStrategy spoutWaitStrategy; private final IWaitStrategy backPressureWaitStrategy; private Integer maxSpoutPending; private final AtomicBoolean lastActive; @@ -84,7 +83,7 @@ public class SpoutExecutor extends Executor { public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) { super(workerData, executorId, credentials, StatsUtil.SPOUT); this.spoutWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); - this.spoutWaitStrategy.prepare(topoConf); + this.spoutWaitStrategy.prepare(topoConf, WAIT_SITUATION.SPOUT_WAIT); this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY)); this.backPressureWaitStrategy.prepare(topoConf, 
WAIT_SITUATION.BACK_PRESSURE_WAIT); @@ -164,7 +163,8 @@ public class SpoutExecutor extends Executor { return new Callable<Long>() { int recvqCheckSkips = 0; final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount(); - int bpIdleCount = 0; + int swIdleCount = 0; // counter for spout wait strategy + int bpIdleCount = 0; // counter for back pressure wait strategy int rmspCount = 0; @Override public Long call() throws Exception { @@ -225,6 +225,7 @@ public class SpoutExecutor extends Executor { spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch); return 0L; } + swIdleCount = 0; return 0L; } @@ -237,6 +238,19 @@ public class SpoutExecutor extends Executor { spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start); } + private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException { + emptyEmitStreak.increment(); + long start = Time.currentTimeMillis(); + swIdleCount = spoutWaitStrategy.idle(swIdleCount); + if (reachedMaxSpoutPending) { + spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start); + } else { + if (emptyStretch > 0) { + LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch); + } + } + } + // returns true if pendingEmits is empty private boolean tryFlushPendingEmits() { for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) { @@ -251,19 +265,6 @@ public class SpoutExecutor extends Executor { }; } - private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) { - emptyEmitStreak.increment(); - long start = Time.currentTimeMillis(); - spoutWaitStrategy.emptyEmit(emptyEmitStreak.get()); - if (reachedMaxSpoutPending) { - spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start); - } else { - if (emptyStretch > 0) { - LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch); - } - } - } - private void activateSpouts() { LOG.info("Activating spout {}:{}", componentId, taskIds); for (ISpout spout : 
spouts) { http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java b/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java index f41492e..907591b 100644 --- a/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java +++ b/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java @@ -25,7 +25,7 @@ import java.util.Map; public interface IWaitStrategy { - enum WAIT_SITUATION {BOLT_WAIT, BACK_PRESSURE_WAIT} + enum WAIT_SITUATION {SPOUT_WAIT, BOLT_WAIT, BACK_PRESSURE_WAIT} void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation); http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java index 0406fd2..717a02e 100644 --- a/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java +++ b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java @@ -29,7 +29,9 @@ public class WaitStrategyPark implements IWaitStrategy { @Override public void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation) { - if (waitSituation == WAIT_SITUATION.BOLT_WAIT) { + if (waitSituation == WAIT_SITUATION.SPOUT_WAIT) { + parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PARK_MICROSEC)); + } else if (waitSituation == WAIT_SITUATION.BOLT_WAIT) { parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC)); } else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) { parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PARK_MICROSEC)); 
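
Note on the WaitStrategyPark change above: the park behaviour for the new SPOUT_WAIT situation can be sketched in isolation. This is a minimal illustration and not the class from the commit. `ParkWaitSketch` and its methods are hypothetical names, and the body of `idle` is an assumption, because the diff only shows `prepare`. The microsecond-to-nanosecond conversion and the "0 means return immediately" rule are taken from the diff and from the javadoc of `topology.spout.wait.park.microsec`.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical stand-alone sketch; not the actual WaitStrategyPark class.
final class ParkWaitSketch {
    private final long parkTimeNanoSec;

    // parkMicrosec plays the role of topology.spout.wait.park.microsec.
    ParkWaitSketch(long parkMicrosec) {
        // Same conversion as in the diff: microseconds to nanoseconds.
        this.parkTimeNanoSec = 1_000 * parkMicrosec;
    }

    long parkTimeNanoSec() {
        return parkTimeNanoSec;
    }

    // Assumed shape of idle(): park for the configured time and count the iteration.
    int idle(int idleCounter) {
        if (parkTimeNanoSec > 0) {
            LockSupport.parkNanos(parkTimeNanoSec);
        }
        // With a park time of 0 nothing is parked, so the caller busy-spins.
        return idleCounter + 1;
    }
}
```

With the default of 100 microseconds from conf/defaults.yaml, each idle call in this sketch parks the calling thread for up to 100,000 nanoseconds.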
http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java
index 067ca71..76ad4f9 100644
--- a/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java
+++ b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java
@@ -44,7 +44,11 @@ public class WaitStrategyProgressive implements IWaitStrategy {
 
     @Override
     public void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation) {
-        if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
+        if (waitSituation == WAIT_SITUATION.SPOUT_WAIT) {
+            level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL1_COUNT));
+            level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL2_COUNT));
+            level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
+        } else if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
             level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT));
             level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT));
             level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
@@ -61,7 +65,7 @@ public class WaitStrategyProgressive implements IWaitStrategy {
     public int idle(int idleCounter) throws InterruptedException {
         if (idleCounter < level1Count) { // level 1 - no waiting
             ++idleCounter;
-        } else if (idleCounter < level1Count * level2Count) { // level 2 - parkNanos(1L)
+        } else if (idleCounter < level1Count + level2Count) { // level 2 - parkNanos(1L)
             ++idleCounter;
             LockSupport.parkNanos(1L);
         } else { // level 3 - longer idling with Thread.sleep()
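
Note on the threshold fix in the hunk above: the level selection in `idle` can be sketched on its own to show why `level1Count + level2Count` is the right boundary. This is an illustration under stated assumptions and not the class from the commit. `ProgressiveIdleSketch` and `levelFor` are hypothetical names, and the level 3 branch (sleep and leave the counter unchanged) is an assumption, since the diff cuts off before that branch's body.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical stand-alone sketch; not the actual WaitStrategyProgressive class.
final class ProgressiveIdleSketch {
    private final int level1Count;
    private final int level2Count;
    private final long level3SleepMs;

    ProgressiveIdleSketch(int level1Count, int level2Count, long level3SleepMs) {
        this.level1Count = level1Count;
        this.level2Count = level2Count;
        this.level3SleepMs = level3SleepMs;
    }

    // Returns the idling level (1, 2 or 3) that applies to the given idle counter.
    int levelFor(int idleCounter) {
        if (idleCounter < level1Count) {
            return 1;
        } else if (idleCounter < level1Count + level2Count) { // corrected boundary from the diff
            return 2;
        } else {
            return 3;
        }
    }

    // Same branch structure as idle(int) in the diff.
    int idle(int idleCounter) throws InterruptedException {
        int level = levelFor(idleCounter);
        if (level == 1) {
            return idleCounter + 1;      // no waiting
        } else if (level == 2) {
            LockSupport.parkNanos(1L);   // minimal CPU conservation
            return idleCounter + 1;
        } else {
            Thread.sleep(level3SleepMs); // assumption: counter stays put at level 3
            return idleCounter;
        }
    }
}
```

With level1Count = 2 and level2Count = 3, counters 0 and 1 fall in level 1, counters 2 to 4 in level 2, and counter 5 onward in level 3. The old boundary `level1Count * level2Count` would have been 6 here, keeping level 2 for four iterations instead of three, and it would have skipped level 2 entirely whenever level1Count was 0.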
http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/spout/ISpoutWaitStrategy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/spout/ISpoutWaitStrategy.java b/storm-client/src/jvm/org/apache/storm/spout/ISpoutWaitStrategy.java
deleted file mode 100644
index 3726f12..0000000
--- a/storm-client/src/jvm/org/apache/storm/spout/ISpoutWaitStrategy.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.spout;
-
-import java.util.Map;
-
-/**
- * The strategy a spout needs to use when its waiting. Waiting is
- * triggered in one of two conditions:
- *
- * 1. `nextTuple()` emits no tuples
- * 2. The spout has hit maxSpoutPending and can't emit any more tuples
- *
- * The default strategy sleeps for one millisecond.
- */
-public interface ISpoutWaitStrategy {
-    void prepare(Map<String, Object> conf);
-    void emptyEmit(long streak);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/spout/NothingEmptyEmitStrategy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/spout/NothingEmptyEmitStrategy.java b/storm-client/src/jvm/org/apache/storm/spout/NothingEmptyEmitStrategy.java
deleted file mode 100644
index cc1f1eb..0000000
--- a/storm-client/src/jvm/org/apache/storm/spout/NothingEmptyEmitStrategy.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.spout;
-
-import java.util.Map;
-
-public class NothingEmptyEmitStrategy implements ISpoutWaitStrategy {
-    @Override
-    public void emptyEmit(long streak) {
-    }
-
-    @Override
-    public void prepare(Map<String, Object> conf) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/51997889/storm-client/src/jvm/org/apache/storm/spout/SleepSpoutWaitStrategy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/spout/SleepSpoutWaitStrategy.java b/storm-client/src/jvm/org/apache/storm/spout/SleepSpoutWaitStrategy.java
deleted file mode 100644
index c7826f9..0000000
--- a/storm-client/src/jvm/org/apache/storm/spout/SleepSpoutWaitStrategy.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.spout;
-
-import org.apache.storm.Config;
-import java.util.Map;
-
-
-public class SleepSpoutWaitStrategy implements ISpoutWaitStrategy {
-
-    long sleepMillis;
-
-    @Override
-    public void prepare(Map<String, Object> conf) {
-        sleepMillis = ((Number) conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue();
-    }
-
-    @Override
-    public void emptyEmit(long streak) {
-        try {
-            Thread.sleep(sleepMillis);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
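
Note for topologies that previously set `topology.spout.wait.strategy` to the removed `org.apache.storm.spout.SleepSpoutWaitStrategy`: the settings introduced by this commit can be assembled as a plain map, as in the rough sketch below. `SpoutWaitConfSketch` and its two helper methods are hypothetical names used only for illustration. The keys and class names are the ones that appear in the diff. A plain `HashMap<String, Object>` stands in for Storm's `Config`, which the diff shows extending that type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the commit.
final class SpoutWaitConfSketch {

    // Settings that select WaitStrategyProgressive for the Spout Wait situation.
    static Map<String, Object> progressiveSpoutWait(int level1Count, int level2Count, long level3SleepMillis) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.spout.wait.strategy", "org.apache.storm.policy.WaitStrategyProgressive");
        conf.put("topology.spout.wait.progressive.level1.count", level1Count);
        conf.put("topology.spout.wait.progressive.level2.count", level2Count);
        conf.put("topology.spout.wait.progressive.level3.sleep.millis", level3SleepMillis);
        return conf;
    }

    // Settings that select WaitStrategyPark for the Spout Wait situation.
    static Map<String, Object> parkSpoutWait(long parkMicrosec) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.spout.wait.strategy", "org.apache.storm.policy.WaitStrategyPark");
        conf.put("topology.spout.wait.park.microsec", parkMicrosec);
        return conf;
    }
}
```

Calling `progressiveSpoutWait(0, 0, 1)` reproduces the new values in conf/defaults.yaml. With both counts at 0 the strategy goes straight to level 3 and sleeps 1 ms per idle iteration, which appears to match the 1 ms default of the removed `topology.sleep.spout.wait.strategy.time.ms` setting.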