When tuples time out, they are still processed by Bolts in the topology 
(IMO, a terrible design flaw).  This means your API Bolt will end up spending 
time doing HTTP requests on tuples that have already timed out, while the 
"replayed" tuples sit in buffers and time out themselves… causing them 
to be replayed… an endless cycle.

I'd try any/all of these three:

  *   Have the Spout pass a timestamp with each tuple, and have each Bolt 
ignore/fail the tuple if it is too old.  This way no time is wasted on 
timed-out tuples.  (IMHO Storm should do this for you)
  *   Set the timeout to something high (5+ minutes).  You want it so high that 
it only ever gets triggered on tuples that are legitimately "lost".  If you set 
it too low, tuples will time out while still sitting in buffers.
  *   Set max-pending to something low.  Since your topology will be backed up 
by this HTTP-request Bolt anyway, you won't really take a performance hit by 
not buffering tuples.
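The first bullet can be sketched roughly as below. The class and the "emitTs" field name are made up for illustration; the spout would emit an extra timestamp field alongside each tuple, and every bolt would check it before doing real work:

```java
// Sketch of the timestamp check from the first bullet above.
// Each bolt skips tuples older than the topology's message timeout.
public final class TupleAgeGuard {
    private final long maxAgeMs;

    public TupleAgeGuard(long maxAgeMs) {
        this.maxAgeMs = maxAgeMs;
    }

    /** True if a tuple emitted at emitMillis is still worth processing at nowMillis. */
    public boolean isFresh(long emitMillis, long nowMillis) {
        return nowMillis - emitMillis <= maxAgeMs;
    }
}
```

In an IBasicBolt's execute(), something like `if (!guard.isFresh(tuple.getLongByField("emitTs"), System.currentTimeMillis())) return;` would skip the HTTP call for a stale tuple, and the automatic ack would quietly discard it.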

Best,
-Sam

From: Maxime Nay <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Monday, November 3, 2014 6:48 PM
To: "[email protected]" <[email protected]>
Subject: Re: KafkaSpout stops pulling data after a few hours

I have a default timeout on my HttpClient (10 seconds for socket and 10 seconds 
for connect), and I'm not overriding this value anywhere. So I guess none of 
the API calls should be blocking.
I allocated 5 GB of memory to each of my workers. I doubt the issue is a GC 
issue, but just in case I will take a look at it.
What do you think would be a good value for max spout pending? I usually 
use 2 executors per type of spout, so 8 executors in total for my spouts.
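For reference, with Apache HttpClient 4.3+ those two timeouts are set via RequestConfig. This is just a sketch of the defaults described above, not Maxime's actual code:

```java
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public final class ApiClientFactory {
    /** Builds a client with 10 s socket and connect timeouts, as described above. */
    public static CloseableHttpClient buildClient() {
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(10_000)  // max time to establish the connection
                .setSocketTimeout(10_000)   // max gap between data packets
                .build();
        return HttpClients.custom()
                .setDefaultRequestConfig(requestConfig)
                .build();
    }
}
```

Note that these timeouts bound a single request; a bolt retrying a failed request could still block for a multiple of 10 seconds.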

Thanks!

Maxime

On Mon, Nov 3, 2014 at 12:41 PM, Vladi Feigin 
<[email protected]> wrote:
Hi,

Yes, you probably fail because of timeouts.
Check that none of your API calls is blocking; make sure you have a timeout for 
all of them.
Check your GC; if you have many full GCs you should increase your Java heap.
It seems to me that you shouldn't set max spout pending too high.
How many spouts (executors) do you have?
Vladi



On Mon, Nov 3, 2014 at 10:20 PM, Maxime Nay 
<[email protected]> wrote:
Hi Vladi,

I will put log statements in each bolt.
The processing time per tuple is high due to a third-party API queried through 
HTTP requests in one of our bolts. It can take up to 3 seconds to get an answer 
from this service.

I've tried multiple values for max spout pending: 400, 800, 2000... It doesn't 
really seem to change anything. I'm also setting messageTimeoutSecs to 25 seconds.
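For reference, both knobs are plain topology configuration in Storm 0.9.x; a minimal sketch with the values from this thread (backtype.storm.Config is the 0.9.2 package):

```java
import backtype.storm.Config;

public final class TopologyConf {
    /** Topology config with the values discussed in this thread. */
    public static Config build() {
        Config conf = new Config();
        // Caps unacked tuples per spout *task*, so the effective in-flight
        // limit is maxSpoutPending times the number of spout tasks.
        conf.setMaxSpoutPending(400);
        // Tuples not fully acked within this window are failed and replayed.
        conf.setMessageTimeoutSecs(25);
        return conf;
    }
}
```

Note the per-task semantics: with 8 spout executors and maxSpoutPending of 400, up to 3200 tuples can be in flight at once.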

I also noticed that at some point I'm getting failed tuples, even though I 
never throw any FailedException manually. So I guess the only way for a 
tuple to fail is to exceed messageTimeoutSecs?

Anyway, I restarted the topology and I will take a look at the debug statements 
when it crashes again.

Thanks for your help!


Maxime

On Sat, Nov 1, 2014 at 9:49 PM, Vladi Feigin 
<[email protected]> wrote:

Hi
We have a similar problem with v0.8.2.
We suspect the slowest bolt in the topology hangs, and this causes the entire 
topology to hang.
It can be a database bolt, for example.
Put logging at each bolt's entry and exit, printing the bolt name, thread id, 
and time. This will help you find out which bolt hangs.
A few seconds of processing per tuple sounds too long. Maybe you should 
profile your code as well.
What's your max spout pending value?
Vladi
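The entry/exit logging suggested above can be sketched with a small helper; the class and method names here are made up, the point is just to capture bolt name, thread id, and elapsed time around each execute() call:

```java
// Hypothetical helper for timing bolt executions.
public final class BoltTimer {
    /** Formats one exit log line: bolt name, thread id, elapsed millis. */
    public static String logLine(String boltName, long threadId, long elapsedMs) {
        return String.format("%s thread=%d took=%dms", boltName, threadId, elapsedMs);
    }
}
```

In a bolt: record `long t0 = System.currentTimeMillis();` on entry, then on exit log `BoltTimer.logLine(getClass().getSimpleName(), Thread.currentThread().getId(), System.currentTimeMillis() - t0)`. A bolt whose entry lines stop being followed by exit lines is the one that hangs.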

On 31 Oct 2014 20:09, "Maxime Nay" 
<[email protected]> wrote:
Hi,

For some reason, after a few hours of processing, my topology starts hanging. 
In the UI's 'Topology Stats' the emitted and transferred counts are equal to 0, 
and I can't see anything coming out of the topology (which usually inserts into 
some database).

I can't see anything unusual in the Storm worker logs, nor in Kafka's and 
ZooKeeper's logs.
The ZkCoordinator keeps refreshing, but nothing happens:
2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Deleted partition 
managers: []
2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] New partition managers: 
[]
2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Finished refreshing
2014-10-31 17:00:13 s.k.DynamicBrokersReader [INFO] Read partition info from 
zookeeper: GlobalPartitionInformation{...

I don't really understand why this is hanging, and how I could fix this.


I'm using storm 0.9.2-incubating with Kafka 0.8.1.1 and storm-kafka 
0.9.2-incubating.

My topology pulls data from 4 different topics in Kafka, and has 9 different 
bolts. Each bolt implements IBasicBolt. I'm not doing any acking manually 
(Storm should take care of this for me, right?)
It takes a few seconds for a tuple to go through the entire topology.
I'm setting MaxSpoutPending to limit the number of tuples in the topology.
My tuples shouldn't exceed the max size limit (set to the default on my Kafka 
brokers and in my SpoutConfig; I think the default is rather high and should 
easily handle a few lines of text).
The tuples don't necessarily go through every bolt.

I'm defining my spouts like this:
        ZkHosts zkHosts = new ZkHosts("zk1.example.com:2181", 
                "zk2.example.com:2181"...);
        zkHosts.refreshFreqSecs = 120;

        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts,
                "TOPIC_NAME",
                "/consumers",
                "CONSUMER_ID");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

I'm running this topology on 2 different workers, located on two different 
supervisors. In total I'm using something like 160 executors.


I would greatly appreciate any help or hints on how to fix/investigate this 
problem!

Thanks,
Maxime


