How about: https://storm.incubator.apache.org/apidocs/backtype/storm/Config.html#NIMBUS_TASK_TIMEOUT_SECS

"How long without heartbeating a task can go before nimbus will consider the task dead and reassign it to another location."

and https://storm.incubator.apache.org/apidocs/backtype/storm/Config.html#NIMBUS_REASSIGN

"Whether or not nimbus should reassign tasks if it detects that a task goes down. Defaults to true, and it's not recommended to change this value."

Marc

On Sat, Oct 11, 2014 at 08:42:16PM +0000, Itai Frenkel wrote:
> AFAIK task.heartbeat.frequency.secs is related to ZooKeeper bookkeeping.
> Not sure if it is being acted upon.
>
> See also https://github.com/apache/storm/pull/286 (related to 2.3 below).
>
> Itai
>
> ________________________________________
> From: Marc Vaillant <[email protected]>
> Sent: Friday, October 10, 2014 7:32 PM
> To: [email protected]
> Subject: Re: Why won't storm restart failing tasks?
>
> Hi Itai,
>
> Thanks very much for your response and suggestions. My first naive thought is: why is this so hard? Isn't this the most basic fault tolerance provided by the simplest load balancers (which is essentially what a shuffle grouping is)? My second question: I see config parameters like task.heartbeat.frequency.secs; what is the purpose of a task heartbeat if not to detect exactly this kind of situation?
>
> Thanks,
> Marc
>
> On Fri, Oct 10, 2014 at 09:22:38AM +0000, Itai Frenkel wrote:
> > Hi Marc,
> >
> > (I'm a Storm newbie myself.)
> >
> > As I understand it, your question has two parts:
> > 1. Can Storm detect a bolt that is "stuck"? AFAIK the answer is no.
> > 2. Can Storm do anything about it? AFAIK the answer is no.
> >
> > What I would do in your situation:
> > 1. I would write a delegate bolt that delegates all execute/ack/fail calls to the "problematic bolt".
> > I would then connect a tick tuple to that delegate bolt
> > (http://kitmenke.com/blog/2014/08/04/tick-tuples-within-storm/), which would update a TTL data structure
> > (such as expireAfterWrite with a removalListener, as described in
> > http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/cache/CacheBuilder.html).
> > Each time a tick arrives, the bolt writes to the cache, which refreshes the TTL.
> > When the TTL expires you will get an event on a separate thread.
> > The delegate bolt would of course not forward any of the tick tuples to the "problematic bolt".
> >
> > 2. There are four choices:
> > 2.1. Try to interrupt the execute() thread. This is an anti-pattern in Java, since many libraries swallow InterruptedException without restoring the interrupt status, so it will most likely not work.
> > 2.2. Have the "problematic bolt" implement a RestartableBolt interface with a restart method that sets everything back to normal. This is also not recommended, since trying to fix an unknown bad state in code is always risky. But if there is a specific place where you know the bolt gets stuck, the restart method could just flip an AtomicBoolean and that code would check whether the AtomicBoolean is set. AtomicBoolean has its cost in terms of CPU caching, so make sure you don't put the check too deep inside the for loops.
> > 2.3. Have the "problematic bolt" run in a separate process. This would require implementing the multilang protocol in Java (it is already implemented in Python/Node.js/Ruby). The bolt would pay serialization/deserialization CPU cycles per emit.
> > Once you have implemented the multilang protocol, run the problematic bolt using ShellBolt, which knows the PID of the child process and can kill it.
> > 2.4. Throw an exception that will crash the entire worker process. The supervisor would then restart the process, but all of the executors on that worker get restarted too (and you get a few tens of seconds of downtime).
> >
> > Regards,
> > Itai
> >
> > ________________________________________
> > From: Marc Vaillant <[email protected]>
> > Sent: Thursday, October 9, 2014 8:05 PM
> > To: [email protected]
> > Subject: Why won't storm restart failing tasks?
> >
> > We have now replicated, in the exclamation topology, a situation that is occurring for us in production. In short, one of the tasks gets into a hung state so that all tuples sent to it fail. We expected Storm to detect this failing state (task heartbeat, etc.) and then start a new task/executor. But that does not happen; Storm just continues to send tuples to the failing task. How can we get Storm to tear down the executor or task and spawn a new one?
> >
> > To replicate the issue in the exclamation topology, we modified TestWordSpout to emit the word "dummy" only once, and then emit from the standard list as usual:
> >
> > public void nextTuple() {
> >     Utils.sleep(_sleepTime);
> >
> >     if (!_isSent) {
> >         _collector.emit(new Values("dummy"), "dummy" + _counter);
> >         _isSent = true;
> >     } else {
> >         final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
> >         final Random rand = new Random();
> >         final String word = words[rand.nextInt(words.length)] + _counter;
> >         _collector.emit(new Values(word), word);
> >     }
> >     _counter++;
> > }
> >
> > In the ExclamationBolt's execute method we go into an infinite loop if the tuple received is "dummy":
> >
> > public void execute(Tuple tuple) {
> >     if ("dummy".equals(tuple.getString(0))) {
> >         while (true) {}
> >     }
> >     _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
> >     _collector.ack(tuple);
> > }
> >
> > The topology has a parallelism of 3 for the ExclamationBolt, so the task that receives the "dummy" tuple fails every subsequent tuple it receives because it is stuck in an infinite loop, but the other 2 tasks continue to process tuples normally. While Storm eventually fails all the tuples that are sent to the "dummy" task, it never tears the task down to start a new one.
> >
> > Any help greatly appreciated.
> >
> > Thanks,
> > Marc
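
P.S. For anyone who wants to try Itai's suggestion 1 above, here is a rough, untested sketch of such a delegate bolt. It assumes the 0.9.x backtype.storm API and Guava on the classpath; the class name WatchdogBolt, the timeout values, and the reaction inside the removal listener are placeholders, not anything Storm provides. One wrinkle: Guava expires entries lazily (there is no background expiry thread by default), so the sketch pokes the cache from a small scheduled executor so that the expiry notification actually fires once ticks stop arriving.

    import backtype.storm.Config;
    import backtype.storm.Constants;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.RemovalListener;
    import com.google.common.cache.RemovalNotification;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class WatchdogBolt implements IRichBolt {
        private static final int TICK_FREQ_SECS = 5;       // how often Storm sends us tick tuples
        private static final int STUCK_TIMEOUT_SECS = 30;  // how long without a tick before we assume "stuck"

        private final IRichBolt delegate;                   // the "problematic bolt"
        private transient Cache<String, Long> lastTick;
        private transient ScheduledExecutorService cleaner;

        public WatchdogBolt(IRichBolt delegate) {
            this.delegate = delegate;
        }

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            // If no tick refreshes this entry within STUCK_TIMEOUT_SECS, the removal listener
            // fires -- that is the "executor thread looks stuck" signal.
            lastTick = CacheBuilder.newBuilder()
                    .expireAfterWrite(STUCK_TIMEOUT_SECS, TimeUnit.SECONDS)
                    .removalListener(new RemovalListener<String, Long>() {
                        public void onRemoval(RemovalNotification<String, Long> n) {
                            if (n.wasEvicted()) {
                                // Runs on the cleaner thread, not the (possibly stuck) executor thread.
                                // Log, alert, or deliberately crash the worker here (see choice 2.4).
                                System.err.println("No tick for " + STUCK_TIMEOUT_SECS + "s; bolt looks stuck");
                            }
                        }
                    })
                    .build();
            lastTick.put("tick", System.currentTimeMillis());

            // Guava only expires entries when the cache is touched, so touch it periodically
            // from a separate thread.
            cleaner = Executors.newSingleThreadScheduledExecutor();
            cleaner.scheduleAtFixedRate(new Runnable() {
                public void run() { lastTick.cleanUp(); }
            }, TICK_FREQ_SECS, TICK_FREQ_SECS, TimeUnit.SECONDS);

            delegate.prepare(conf, context, collector);
        }

        @Override
        public void execute(Tuple tuple) {
            if (isTick(tuple)) {
                // Refresh the TTL; tick tuples are swallowed and never forwarded to the delegate.
                lastTick.put("tick", System.currentTimeMillis());
                return;
            }
            delegate.execute(tuple);
        }

        private boolean isTick(Tuple tuple) {
            return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                    && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
        }

        @Override
        public void cleanup() {
            cleaner.shutdownNow();
            delegate.cleanup();
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            delegate.declareOutputFields(declarer);
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            // Ask Storm to send this bolt tick tuples every TICK_FREQ_SECS.
            Map<String, Object> conf = delegate.getComponentConfiguration();
            if (conf == null) {
                conf = new HashMap<String, Object>();
            }
            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, TICK_FREQ_SECS);
            return conf;
        }
    }

Wrapping the suspect bolt would then look something like builder.setBolt("exclaim", new WatchdogBolt(new ExclamationBolt()), 3). Deciding what to do inside the removal listener is the hard part; that is where choices 2.1 through 2.4 come in.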
