How about:

https://storm.incubator.apache.org/apidocs/backtype/storm/Config.html#NIMBUS_TASK_TIMEOUT_SECS

"How long without heartbeating a task can go before nimbus will consider
the task dead and reassign it to another location."

and

https://storm.incubator.apache.org/apidocs/backtype/storm/Config.html#NIMBUS_REASSIGN

"Whether or not nimbus should reassign tasks if it detects that a task
goes down. Defaults to true, and it's not recommended to change this
value."

Marc

On Sat, Oct 11, 2014 at 08:42:16PM +0000, Itai Frenkel wrote:
> AFAIK the task.heartbeat.frequency.secs is related to zookeeper bookeeping. 
> Not sure if it is being act upon.
> 
> See also https://github.com/apache/storm/pull/286 (related to 2.3 below).
> 
> Itai
> 
> ________________________________________
> From: Marc Vaillant <[email protected]>
> Sent: Friday, October 10, 2014 7:32 PM
> To: [email protected]
> Subject: Re: Why won't storm restart failing tasks?
> 
> Hi Itai,
> 
> Thanks very much for your response and suggestions.  My first naive
> thought is: why is this so hard? Isn't this the most basic fault
> tolerance provided by the simplest load balancers (which is essentially
> what a shuffle grouping is)?  Second question is that I see config
> parameters like task.heartbeat.frequency.secs, what is the purpose of a
> task heartbeat if not to detect this kind of situation?
> 
> Thanks,
> Marc
> 
> On Fri, Oct 10, 2014 at 09:22:38AM +0000, Itai Frenkel wrote:
> > Hi Marc,
> >
> > (I'm a storm newbie myself).
> >
> > As I understand it your question has two parts:
> > 1. Can Storm detect a Bolt that is "stuck". AFAIK the answer is no.
> > 2. Can Storm do anything about it. AFAIK the answer is no.
> >
> > What I would do in your situation:
> > 1. I would write a delegate bolt that would delegate all the 
> > execute/ack/fail to "problematic bolt".
> >     I would then connect to that delegate bolt a tick tuple 
> > http://kitmenke.com/blog/2014/08/04/tick-tuples-within-storm/  which would 
> > update a TTL data structure
> >    (such as expireAfterWrite and removeListener as described in 
> > http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/cache/CacheBuilder.html
> >   )
> >    Each time the tick arrives it writes to the Cache which updates the TTL. 
> > When the TTL expires you will get an event on a separate thread.
> >    The delegate bolt would ofcourse cannot forward any of the tick tuples 
> > to the "problematic bolt"
> >
> > 2. There are four choices:
> >     2.1. Try and interrupt the execute() thread. This is an anti-pattern in 
> > java since many libraries swallow InterruptedException and do not call 
> > Thread.interrupted(). So it most likely not work.
> >     2.2  Have the "problematic bolt" implement RestartableBolt interface 
> > which has a restart method that sets everything back to normal. This is 
> > also not recommended as trying to fix an unknown bug state in code is 
> > always bad. But if there is a specific place you know the bolt gets stuck, 
> > then the restart method could just flip an AtomicBoolean and that code 
> > would check if that AtomicBoolean is set. AtomicBoolean has its cost in 
> > terms of CPU pipeline caching so make sure you don't put it too deep inside 
> > the for loops.
> >    2.3 Have the "problematic bolt" run in a separate process. This would 
> > require implementing the multilang protocol in Java (it is already 
> > implemented in python/nodejs/ruby). The bolt would get hit by 
> > serialization/deserialization CPU cycles per emit.
> >     Once you implemented the multilang protocol, then run the problematic 
> > bolt using ShellBolt which has the PID of the child process and can kill it.
> >    2.4 Throw an exception that will crash the entire worker process. The 
> > supervisor would then restart the process. But all of the executors on the 
> > workers get restarted too (and you get a few tens of seconds of downtime).
> >
> > Regards,
> > Itai
> >
> >
> > ________________________________________
> > From: Marc Vaillant <[email protected]>
> > Sent: Thursday, October 9, 2014 8:05 PM
> > To: [email protected]
> > Subject: Why won't storm restart failing tasks?
> >
> > We have now replicated in the exclamation topology, a situation that is
> > occurring for us in production.  In short, one of the tasks gets into a
> > hung state so that all tuples that are sent to it fail.  We expected
> > storm to detect this failing state (task heartbeat, etc) and then
> > restart a new task/executor.  But that does not happen, storm just
> > continues to send tuples to to the failing task.  How can we get storm
> > to tear down the executor or task and spawn a new one?
> >
> > To replicate the issue in the exclamation topology, we modified
> > TestWordSpout to emit the word "dummy" only once, and then emit from the
> > standard list as usual:
> >
> >     public void nextTuple() {
> >         Utils.sleep(_sleepTime);
> >
> >                                 if(!_isSent)
> >                                 {
> >                                         _collector.emit(new 
> > Values("dummy"), new String("dummy" + _counter));
> >                                         _isSent = true;
> >                                 }
> >                                 else
> >                                 {
> >                                         final String[] words = new String[] 
> > {"nathan", "mike", "jackson", "golda", "bertels"};
> >                                         final Random rand = new Random();
> >                                         final String word = 
> > words[rand.nextInt(words.length)] + _counter;
> >                                         _collector.emit(new Values(word), 
> > word);
> >                                 }
> >                                 _counter++;
> >                 }
> >
> > In the ExclamationBolt's execute method we go into an infinite loop if
> > the tuple received is "dummy":
> >
> >
> >     public void execute(Tuple tuple){
> >                         if(tuple.getString(0) == "dummy")
> >                         {
> >                                 while(true) {}
> >                         }
> >       _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
> >       _collector.ack(tuple);
> >     }
> >
> > The topology has a parallelism of 3 for the ExclaimationBolt so the task
> > that receives the "dummy" tuple fails every subsequent tuple it
> > receives because it is in stuck in an  infinite loop, but the other 2
> > tasks continue to process tuples normally.  While storm eventually fails
> > all the tuples that are sent to the "dummy" task, it never tears the
> > task down to restart a new one.
> >
> > Any help greatly appreciated.
> >
> > Thanks,
> > Marc

Reply via email to