Maxime,

I'm glad I could help.  This issue with Storm not ignoring timed-out tuples 
caught me by surprise as well, and it's not well documented, nor is it expected.

Adding a timestamp should be easy.  I've not used KafkaSpout before so I don't 
know if it supports this, but you can always create your own Bolt (immediately 
after the Spout) that simply tacks on a field called "timestamp-ms" -- use (new 
Date()).getTime().  Then, in each bolt, you do something like this:

        Date expireDate = new Date(input.getLongByField("timestamp-ms")
                + this.tupleTimeoutS * 1000L);
        if (new Date().after(expireDate)) {
            logger.warn("Skipping tuple because it is too old: {}", input.getValues());
            return;
        }

The problem here is ensuring all of your workers have clocks synchronized with 
the Spout's host; otherwise you'll run into trouble.
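For the timestamping side, a minimal sketch of such a bolt might look like the 
following (the class name is illustrative, and it assumes the 0.9.x 
backtype.storm API with a spout that emits StringScheme's default single "str" 
field -- adjust the declared fields to your actual spout output):

```java
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;

// Hypothetical bolt that sits immediately after the spout and appends a
// "timestamp-ms" field to every tuple passing through it.
public class TimestampBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        List<Object> values = new ArrayList<Object>(input.getValues());
        values.add(System.currentTimeMillis()); // same as (new Date()).getTime()
        collector.emit(new Values(values.toArray()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // "str" is StringScheme's default output field; rename to match
        // whatever your spout actually emits.
        declarer.declare(new Fields("str", "timestamp-ms"));
    }
}
```

Downstream bolts can then read the field with input.getLongByField("timestamp-ms") 
as in the snippet above.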

Regarding choosing a max_pending value:

Your topology's bottleneck is definitely the Bolt that does the API call, so 
max-pending needn't be set very high -- just enough to give each executor a 
buffer large enough to last the time it takes for more tuples to be emitted 
from the spout to the worker.  Consider what happens with a low value: with 
max_pending equal to the number of executors, each executor will execute one 
tuple and then wait for the time it takes the Spout to grab another tuple and 
emit it to the executor (probably on the order of a few ms).  A few ms of 
inefficiency isn't that bad!  And if you set max_pending to 2x that, you should 
be just fine: your Bolts will always have something in their buffer to execute, 
so they'll never be waiting on the Spout.

The above math may change with different numbers of spouts and configurations, 
but you get the idea.
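As a back-of-the-envelope check, the sizing logic above can be sketched in a 
few lines (all numbers are made up for illustration, not measured from any real 
topology):

```java
// Rough sizing sketch for max_pending: the per-executor buffer only needs to
// cover the time the spout takes to deliver the next tuple to that executor.
public class MaxPendingSketch {

    // Smallest per-executor buffer that keeps the bolt busy while it waits
    // spoutLatencyMs for the next tuple, given executeLatencyMs per tuple.
    static long minBufferPerExecutor(long executeLatencyMs, long spoutLatencyMs) {
        // ceiling division: tuples consumed while waiting on the spout
        return (spoutLatencyMs + executeLatencyMs - 1) / executeLatencyMs;
    }

    public static void main(String[] args) {
        int executors = 120;
        long executeMs = 2500;  // avg API-call latency per tuple
        long spoutMs = 5;       // a few ms for the spout to emit the next tuple
        long buffer = minBufferPerExecutor(executeMs, spoutMs);
        // With a 2.5 s execute time and a few-ms spout latency, one extra
        // tuple per executor already hides the spout latency, so a
        // max_pending around 2 * executors is plenty.
        System.out.println(buffer);          // 1
        System.out.println(2L * executors);  // 240
    }
}
```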

Best,
-Sam

From: Maxime Nay <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Tuesday, November 4, 2014 1:49 PM
To: "[email protected]" <[email protected]>
Subject: Re: KafkaSpout stops pulling data after a few hours
Subject: Re: KafkaSpout stops pulling data after a few hours

Hi,

Thanks Vladi, Sam and Nathan for your advice.
The problem here is that the third party API we're using can only tolerate X 
concurrent requests. So the parallelism hint of my API bolt is limited.
When I get a spike in my traffic, it's okay for my topology to start lagging as 
long as it can recover after that. That's why I set the max spout pending 
originally: so my spouts do not flood my bolts by trying to keep up with the 
increased traffic. I also need to handle cases where the API has issues and 
crashes or hangs. If the API is unavailable for a few minutes, my topology 
should be able to recover.

I was not aware of what Sam mentioned (a failed tuple is processed anyway). It 
fits perfectly with what I was observing.
Sam, do you know how I can configure my KafkaSpouts to attach a timestamp to my 
tuples?

Vladi, I have 120 executors for my API bolt, and 8 spouts in total. Each API 
call takes on average 2.5 seconds, and it can take maybe one more second for 
the tuple to go through the rest of the topology. My calls to the API are 
configured with a 6-second timeout.
I guess it means that in the worst-case scenario (API hanging), I can still 
process about 17 tuples per second (120 executors / 7 seconds per tuple). If I 
set the maxSpoutPending to 1k, since I have 8 spouts, it means I can buffer up 
to 8k tuples. Hence, I should set the message timeout to something > 480 
seconds.
Am I right?
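For what it's worth, the arithmetic above works out roughly as described; a 
quick sanity check (treating the worst case as 7 s per tuple: the 6 s API 
timeout plus ~1 s for the rest of the topology):

```java
// Sanity check on the worst-case throughput / message-timeout arithmetic.
public class TimeoutSanityCheck {

    static double worstCaseTuplesPerSec(int executors, double secsPerTuple) {
        return executors / secsPerTuple;
    }

    static double drainTimeSecs(int spouts, int maxSpoutPending, double tuplesPerSec) {
        return (spouts * maxSpoutPending) / tuplesPerSec;
    }

    public static void main(String[] args) {
        double tps = worstCaseTuplesPerSec(120, 7.0); // 6 s API timeout + ~1 s rest
        double drain = drainTimeSecs(8, 1000, tps);   // 8 spouts * 1000 pending
        System.out.printf("%.1f tuples/s, drain %.0f s%n", tps, drain);
        // ~17.1 tuples/s and ~467 s to drain, so a message timeout of
        // something over 480 s leaves a little margin.
    }
}
```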

Thanks for your help!


Maxime

On Tue, Nov 4, 2014 at 8:28 AM, Vladi Feigin <[email protected]> wrote:
<<Is there a way to let the KafkaSpout "slow down" before the overload?>>
Yes. This mechanism is called the max_spout_pending parameter. This is how you 
can control the volume of tuples a spout feeds into a topology.
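In code, assuming the 0.9.x backtype.storm API, this is set on the topology's 
Config (the value 1000 here is just an example):

```java
import backtype.storm.Config;

Config conf = new Config();
// Cap on un-acked tuples in flight per spout task; the spout stops
// emitting once this many tuples are pending.
conf.setMaxSpoutPending(1000);
```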

On Tue, Nov 4, 2014 at 5:27 PM, Niels Basjes <[email protected]> wrote:
Last week I noticed something similar in the solution I'm trying to build.
I have a topology that simulates web traffic and puts those "measurements" in 
Kafka.
That has been running for a few weeks now, and with the retention in Kafka 
there are a few hundred million events available in Kafka.

I then start a topology with the Kafka Spout that must read "everything" from 
the start.
After a while it simply seems to stop working (I see a LOT of errors).
It is not surprising that the spout is capable of feeding those events into the 
topology much faster than the topology can process them.
It seemed to me that the topology 'died' somewhere because of this 'overload'.

I'm thinking "too many events in the topology"

Simply "increase parallelism" would only delay the effect a while.

What is the general pattern for me to actually solve this kind of situation?
Is there a way to let the KafkaSpout "slow down" before the overload?


On Tue, Nov 4, 2014 at 12:48 PM, Nathan Leung <[email protected]> wrote:
Definitely increase parallelism if possible.  If you have 1 API bolt per spout, 
it takes on average 1 second per API request, and your max spout pending is 
1000, then it would take 1000 seconds to drain a full spout output queue, which 
would definitely be enough to time out your tuples and cause any subsequent 
tuples to time out as well.  If you have 10 API bolts per spout it still takes 
100 seconds to drain a full spout output queue.  So care needs to be taken.

My understanding is that kafka spout will automatically replay failed tuples.  
Is it possible that you get into a situation where your topology simply cannot 
keep up and you get into a replay loop?

On Mon, Nov 3, 2014 at 11:49 PM, Vladi Feigin <[email protected]> wrote:

Set max spout pending to 2-3K. Try with these values.
It also sounds to me like you should try to increase the parallelism. How many 
executors do you have for the API bolts?
One more thing: don't do too many changes at once. Try change by change, 
otherwise you will get lost :-)
Vladi

On 4 Nov 2014 00:49, "Maxime Nay" <[email protected]> wrote:
I have a default timeout on my HttpClient (10 seconds for socket and 10 seconds 
for connect), and I'm not overriding this value anywhere. So I guess none of 
the API calls should be blocking.
I allocated 5GB of memory to each of my workers. I doubt the issue is a GC 
issue. But just in case, I will take a look at it.
What do you think would be a good value for max spout pending? I usually use 2 
executors per type of spout, so 8 executors in total for my spouts.

Thanks!

Maxime

On Mon, Nov 3, 2014 at 12:41 PM, Vladi Feigin <[email protected]> wrote:
Hi,

Yes, you probably fail because of timeouts.
Check that none of your API calls is blocking; make sure you have a timeout for 
all of them.
Check your GC: if you have many full GCs, you should increase your Java heap.
It seems to me that you shouldn't set max spout pending too high.
How many spouts (executors) do you have?
Vladi



On Mon, Nov 3, 2014 at 10:20 PM, Maxime Nay <[email protected]> wrote:
Hi Vladi,

I will put log statements in each bolt.
The processing time per tuple is high due to a third party API queried through 
http requests in one of our bolts. It can take up to 3 seconds to get an answer 
from this service.

I've tried multiple values for max spout pending: 400, 800, 2000... It doesn't 
really seem to change anything. I'm also setting messageTimeoutSecs to 25 
seconds.

I also noticed that at some point I'm getting failed tuples, even though I'm 
never throwing any FailedException manually. So I guess the only way for a 
tuple to fail is to exceed the messageTimeoutSecs?

Anyway, I restarted the topology and I will take a look at the debug statements 
when it crashes again.

Thanks for your help!


Maxime

On Sat, Nov 1, 2014 at 9:49 PM, Vladi Feigin <[email protected]> wrote:

Hi
We have a similar problem with v0.8.2.
We suspect the slowest bolt in the topology hangs and this causes the entire 
topology to hang.
It can be a database bolt, for example.
Put logging at each bolt's entry and exit, printing the bolt name, thread id 
and time. This will help you find out which bolt hangs.
A few seconds of processing per tuple sounds too long. Maybe you should profile 
your code as well.
What's your max spout pending value?
Vladi

On 31 Oct 2014 20:09, "Maxime Nay" <[email protected]> wrote:
Hi,

For some reason, after a few hours of processing, my topology starts hanging. 
In the UI's 'Topology Stats' the emitted and transferred counts are equal to 0, 
and I can't see anything coming out of the topology (usually inserting in some 
database).

I can't see anything unusual in the storm workers logs, nor in kafka and 
zookeeper's logs.
The ZkCoordinator keeps refreshing, but nothing happens:
2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Deleted partition 
managers: []
2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] New partition managers: 
[]
2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Finished refreshing
2014-10-31 17:00:13 s.k.DynamicBrokersReader [INFO] Read partition info from 
zookeeper: GlobalPartitionInformation{...

I don't really understand why this is hanging, and how I could fix this.


I'm using storm 0.9.2-incubating with Kafka 0.8.1.1 and storm-kafka 
0.9.2-incubating.

My topology pulls data from 4 different topics in Kafka, and has 9 different 
bolts. Each bolt implements IBasicBolt. I'm not doing any acking manually 
(Storm should take care of this for me, right?).
It takes a few seconds for a tuple to go through the entire topology.
I'm setting maxSpoutPending to limit the number of tuples in the topology.
My tuples shouldn't exceed the max message size limit (it's set to the default 
on my Kafka brokers and in my SpoutConfig, and I think the default is rather 
high and should easily handle a few lines of text).
The tuples don't necessarily go to each bolt.

I'm defining my spouts like this:
        // ZkHosts takes a single comma-separated connection string
        ZkHosts zkHosts = new ZkHosts("zk1.example.com:2181,zk2.example.com:2181");
        zkHosts.refreshFreqSecs = 120;

        SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts(),
                "TOPIC_NAME",
                "/consumers",
                "CONSUMER_ID");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
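For context, wiring a spout like this into the topology would look roughly as 
follows (the component names, the hypothetical MyApiBolt, and the parallelism 
numbers are illustrative, using the 0.9.x backtype.storm API; the config values 
are the ones mentioned in this thread):

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout, 2);    // 2 executors per spout type
builder.setBolt("api-bolt", new MyApiBolt(), 120)  // hypothetical API-calling bolt
       .shuffleGrouping("kafka-spout");

Config conf = new Config();
conf.setNumWorkers(2);
conf.setMaxSpoutPending(1000);
conf.setMessageTimeoutSecs(25);

StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());
```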

I'm running this topology on 2 different workers, located on two different 
supervisors. In total I'm using something like 160 executors.


I would greatly appreciate any help or hints on how to fix/investigate this 
problem!

Thanks,
Maxime







--
Best regards / Met vriendelijke groeten,

Niels Basjes

