Hi Itai,

Thanks very much for your response and suggestions.  My first naive
thought is: why is this so hard?  Isn't this the most basic fault
tolerance provided by even the simplest load balancers (which is
essentially what a shuffle grouping is)?  My second question: I see
config parameters like task.heartbeat.frequency.secs; what is the
purpose of a task heartbeat if not to detect this kind of situation?

Thanks,
Marc

On Fri, Oct 10, 2014 at 09:22:38AM +0000, Itai Frenkel wrote:
> Hi Marc,
> 
> (I'm a storm newbie myself).
> 
> As I understand it your question has two parts:
> 1. Can Storm detect a Bolt that is "stuck". AFAIK the answer is no.
> 2. Can Storm do anything about it. AFAIK the answer is no.
> 
> What I would do in your situation:
> 1. I would write a delegate bolt that forwards all execute/ack/fail calls 
> to the "problematic bolt".  I would then wire a tick tuple into that 
> delegate bolt (see 
> http://kitmenke.com/blog/2014/08/04/tick-tuples-within-storm/), which would 
> update a TTL data structure, such as a Guava cache built with 
> expireAfterWrite and a removal listener, as described in 
> http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/cache/CacheBuilder.html
>    Each time a tick arrives, the delegate writes to the cache, which 
> refreshes the TTL.  When the TTL expires you will get an event on a 
> separate thread.  The delegate bolt of course must not forward any of the 
> tick tuples to the "problematic bolt".
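A minimal plain-JDK sketch of that tick-driven watchdog, with no Guava dependency; `TickWatchdog` and every name in it are illustrative, not Storm or Guava APIs. `touch()` stands in for the cache write on each tick, and the expiry callback fires on a separate thread once ticks stop arriving:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Fires onExpire on a separate thread if touch() is not called within
// ttlMillis -- the same effect as a Guava cache entry created with
// expireAfterWrite plus a removal listener. The delegate bolt would call
// touch() from execute() whenever the tick tuple arrives.
class TickWatchdog {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "tick-watchdog");
                t.setDaemon(true);  // don't keep the worker JVM alive
                return t;
            });
    private final long ttlMillis;
    private final Runnable onExpire;
    private ScheduledFuture<?> pending;

    TickWatchdog(long ttlMillis, Runnable onExpire) {
        this.ttlMillis = ttlMillis;
        this.onExpire = onExpire;
    }

    // Refresh the TTL: cancel the pending expiry and schedule a new one.
    synchronized void touch() {
        if (pending != null) {
            pending.cancel(false);
        }
        pending = scheduler.schedule(onExpire, ttlMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

In the delegate bolt, the expiry callback is where you would trigger one of the recovery options discussed below.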
> 
> 2. There are four choices:
>     2.1. Try to interrupt the execute() thread.  This is unreliable in 
> Java since many libraries swallow InterruptedException without restoring 
> the interrupt status via Thread.currentThread().interrupt(), so it will 
> most likely not work. 
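For reference, the idiom those libraries omit looks like this; the sleep stands in for any blocking call inside a bolt's work, and the class name is illustrative:

```java
// What well-behaved code should do when it catches InterruptedException:
// restore the interrupt flag with Thread.currentThread().interrupt() so
// the interruption stays visible to callers. Libraries that skip that line
// are why interrupting a stuck execute() thread is unreliable.
class InterruptAwareWork {
    // Returns true if the blocking call completed, false if interrupted.
    static boolean doWork() {
        try {
            Thread.sleep(10_000);  // stands in for any blocking call
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // restore the flag
            return false;
        }
    }
}
```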
>     2.2. Have the "problematic bolt" implement a RestartableBolt interface 
> with a restart method that sets everything back to normal.  This is also 
> not recommended, as trying to recover from an unknown bad state in code is 
> always risky.  But if there is a specific place where you know the bolt 
> gets stuck, the restart method could just flip an AtomicBoolean and the 
> stuck code would check whether that AtomicBoolean is set.  An AtomicBoolean 
> read is a volatile read, which has some cost in terms of CPU caching and 
> reordering, so make sure you don't poll it too deep inside hot loops. 
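A small sketch of that cooperative-restart pattern; `RestartableLoop` and its method names are illustrative, not a Storm API. The watchdog thread flips the flag, and the bolt's loop polls it only at the outer level:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Cooperative restart: a watchdog thread flips the flag, and the bolt's
// long-running loop polls it at the top of the outer loop only, keeping
// the volatile read out of hot inner loops.
class RestartableLoop {
    private final AtomicBoolean restartRequested = new AtomicBoolean(false);

    // Called from the watchdog thread when the bolt looks stuck.
    void requestRestart() {
        restartRequested.set(true);
    }

    // Processes up to 'iterations' units of work; bails out early if a
    // restart was requested. Returns the number of units completed.
    int runBatch(int iterations) {
        int done = 0;
        for (int i = 0; i < iterations; i++) {
            if (restartRequested.getAndSet(false)) {
                break;  // consume the flag and abandon the batch
            }
            done++;  // stands in for per-tuple work
        }
        return done;
    }
}
```

Using getAndSet rather than a plain read means the flag is consumed atomically, so one restart request aborts exactly one batch.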
>    2.3. Have the "problematic bolt" run in a separate process.  This would 
> require implementing the multilang protocol in Java (it is already 
> implemented in Python/Node.js/Ruby).  The bolt would pay 
> serialization/deserialization CPU cycles per emit.  Once you have 
> implemented the multilang protocol, run the problematic bolt under 
> ShellBolt, which knows the PID of the child process and can kill it.
>    2.4. Throw an exception that crashes the entire worker process.  The 
> supervisor would then restart the process, but all of the executors on 
> that worker get restarted too (and you get a few tens of seconds of 
> downtime).
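One way to combine hang detection with option 2.4, sketched with only java.util.concurrent; `FailFastExecutor` and its method names are illustrative, not Storm APIs. The work runs on a helper thread under a deadline, and on timeout the caller throws a RuntimeException, which from inside a real execute() would propagate and crash the worker:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Runs each unit of work on a helper thread with a deadline. On timeout it
// throws RuntimeException; thrown from inside a bolt's execute(), that
// would crash the worker so the supervisor restarts it (option 2.4).
class FailFastExecutor {
    private final ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "bolt-work");
        t.setDaemon(true);  // a truly hung thread must not pin the JVM
        return t;
    });

    void executeWithTimeout(Runnable work, long timeoutMillis) {
        Future<?> f = pool.submit(work);
        try {
            f.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            f.cancel(true);  // best effort; an uninterruptible loop ignores it
            throw new RuntimeException("work hung past " + timeoutMillis + "ms", e);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

Note the trade-off: every tuple now pays a hand-off to the helper thread, so this buys fail-fast behavior at some cost in throughput.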
> 
> Regards,
> Itai
> 
> 
> ________________________________________
> From: Marc Vaillant <[email protected]>
> Sent: Thursday, October 9, 2014 8:05 PM
> To: [email protected]
> Subject: Why won't storm restart failing tasks?
> 
> We have now replicated, in the exclamation topology, a situation that is
> occurring for us in production.  In short, one of the tasks gets into a
> hung state so that all tuples sent to it fail.  We expected storm to
> detect this failing state (task heartbeat, etc.) and then restart a new
> task/executor.  But that does not happen; storm just continues to send
> tuples to the failing task.  How can we get storm to tear down the
> executor or task and spawn a new one?
> 
> To replicate the issue in the exclamation topology, we modified
> TestWordSpout to emit the word "dummy" only once, and then emit from the
> standard list as usual:
> 
>     public void nextTuple() {
>         Utils.sleep(_sleepTime);
> 
>         if (!_isSent) {
>             _collector.emit(new Values("dummy"), "dummy" + _counter);
>             _isSent = true;
>         } else {
>             final String[] words = new String[] {
>                 "nathan", "mike", "jackson", "golda", "bertels"};
>             final Random rand = new Random();
>             final String word = words[rand.nextInt(words.length)] + _counter;
>             _collector.emit(new Values(word), word);
>         }
>         _counter++;
>     }
> 
> In the ExclamationBolt's execute method we go into an infinite loop if
> the tuple received is "dummy":
> 
> 
>     public void execute(Tuple tuple) {
>         if ("dummy".equals(tuple.getString(0))) {
>             while (true) {}
>         }
>         _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
>         _collector.ack(tuple);
>     }
> 
> The topology has a parallelism of 3 for the ExclamationBolt, so the task
> that receives the "dummy" tuple fails every subsequent tuple it receives
> because it is stuck in an infinite loop, while the other 2 tasks continue
> to process tuples normally.  While storm eventually fails all the tuples
> that are sent to the stuck task, it never tears the task down to spawn a
> new one.
> 
> Any help greatly appreciated.
> 
> Thanks,
> Marc
