I would consider reducing max spout pending since your topology end-to-end
latency is kind of high, and keeping timeouts relatively lower.  This way
you don't have to wait 8 minutes for error messages to determine if there
are actually problems in the topology.  I would consider trying a max spout
pending of 100 or 200 and seeing if it affects your throughput at all.  I'm
guessing that it probably will not.  800/1600 tuples in flight, with max
bolt parallelism of 120, is probably enough to keep things fed.

Also, how do you handle API failure?

On Tue, Nov 4, 2014 at 1:49 PM, Maxime Nay <[email protected]> wrote:

> Hi,
>
> Thanks Vladi, Sam and Nathan for your advices.
> The problem here is that the third party API we're using can only tolerate
> X concurrent requests. So the parallelism hint of my API bolt is limited.
> When I get a spike in my traffic, it's okay for my topology to start
> lagging as long as it can recover after that. That's why I set the max
> spout pending originally: so my spouts do not flood my bolts by trying to
> keep up with the increased traffic. I also need to handle cases when the
> API is having issue and crashes or hangs. If the API is unavailable for a
> few minutes, my topology should be able to recover.
>
> I was not aware of what Sam mentioned (a failed tuple is processed
> anyway). It fits perfectly with what I was observing.
> Sam, do you know how I can configure my KafkaSpouts to attach a timestamp
> to my tuples?
>
> Vladi, I have 120 executors for my API bolt, and 8 spouts in total. Each
> API take on average 2.5sc, and it can take maybe one more second for the
> tuple to go through the rest of the topology. My calls to the API are
> configured with a 6sc timeout.
> I guess it means worse case scenario (API hanging), I can still process 17
> tuples per second. If I set the maxSpoutPending to 1k, since I have 8
> spouts, it means I can buffer up to 8k tuples. Hence, I should set the
> message timeout to something > 480sc.
> Am I right?
>
> Thanks for your help!
>
>
> Maxime
>
> On Tue, Nov 4, 2014 at 8:28 AM, Vladi Feigin <[email protected]> wrote:
>
>> <<Is there a way to let the KafkaSpout "slow down" before the overload?>>
>> Yes. This mechanism is called : max_spout _pending  parameter . This is
>> the way you can control the tuples volumes a spout feeds into a topology
>>
>> On Tue, Nov 4, 2014 at 5:27 PM, Niels Basjes <[email protected]> wrote:
>>
>>> Last week I noticed something similar in the solution I'm trying to
>>> build.
>>> I have a topology that simulates webtraffic and puts those
>>> "measurements" in Kafka.
>>> That has been running for a few weeks now and with the retention in
>>> Kafka this is a few hundred million events available in Kafka.
>>>
>>> I then start a topology with the Kafka Spout that must read "everything"
>>> from the start.
>>> After a while it simply seems to stop working (I see a LOT of errors).
>>> It is not surprising that the spout is capable of feeding those events
>>> into the topology much faster than it can process.
>>> It seemed to me that the topology 'died' somewhere because of this
>>> 'overload'.
>>>
>>> I'm thinking "too many events in the topology"
>>>
>>> Simply "increase parallelism" would only delay the effect a while.
>>>
>>> What is the general pattern for me to actually solve this kind of
>>> situation?
>>> Is there a way to let the KafkaSpout "slow down" before the overload?
>>>
>>>
>>> On Tue, Nov 4, 2014 at 12:48 PM, Nathan Leung <[email protected]> wrote:
>>>
>>>> definitely increase parallelism if possible.  If you have 1 API bolt
>>>> per spout, and it takes avg 1 second / api request, and your max spout
>>>> pending is 1000, then it would take 1000 seconds to drain a full spout
>>>> output queue, which would definitely be enough to timeout your tuples, and
>>>> cause any subsequent tuples to timeout as well,  if you have 10 api bolts
>>>> per spout then it still takes 100s to drain a full spout output queue.  So
>>>> care needs to be taken.
>>>>
>>>> My understanding is that kafka spout will automatically replay failed
>>>> tuples.  Is it possible that you get into a situation where you topology
>>>> simply cannot keep up and you get into a replay loop?
>>>>
>>>> On Mon, Nov 3, 2014 at 11:49 PM, Vladi Feigin <[email protected]>
>>>> wrote:
>>>>
>>>>> Set max pending spout to 2-3K . Try with these values.
>>>>> To me it sounds also that you should try to increase the parallelism .
>>>>> Home many executors do you have for Api bolts?
>>>>> One more thing. Don't do too many changes at once. Try change by
>>>>> change otherwise you will get lost
>>>>> .-:)
>>>>> Vladi
>>>>>  On 4 Nov 2014 00:49, "Maxime Nay" <[email protected]> wrote:
>>>>>
>>>>>> I have a default timeout on my HttpClient (10sc for socket and 10sc
>>>>>> for connect), and I'm not overriding this value anywhere. So I guess none
>>>>>> of the API calls should be blocking.
>>>>>> I allocated 5GB of memory to each of my worker. I doubt the issue is
>>>>>> a GC issue. But just in case I will take a look at it.
>>>>>> What do you think would be a good value for the max pending spout? I
>>>>>> usually use 2 executors per type of spout. So 8 executors in total for my
>>>>>> spouts.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> Maxime
>>>>>>
>>>>>> On Mon, Nov 3, 2014 at 12:41 PM, Vladi Feigin <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Yes, you probably fail because of timeouts.
>>>>>>> Check that none of your APIs is not blocking , make sure you have a
>>>>>>> timeout for all of them
>>>>>>> Check your GC, if you have many full GCs you should increase your
>>>>>>> Java heap
>>>>>>> Seems to me that you shouldn't put too high max pending spout.
>>>>>>> How many spouts (executors) do you have?
>>>>>>> Vladi
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Nov 3, 2014 at 10:20 PM, Maxime Nay <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Vladi,
>>>>>>>>
>>>>>>>> I will put log statements in each bolt.
>>>>>>>> The processing time per tuple is high due to a third party API
>>>>>>>> queried through http requests in one of our bolts. It can take up to 3
>>>>>>>> seconds to get an answer from this service.
>>>>>>>>
>>>>>>>> I've tried multiple values for max pending spout. 400, 800, 2000...
>>>>>>>> It doesn't really seem to change anything. I'm also setting
>>>>>>>> messageTimeoutSecs to 25sc.
>>>>>>>>
>>>>>>>> I also noticed that at some point I'm getting failed tuples, even
>>>>>>>> though I'm never throwing any FailedException manually. So I guess the 
>>>>>>>> only
>>>>>>>> way for a tuple to fail is to exceed the messageTimeoutSecs?
>>>>>>>>
>>>>>>>> Anyway, I restarted the topology and I will take a look at the
>>>>>>>> debug statements when it crashes again.
>>>>>>>>
>>>>>>>> Thanks for your help!
>>>>>>>>
>>>>>>>>
>>>>>>>> Maxime
>>>>>>>>
>>>>>>>> On Sat, Nov 1, 2014 at 9:49 PM, Vladi Feigin <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>> We have the similar problem with v. 0.82.
>>>>>>>>> We suspect some slowest bolt in the topology hangs and this causes
>>>>>>>>> the entire topology being hanged.
>>>>>>>>> It can be database bolt for example.
>>>>>>>>> Put logging in each bolt enter and exit print out the bolt
>>>>>>>>> name,thread id and time. This will help you to find out which bolt 
>>>>>>>>> hangs
>>>>>>>>> Few seconds proccesing per tuple sound too long. Maybe you should
>>>>>>>>> to profile your code as well
>>>>>>>>> What's your max pending spout value?
>>>>>>>>> Vladi
>>>>>>>>>  On 31 Oct 2014 20:09, "Maxime Nay" <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> For some reason, after a few hours of processing, my topology
>>>>>>>>>> starts hanging. In the UI's 'Topology Stats' the emitted and 
>>>>>>>>>> transferred
>>>>>>>>>> counts are equal to 0, and I can't see anything coming out of the 
>>>>>>>>>> topology
>>>>>>>>>> (usually inserting in some database).
>>>>>>>>>>
>>>>>>>>>> I can't see anything unusual in the storm workers logs, nor in
>>>>>>>>>> kafka and zookeeper's logs.
>>>>>>>>>> The zkCoordinator keeps refreshing, but nothing happens :
>>>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Deleted
>>>>>>>>>> partition managers: []
>>>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] New
>>>>>>>>>> partition managers: []
>>>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Finished
>>>>>>>>>> refreshing
>>>>>>>>>> 2014-10-31 17:00:13 s.k.DynamicBrokersReader [INFO] Read
>>>>>>>>>> partition info from zookeeper: GlobalPartitionInformation{...
>>>>>>>>>>
>>>>>>>>>> I don't really understand why this is hanging, and how I could
>>>>>>>>>> fix this.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I'm using storm 0.9.2-incubating with Kafka 0.8.1.1 and
>>>>>>>>>> storm-kafka 0.9.2-incubating.
>>>>>>>>>>
>>>>>>>>>> My topology pulls data from 4 different topics in Kafka, and has
>>>>>>>>>> 9 different bolts. Each bolt implements IBasicBolt. I'm not doing any
>>>>>>>>>> acking manually (storm should take care of this for me, right?)
>>>>>>>>>> It takes a few second for a tuple to go through the entire
>>>>>>>>>> topology.
>>>>>>>>>> I'm setting a MaxSpoutPending to limit the number of tuples in
>>>>>>>>>> the topology.
>>>>>>>>>> My tuples shouldn't exceed the max size limit (set to default on
>>>>>>>>>> my kafka brokers and in my SpoutConfig. And I think the default is 
>>>>>>>>>> rather
>>>>>>>>>> high and should easily handle a few lines of text)
>>>>>>>>>> The tuples don't necessarily go to each bolt.
>>>>>>>>>>
>>>>>>>>>> I'm defining my spouts like this:
>>>>>>>>>>         ZkHosts zkHosts = new ZkHosts("zk1.example.com:2181", "
>>>>>>>>>> zk2.example.com:2181"...);
>>>>>>>>>>         zkHosts.refreshFreqSecs = 120;
>>>>>>>>>>
>>>>>>>>>>         SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts(),
>>>>>>>>>>                 "TOPIC_NAME",
>>>>>>>>>>                 "/consumers",
>>>>>>>>>>                 "CONSUMER_ID");
>>>>>>>>>>         kafkaConfig.scheme = new SchemeAsMultiScheme(new
>>>>>>>>>> StringScheme());
>>>>>>>>>>         KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig)
>>>>>>>>>>
>>>>>>>>>> I'm running this topology on 2 different workers, located on two
>>>>>>>>>> different supervisors. In total I'm using something like 160 
>>>>>>>>>> executors.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I would greatly appreciate any help or hints on how to
>>>>>>>>>> fix/investigate this problem!
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Maxime
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>
>>
>

Reply via email to