Thanks Stig, this is very helpful. So that call function gets called in a loop 
from somewhere? And when there is an ack or fail, do those get handled 
instead of call? Would you be able to point me toward the source for that as 
well? Just trying to understand how things work. 

Thanks again!  


> On Aug 15, 2017, at 14:48, Stig Rohde Døssing <s...@apache.org> wrote:
> 
> Sure, take a look at 
> https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L140.
>  This function is called repeatedly on spouts to emit new tuples. The wait 
> strategy is used in L175 when a call to nextTuple doesn't emit anything. The 
> wait strategy is instantiated here 
> https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L73.
>  Note that this links to the current master code; the 1.x code is written 
> in Clojure instead. The equivalent on 1.x is here 
> https://github.com/apache/storm/blob/v1.1.1/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L659.
> 
> I believe you have to do it in code for Java-based topology configurations, 
> but you should take a look at 
> http://storm.apache.org/releases/2.0.0-SNAPSHOT/flux.html, which allows you 
> to specify topology configuration as yaml.
> 
> 2017-08-15 20:36 GMT+02:00 Mahak Goel <mahakgoe...@gmail.com>:
>> Also there's no config file that can do something similar right? It has to 
>> be done in the code? 
>> 
>> 
>>> On Aug 15, 2017, at 14:31, Mahak Goel <mahakgoe...@gmail.com> wrote:
>>> 
>>> Thanks stig, that worked for me! 
>>> 
>>> Another question, how does storm internally handle this time out? Is there 
>>> some source code you can point me to? 
>>> 
>>> Sent from my iPhone
>>> 
>>>> On Aug 15, 2017, at 12:15, Stig Rohde Døssing <s...@apache.org> wrote:
>>>> 
>>>> I think you need to give the FQCN (fully qualified class name) for 
>>>> SleepSpoutWaitStrategy instead of an instance, since the config must be 
>>>> serializable to JSON. I'm a little surprised you don't get an error when 
>>>> you submit that topology. If you're using the default wait strategy, you 
>>>> can just leave out the TOPOLOGY_SPOUT_WAIT_STRATEGY part.
>>>> 
>>>> Here's what works for me (based on the word count topology in 
>>>> storm-starter):
>>>> 
>>>> builder.setSpout("spout", new RandomSentenceSpout(), 5)
>>>>         .addConfiguration(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY, new TestWait().getClass().getName())
>>>>         .addConfiguration(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 60_000);
>>>> 
>>>> where TestWait is just an inner class like this (purely so I can print the 
>>>> configuration, normally I'd just use the built in wait strategy)
>>>> 
>>>> public static final class TestWait extends SleepSpoutWaitStrategy {
>>>> 
>>>>     @Override
>>>>     public void prepare(Map<String, Object> conf) {
>>>>         super.prepare(conf);
>>>>         LogManager.getLogger(getClass()).error("The sleep backoff is {}", conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS));
>>>>     }
>>>> }
>>>> 
>>>> When I run the topology I get the following in the log:
>>>> 2017-08-15 18:11:56.596 o.a.s.s.WordCountTopology$TestWait main [ERROR] The sleep backoff is 60000
>>>> 
>>>> 2017-08-15 18:00 GMT+02:00 Mahak Goel <mahakgoe...@gmail.com>:
>>>>> In the last line I use addConfigurations
>>>>> 
>>>>> Sent from my iPhone
>>>>> 
>>>>>> On Aug 15, 2017, at 11:59, Mahak Goel <mahakgoe...@gmail.com> wrote:
>>>>>> 
>>>>>> Hmm okay, that's what I'm trying to do but maybe I'm doing it wrong. 
>>>>>> 
>>>>>> 
>>>>>> Config config = new Config();
>>>>>> SleepSpoutWaitStrategy strategy = new SleepSpoutWaitStrategy();
>>>>>> config.put(org.apache.storm.Config.TOPOLOGY_SPOUT_WAIT_STRATEGY, strategy);
>>>>>> config.put(org.apache.storm.Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 10);
>>>>>> builder.setSpout(...).addConfiguration(config);
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Sent from my iPhone
>>>>>> On Aug 15, 2017, at 11:51, Stig Rohde Døssing <s...@apache.org> wrote:
>>>>>> 
>>>>>>> I think I might have misread the code. It looks like the method I 
>>>>>>> linked does the opposite of what I thought, and removes only the 
>>>>>>> configuration that is not listed in the link. I would expect using 
>>>>>>> SpoutDeclarer.addConfiguration to work then.
>>>>>>> 
>>>>>>> 2017-08-15 17:36 GMT+02:00 Mahak Goel <mahakgoe...@gmail.com>:
>>>>>>>> Text from post. 
>>>>>>>> 
>>>>>>>> 2. Spout wait strategies: There's two situations in which a spout 
>>>>>>>> needs to wait. The first is when the max spout pending limit is 
>>>>>>>> reached. The second is when nothing is emitted from nextTuple. 
>>>>>>>> Previously, Storm would just have that spout sit in a busy loop in 
>>>>>>>> those cases. What Storm does in those situations is now pluggable, and 
>>>>>>>> the default is now for the spout to sleep for 1 ms. This will cause 
>>>>>>>> the spout to use dramatically less CPU when it hits those cases, and 
>>>>>>>> it also obviates the need for spouts to do any sleeping in their 
>>>>>>>> implementation to be "polite". The wait strategy can be configured 
>>>>>>>> with TOPOLOGY_SPOUT_WAIT_STRATEGY and can be configured on a spout by 
>>>>>>>> spout basis. The interface to implement for a wait strategy is 
>>>>>>>> backtype.storm.spout.ISpoutWaitStrategy
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>>> On Aug 15, 2017, at 11:34, Mahak Goel <mahakgoe...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> I tried adding TOPOLOGY_SPOUT_WAIT_STRATEGY  and 
>>>>>>>>> TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS in the spouts config but 
>>>>>>>>> that didn't seem to have an effect. 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On Aug 15, 2017, at 11:28, Mahak Goel <mahakgoe...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi Stig, 
>>>>>>>>>> 
>>>>>>>>>> Thank you. However, from this post it looks like there is a way to 
>>>>>>>>>> do it on a per-spout basis. 
>>>>>>>>>> https://groups.google.com/forum/m/#!search/Storm$200.8.1$20released/storm-user/hVbXtBdCkQo
>>>>>>>>>> 
>>>>>>>>>> Do you or does anyone else know if this is still a possibility? If 
>>>>>>>>>> so, how do I do it? 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>>> On Aug 15, 2017, at 11:14, Stig Rohde Døssing <s...@apache.org> 
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi Mahak,
>>>>>>>>>>> 
>>>>>>>>>>> I haven't checked in any detail, but I suspect there isn't. I'd 
>>>>>>>>>>> have said you could set the configuration for the spout via the 
>>>>>>>>>>> SpoutDeclarer addConfiguration methods when declaring the spout, 
>>>>>>>>>>> but it looks like the wait strategy and backoff are both removed 
>>>>>>>>>>> from the component configuration, and only read from the topology 
>>>>>>>>>>> level configuration 
>>>>>>>>>>> https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/executor/Executor.java#L431.
>>>>>>>>>>>  
>>>>>>>>>>> 
>>>>>>>>>>> 2017-08-15 16:45 GMT+02:00 Brian Taylor 
>>>>>>>>>>> <br...@resolvingarchitecture.com>:
>>>>>>>>>>>> Unsubscribe
>>>>>>>>>>>> 
>>>>>>>>>>>> Sent from BlueMail
>>>>>>>>>>>>> On Aug 15, 2017, at 10:34 AM, Mahak Goel <mahakgoe...@gmail.com> 
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I know I can configure a sleep wait strategy in the defaults.yaml 
>>>>>>>>>>>>> and that will apply to all spouts in the topology. Is there a way 
>>>>>>>>>>>>> to do this on a spout by spout basis? That is, is there a way to 
>>>>>>>>>>>>> configure different times for different spouts? 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks!
>>>>>>>>>>> 
>>>>>>> 
>>>> 
> 
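
The loop discussed in this thread can be pictured with a small, self-contained 
model. This is only a sketch under stated assumptions, not Storm's code: 
SpoutLoopSketch, WaitStrategy, SleepWait, ToySpout and both "sketch.*" config 
keys are hypothetical stand-ins invented for illustration. It assumes, as the 
ISpout documentation describes, that nextTuple and ack are invoked from the 
same executor thread, and it mirrors the point above that the config carries 
the wait strategy's class name (a string) rather than an instance.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Simplified model of a spout executor loop. None of these types are Storm classes.
final class SpoutLoopSketch {
    static final String WAIT_STRATEGY = "sketch.wait.strategy"; // hypothetical key

    /** Stand-in for a wait strategy: invoked when nextTuple emitted nothing. */
    interface WaitStrategy {
        void prepare(Map<String, Object> conf);
        void emptyEmit(long streak);
    }

    /** Sleeps for a configured number of milliseconds on every empty emit. */
    static final class SleepWait implements WaitStrategy {
        static final String SLEEP_MS = "sketch.sleep.time.ms"; // hypothetical key
        private long sleepMs;

        @Override
        public void prepare(Map<String, Object> conf) {
            sleepMs = ((Number) conf.get(SLEEP_MS)).longValue();
        }

        @Override
        public void emptyEmit(long streak) {
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Toy spout: emits queued sentences and counts acks. */
    static final class ToySpout {
        final Queue<String> source = new ArrayDeque<>();
        int emitted = 0;
        int acked = 0;

        /** Returns true if a tuple was emitted. */
        boolean nextTuple(Queue<String> ackQueue) {
            String sentence = source.poll();
            if (sentence == null) {
                return false;
            }
            emitted++;
            ackQueue.add(sentence); // pretend downstream acked it immediately
            return true;
        }

        void ack(String id) {
            acked++;
        }
    }

    /** Runs the loop for a fixed number of iterations; returns the empty-emit count. */
    static int run(ToySpout spout, Map<String, Object> conf, int iterations) {
        WaitStrategy wait;
        try {
            // The config holds a class name, so the strategy is built reflectively.
            String className = (String) conf.get(WAIT_STRATEGY);
            wait = (WaitStrategy) Class.forName(className)
                    .getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create wait strategy", e);
        }
        wait.prepare(conf);

        Queue<String> ackQueue = new ArrayDeque<>();
        long emptyStreak = 0;
        int emptyEmits = 0;
        for (int i = 0; i < iterations; i++) {
            // Pending acks are handled on this same thread, between nextTuple calls.
            String id;
            while ((id = ackQueue.poll()) != null) {
                spout.ack(id);
            }
            if (spout.nextTuple(ackQueue)) {
                emptyStreak = 0;
            } else {
                emptyEmits++;
                wait.emptyEmit(emptyStreak++); // back off instead of busy-looping
            }
        }
        return emptyEmits;
    }

    public static void main(String[] args) {
        ToySpout spout = new ToySpout();
        spout.source.add("a");
        spout.source.add("b");
        spout.source.add("c");

        Map<String, Object> conf = new HashMap<>();
        conf.put(WAIT_STRATEGY, SleepWait.class.getName());
        conf.put(SleepWait.SLEEP_MS, 1);

        int emptyEmits = run(spout, conf, 5);
        System.out.println("emitted=" + spout.emitted + " acked=" + spout.acked
                + " emptyEmits=" + emptyEmits);
    }
}
```

With three queued sentences and five iterations, the first three iterations 
emit, and the last two find nothing to emit and fall through to the wait 
strategy. In this model ack never replaces a nextTuple call; it is simply 
another step taken on the same thread before the next call. Failure handling 
and the max spout pending limit are left out to keep the sketch short.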
