Sure. When the supervisor starts up a worker JVM, the worker process boots up its set of executors here https://github.com/apache/storm/blob/7f33447477dfbf581e9b46feb27c362cc170dc56/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java#L202 (note the main method at the bottom of the file; this is the entry point for worker processes). The loop calling SpoutExecutor.call is here https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/Executor.java#L239. It basically just sets up a Java thread that keeps calling the call method until a crash or interrupt happens.
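
To illustrate the loop described above, here is a minimal standalone sketch in plain Java. It is not Storm's code: ExecutorLoopSketch, startLoop and countCallsFor are hypothetical names, and the convention that the callable returns a number of milliseconds to sleep before the next call is an assumption made for this sketch. It only shows the general pattern of a thread that keeps invoking a call method until it is interrupted or the method throws.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; none of these names come from Storm.
class ExecutorLoopSketch {

    // Starts a thread that keeps invoking task.call() until the thread is
    // interrupted or call() throws. Assumption for this sketch: the value
    // returned by call() is how many milliseconds to sleep before the next call.
    static Thread startLoop(Callable<Long> task, String name) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    long sleepMs = task.call();
                    if (sleepMs > 0) {
                        Thread.sleep(sleepMs);
                    }
                }
            } catch (InterruptedException e) {
                // Interrupt requested: leave the loop quietly.
            } catch (Exception e) {
                // "Crash": this sketch just logs and lets the thread end.
                System.err.println(name + " stopped after error: " + e);
            }
        }, name);
        t.setDaemon(true);
        t.start();
        return t;
    }

    // Runs a counting task in the loop for roughly `millis` ms, then interrupts
    // the loop thread and returns how many times call() was invoked.
    static int countCallsFor(long millis) {
        AtomicInteger calls = new AtomicInteger();
        Thread t = startLoop(() -> {
            calls.incrementAndGet(); // stand-in for "emit tuples, handle acks"
            return 1L;               // ask the loop to pause 1 ms between calls
        }, "executor-loop-sketch");
        try {
            Thread.sleep(millis);
            t.interrupt();
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            t.interrupt();
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("call() ran " + countCallsFor(50) + " times");
    }
}
```

Running main prints how many times the callable was invoked in roughly 50 ms; the exact count varies by machine.
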
The acks or fails are handled as part of call here https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L141. That statement pulls messages off the executor's message queue and calls back to https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L191, which is where acks and fails are handled. Once the messages are handled, control returns to the call method.

This might be helpful as a reference too: http://storm.apache.org/releases/2.0.0-SNAPSHOT/Understanding-the-parallelism-of-a-Storm-topology.html

2017-08-15 22:02 GMT+02:00 Mahak Goel <[email protected]>:

> Thanks Stig, this is very helpful. So that call function gets called in a
> loop from somewhere? And when there is an ack or fail, do those get
> handled instead of call? Would you be able to point me toward the source
> for that as well? Just trying to understand how things work.
>
> Thanks again!
>
> On Aug 15, 2017, at 14:48, Stig Rohde Døssing <[email protected]> wrote:
>
> Sure, take a look at
> https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L140.
> This function is called repeatedly on spouts to emit new tuples. The wait
> strategy is used in L175 when a call to nextTuple doesn't emit anything.
> The wait strategy is instantiated here
> https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L73.
> Note that this is linking to the current master code; the 1.x code is
> Clojure code instead. The equivalent on 1.x is here
> https://github.com/apache/storm/blob/v1.1.1/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L659.
>
> I believe you have to do it in code for Java-based topology
> configurations, but you should take a look at
> http://storm.apache.org/releases/2.0.0-SNAPSHOT/flux.html, which allows
> you to specify topology configuration as YAML.
>
> 2017-08-15 20:36 GMT+02:00 Mahak Goel <[email protected]>:
>
>> Also, there's no config file that can do something similar, right? It has
>> to be done in the code?
>>
>> On Aug 15, 2017, at 14:31, Mahak Goel <[email protected]> wrote:
>>
>> Thanks Stig, that worked for me!
>>
>> Another question: how does Storm internally handle this timeout? Is
>> there some source code you can point me to?
>>
>> On Aug 15, 2017, at 12:15, Stig Rohde Døssing <[email protected]> wrote:
>>
>> I think you need to give the FQCN for SleepSpoutWaitStrategy instead of
>> an instance, since the config must be serializable to JSON. I'm a little
>> surprised you don't get an error when you submit that topology. If you're
>> using the default wait strategy, you can just leave out the
>> TOPOLOGY_SPOUT_WAIT_STRATEGY part.
>>
>> Here's what works for me (based on the word count topology in
>> storm-starter):
>>
>> builder.setSpout("spout", new RandomSentenceSpout(), 5)
>>     .addConfiguration(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY, new TestWait().getClass().getName())
>>     .addConfiguration(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 60_000);
>>
>> where TestWait is just an inner class like this (purely so I can print
>> the configuration, normally I'd just use the built in wait strategy)
>>
>> public static final class TestWait extends SleepSpoutWaitStrategy {
>>
>>     @Override
>>     public void prepare(Map<String, Object> conf) {
>>         super.prepare(conf);
>>         LogManager.getLogger(getClass()).error("The sleep backoff is {}",
>>             conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS));
>>     }
>>
>> }
>>
>> When I run the topology I get the following in the log:
>> 2017-08-15 18:11:56.596 o.a.s.s.WordCountTopology$TestWait main [ERROR] The sleep backoff is 60000
>>
>> 2017-08-15 18:00 GMT+02:00 Mahak Goel <[email protected]>:
>>
>>> In the last line I use addConfigurations
>>>
>>> On Aug 15, 2017, at 11:59, Mahak Goel <[email protected]> wrote:
>>>
>>> Hmm okay, that's what I'm trying to do but maybe I'm doing it wrong.
>>>
>>> Config config = new Config();
>>> SleepSpoutWaitStrategy strategy = new SleepSpoutWaitStrategy();
>>> config.put(org.apache.storm.Config.TOPOLOGY_SPOUT_WAIT_STRATEGY, strategy);
>>> config.put(org.apache.storm.Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 10);
>>> builder.setSpout(...).addConfiguration(config);
>>>
>>> On Aug 15, 2017, at 11:51, Stig Rohde Døssing <[email protected]> wrote:
>>>
>>> I think I might have misread the code. It looks like the method I linked
>>> does the opposite of what I thought, and removes only the configuration
>>> that is not listed in the link. I would expect using
>>> SpoutDeclarer.addConfiguration to work then.
>>>
>>> 2017-08-15 17:36 GMT+02:00 Mahak Goel <[email protected]>:
>>>
>>>> Text from post.
>>>>
>>>> 2. Spout wait strategies: There's two situations in which a spout needs
>>>> to wait. The first is when the max spout pending limit is reached. The
>>>> second is when nothing is emitted from nextTuple. Previously, Storm would
>>>> just have that spout sit in a busy loop in those cases. What Storm does in
>>>> those situations is now pluggable, and the default is now for the spout to
>>>> sleep for 1 ms. This will cause the spout to use dramatically less CPU when
>>>> it hits those cases, and it also obviates the need for spouts to do any
>>>> sleeping in their implementation to be "polite". The wait strategy can be
>>>> configured with TOPOLOGY_SPOUT_WAIT_STRATEGY and can be configured on
>>>> a spout by spout basis. The interface to implement for a wait strategy is
>>>> backtype.storm.spout.ISpoutWaitStrategy
>>>>
>>>> On Aug 15, 2017, at 11:34, Mahak Goel <[email protected]> wrote:
>>>>
>>>> I tried adding TOPOLOGY_SPOUT_WAIT_STRATEGY and
>>>> TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS in the spout's config but
>>>> that didn't seem to have an effect.
>>>>
>>>> On Aug 15, 2017, at 11:28, Mahak Goel <[email protected]> wrote:
>>>>
>>>> Hi Stig,
>>>>
>>>> Thank you. However it looks like from this post there is a way to do it
>>>> on a per spout basis.
>>>> https://groups.google.com/forum/m/#!search/Storm$200.8.1$20released/storm-user/hVbXtBdCkQo
>>>>
>>>> Do you or does anyone else know if this is still a possibility? If so,
>>>> how do I do it?
>>>>
>>>> On Aug 15, 2017, at 11:14, Stig Rohde Døssing <[email protected]> wrote:
>>>>
>>>> Hi Mahak,
>>>>
>>>> I haven't checked in any detail, but I suspect there isn't. I'd have
>>>> said you could set the configuration for the spout via the SpoutDeclarer
>>>> addConfiguration methods when declaring the spout, but it looks like the
>>>> wait strategy and backoff are both removed from the component
>>>> configuration, and only read from the topology level configuration
>>>> https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/executor/Executor.java#L431.
>>>>
>>>> 2017-08-15 16:45 GMT+02:00 Brian Taylor <[email protected]>:
>>>>
>>>>> Unsubscribe
>>>>>
>>>>> On Aug 15, 2017, at 10:34 AM, Mahak Goel <[email protected]> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I know I can configure a sleep wait strategy in the defaults.yaml and
>>>>>> that will apply to all spouts in the topology. Is there a way to do this
>>>>>> on a spout by spout basis? That is, is there a way to configure
>>>>>> different times for different spouts?
>>>>>>
>>>>>> Thanks!
