OK, Nathan. I'm going to try max spout pending = 200 and timeout = 120 s.
In case of API failure (exception thrown by the httpclient), I catch the
exception and throw a FailedException so I can reprocess the tuple.
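For what it's worth, that pattern boils down to something like this plain-Java sketch. Storm's backtype.storm.topology.FailedException is stubbed out here so the example stands alone, and ApiClient is a hypothetical stand-in for the HTTP call; in the real bolt you would throw Storm's class and let the framework fail and replay the tuple:

```java
import java.io.IOException;

public class ApiBoltSketch {
    // Stand-in for backtype.storm.topology.FailedException (assumption: the
    // real bolt throws Storm's class so the tuple is failed, not the worker).
    static class FailedException extends RuntimeException {
        FailedException(Throwable cause) { super(cause); }
    }

    // Hypothetical stand-in for the HTTP client call made by the API bolt.
    interface ApiClient {
        String call(String payload) throws IOException;
    }

    // Mirrors the bolt's execute(): an HTTP failure becomes a FailedException,
    // so Storm fails the tuple and the spout can replay it later.
    static String process(ApiClient client, String payload) {
        try {
            return client.call(payload);
        } catch (IOException e) {
            throw new FailedException(e);
        }
    }
}
```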

Thanks

Maxime

On Tue, Nov 4, 2014 at 11:01 AM, Nathan Leung <[email protected]> wrote:

> I would consider reducing max spout pending, since your topology's
> end-to-end latency is kind of high, and keeping timeouts relatively low.
> This way you don't have to wait 8 minutes for error messages to determine
> whether there are actually problems in the topology.  I would try a max
> spout pending of 100 or 200 and see if it affects your throughput at all.
> I'm guessing that it probably will not.  800/1600 tuples in flight, with a
> max bolt parallelism of 120, is probably enough to keep things fed.
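In code, that suggestion is just two settings on the topology Config (a fragment against the 0.9.x backtype.storm API; the numbers are the ones under discussion, not a recommendation):

```java
import backtype.storm.Config;

Config conf = new Config();
conf.setMaxSpoutPending(200);     // cap un-acked tuples per spout task
conf.setMessageTimeoutSecs(120);  // surface failures after 120 s, not 8 min
```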
>
> Also, how do you handle API failure?
>
> On Tue, Nov 4, 2014 at 1:49 PM, Maxime Nay <[email protected]> wrote:
>
>> Hi,
>>
>> Thanks Vladi, Sam and Nathan for your advice.
>> The problem here is that the third party API we're using can only
>> tolerate X concurrent requests. So the parallelism hint of my API bolt is
>> limited.
>> When I get a spike in my traffic, it's okay for my topology to start
>> lagging as long as it can recover after that. That's why I set the max
>> spout pending originally: so my spouts do not flood my bolts by trying to
>> keep up with the increased traffic. I also need to handle cases when the
>> API is having issues and crashes or hangs. If the API is unavailable for a
>> few minutes, my topology should be able to recover.
>>
>> I was not aware of what Sam mentioned (a failed tuple is processed
>> anyway). It fits perfectly with what I was observing.
>> Sam, do you know how I can configure my KafkaSpouts to attach a timestamp
>> to my tuples?
>>
>> Vladi, I have 120 executors for my API bolt, and 8 spouts in total. Each
>> API call takes 2.5 s on average, and it can take maybe one more second for
>> the tuple to go through the rest of the topology. My calls to the API are
>> configured with a 6 s timeout.
>> I guess that means in the worst-case scenario (API hanging), I can still
>> process 17 tuples per second. If I set maxSpoutPending to 1k, since I have
>> 8 spouts, it means I can buffer up to 8k tuples. Hence, I should set the
>> message timeout to something > 480 s.
>> Am I right?
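That back-of-the-envelope math can be spelled out as follows (a sketch; the 7 s figure is an assumption: the 6 s API timeout plus roughly 1 s for the rest of the topology):

```java
public class TimeoutEstimate {
    // Worst-case throughput: apiExecutors tuples complete every perTupleSeconds.
    static double worstCaseThroughput(int apiExecutors, double perTupleSeconds) {
        return apiExecutors / perTupleSeconds;
    }

    // Seconds needed to drain everything the spouts can have in flight.
    static double drainSeconds(int spouts, int maxSpoutPending, double throughput) {
        return (spouts * maxSpoutPending) / throughput;
    }

    public static void main(String[] args) {
        double tput = worstCaseThroughput(120, 7.0); // ~17 tuples/s
        double drain = drainSeconds(8, 1000, tput);  // ~467 s in flight
        // A message timeout above ~480 s leaves a little headroom.
        System.out.println("throughput=" + tput + " tuples/s, drain=" + drain + " s");
    }
}
```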
>>
>> Thanks for your help!
>>
>>
>> Maxime
>>
>> On Tue, Nov 4, 2014 at 8:28 AM, Vladi Feigin <[email protected]> wrote:
>>
>>> <<Is there a way to let the KafkaSpout "slow down" before the overload?>>
>>> Yes. This mechanism is the max spout pending parameter. That is how you
>>> control the volume of tuples a spout feeds into the topology.
>>>
>>> On Tue, Nov 4, 2014 at 5:27 PM, Niels Basjes <[email protected]> wrote:
>>>
>>>> Last week I noticed something similar in the solution I'm trying to
>>>> build.
>>>> I have a topology that simulates webtraffic and puts those
>>>> "measurements" in Kafka.
>>>> That has been running for a few weeks now, and with the retention in
>>>> Kafka there are now a few hundred million events available.
>>>>
>>>> I then start a topology with the Kafka Spout that must read
>>>> "everything" from the start.
>>>> After a while it simply seems to stop working (I see a LOT of errors).
>>>> It is not surprising that the spout is capable of feeding those events
>>>> into the topology much faster than the topology can process them.
>>>> It seemed to me that the topology 'died' somewhere because of this
>>>> 'overload'.
>>>>
>>>> I'm thinking "too many events in the topology"
>>>>
>>>> Simply increasing parallelism would only delay the effect for a while.
>>>>
>>>> What is the general pattern for me to actually solve this kind of
>>>> situation?
>>>> Is there a way to let the KafkaSpout "slow down" before the overload?
>>>>
>>>>
>>>> On Tue, Nov 4, 2014 at 12:48 PM, Nathan Leung <[email protected]>
>>>> wrote:
>>>>
>>>>> Definitely increase parallelism if possible.  If you have 1 API bolt
>>>>> per spout, it takes 1 second per API request on average, and your max
>>>>> spout pending is 1000, then it would take 1000 seconds to drain a full
>>>>> spout output queue.  That would definitely be enough to time out your
>>>>> tuples, and cause any subsequent tuples to time out as well.  Even if
>>>>> you have 10 API bolts per spout, it still takes 100 s to drain a full
>>>>> spout output queue.  So care needs to be taken.
>>>>>
>>>>> My understanding is that kafka spout will automatically replay failed
>>>>> tuples.  Is it possible that you get into a situation where you topology
>>>>> simply cannot keep up and you get into a replay loop?
>>>>>
>>>>> On Mon, Nov 3, 2014 at 11:49 PM, Vladi Feigin <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Set max spout pending to 2-3k and try with these values.
>>>>>> It also sounds to me like you should try to increase the parallelism.
>>>>>> How many executors do you have for the API bolts?
>>>>>> One more thing: don't make too many changes at once. Try change by
>>>>>> change, otherwise you will get lost :-)
>>>>>> Vladi
>>>>>>  On 4 Nov 2014 00:49, "Maxime Nay" <[email protected]> wrote:
>>>>>>
>>>>>>> I have a default timeout on my HttpClient (10 s for socket and 10 s
>>>>>>> for connect), and I'm not overriding this value anywhere. So I guess
>>>>>>> none of the API calls should be blocking.
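For reference, pinning those timeouts explicitly with Apache HttpClient 4.3+ looks roughly like this (a fragment, not a full program; the 10 s values mirror the defaults described above):

```java
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

RequestConfig timeouts = RequestConfig.custom()
        .setConnectTimeout(10000)  // ms to establish the connection
        .setSocketTimeout(10000)   // ms to wait for data on the socket
        .build();
CloseableHttpClient client = HttpClients.custom()
        .setDefaultRequestConfig(timeouts)
        .build();
```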
>>>>>>> I allocated 5 GB of memory to each of my workers. I doubt the issue
>>>>>>> is a GC issue, but just in case I will take a look at it.
>>>>>>> What do you think would be a good value for max spout pending? I
>>>>>>> usually use 2 executors per type of spout, so 8 executors in total
>>>>>>> for my spouts.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> Maxime
>>>>>>>
>>>>>>> On Mon, Nov 3, 2014 at 12:41 PM, Vladi Feigin <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Yes, you probably fail because of timeouts.
>>>>>>>> Check that none of your API calls is blocking; make sure all of
>>>>>>>> them have a timeout.
>>>>>>>> Check your GC: if you have many full GCs, you should increase your
>>>>>>>> Java heap.
>>>>>>>> It seems to me that you shouldn't set max spout pending too high.
>>>>>>>> How many spouts (executors) do you have?
>>>>>>>> Vladi
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Nov 3, 2014 at 10:20 PM, Maxime Nay <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Vladi,
>>>>>>>>>
>>>>>>>>> I will put log statements in each bolt.
>>>>>>>>> The processing time per tuple is high due to a third-party API
>>>>>>>>> queried through HTTP requests in one of our bolts. It can take up
>>>>>>>>> to 3 seconds to get an answer from this service.
>>>>>>>>>
>>>>>>>>> I've tried multiple values for max spout pending: 400, 800,
>>>>>>>>> 2000... It doesn't really seem to change anything. I'm also
>>>>>>>>> setting messageTimeoutSecs to 25.
>>>>>>>>>
>>>>>>>>> I also noticed that at some point I'm getting failed tuples, even
>>>>>>>>> though I'm never throwing any FailedException manually. So I guess 
>>>>>>>>> the only
>>>>>>>>> way for a tuple to fail is to exceed the messageTimeoutSecs?
>>>>>>>>>
>>>>>>>>> Anyway, I restarted the topology and I will take a look at the
>>>>>>>>> debug statements when it crashes again.
>>>>>>>>>
>>>>>>>>> Thanks for your help!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Maxime
>>>>>>>>>
>>>>>>>>> On Sat, Nov 1, 2014 at 9:49 PM, Vladi Feigin <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi
>>>>>>>>>> We have a similar problem with v0.8.2.
>>>>>>>>>> We suspect the slowest bolt in the topology hangs, and this
>>>>>>>>>> causes the entire topology to hang.
>>>>>>>>>> It can be a database bolt, for example.
>>>>>>>>>> Put logging at each bolt's entry and exit, printing the bolt
>>>>>>>>>> name, thread id, and time. This will help you find out which
>>>>>>>>>> bolt hangs.
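A framework-agnostic sketch of that instrumentation (plain Java; in a real topology this would wrap each bolt's execute() body):

```java
public class BoltTimer {
    // Runs one unit of work, logging bolt name, thread id, and elapsed time
    // on entry and exit; returns the elapsed milliseconds. A bolt that hangs
    // shows up as an ENTER line with no matching EXIT.
    static long timed(String boltName, Runnable work) {
        long start = System.currentTimeMillis();
        System.out.println("ENTER " + boltName
                + " thread=" + Thread.currentThread().getId());
        work.run();
        long elapsedMs = System.currentTimeMillis() - start;
        System.out.println("EXIT  " + boltName
                + " thread=" + Thread.currentThread().getId()
                + " elapsedMs=" + elapsedMs);
        return elapsedMs;
    }
}
```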
>>>>>>>>>> A few seconds of processing per tuple sounds too long; maybe you
>>>>>>>>>> should profile your code as well.
>>>>>>>>>> What's your max spout pending value?
>>>>>>>>>> Vladi
>>>>>>>>>>  On 31 Oct 2014 20:09, "Maxime Nay" <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> For some reason, after a few hours of processing, my topology
>>>>>>>>>>> starts hanging. In the UI's 'Topology Stats' the emitted and
>>>>>>>>>>> transferred counts are equal to 0, and I can't see anything
>>>>>>>>>>> coming out of the topology (which normally inserts into a
>>>>>>>>>>> database).
>>>>>>>>>>>
>>>>>>>>>>> I can't see anything unusual in the storm workers' logs, nor in
>>>>>>>>>>> Kafka's and ZooKeeper's logs.
>>>>>>>>>>> The ZkCoordinator keeps refreshing, but nothing happens:
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Deleted
>>>>>>>>>>> partition managers: []
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] New
>>>>>>>>>>> partition managers: []
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Finished
>>>>>>>>>>> refreshing
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.DynamicBrokersReader [INFO] Read
>>>>>>>>>>> partition info from zookeeper: GlobalPartitionInformation{...
>>>>>>>>>>>
>>>>>>>>>>> I don't really understand why this is hanging, and how I could
>>>>>>>>>>> fix this.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I'm using storm 0.9.2-incubating with Kafka 0.8.1.1 and
>>>>>>>>>>> storm-kafka 0.9.2-incubating.
>>>>>>>>>>>
>>>>>>>>>>> My topology pulls data from 4 different topics in Kafka, and has
>>>>>>>>>>> 9 different bolts. Each bolt implements IBasicBolt, and I'm not
>>>>>>>>>>> doing any acking manually (Storm should take care of this for me,
>>>>>>>>>>> right?).
>>>>>>>>>>> It takes a few seconds for a tuple to go through the entire
>>>>>>>>>>> topology.
>>>>>>>>>>> I'm setting MaxSpoutPending to limit the number of tuples in
>>>>>>>>>>> the topology.
>>>>>>>>>>> My tuples shouldn't exceed the max message size limit (left at
>>>>>>>>>>> the default on my Kafka brokers and in my SpoutConfig; I think
>>>>>>>>>>> the default is rather high and should easily handle a few lines
>>>>>>>>>>> of text).
>>>>>>>>>>> The tuples don't necessarily go to each bolt.
>>>>>>>>>>>
>>>>>>>>>>> I'm defining my spouts like this:
>>>>>>>>>>>         ZkHosts zkHosts = new ZkHosts(
>>>>>>>>>>>                 "zk1.example.com:2181,zk2.example.com:2181,...");
>>>>>>>>>>>         zkHosts.refreshFreqSecs = 120;
>>>>>>>>>>>
>>>>>>>>>>>         SpoutConfig kafkaConfig = new SpoutConfig(zkHosts,
>>>>>>>>>>>                 "TOPIC_NAME",
>>>>>>>>>>>                 "/consumers",
>>>>>>>>>>>                 "CONSUMER_ID");
>>>>>>>>>>>         kafkaConfig.scheme = new SchemeAsMultiScheme(new
>>>>>>>>>>>                 StringScheme());
>>>>>>>>>>>         KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
>>>>>>>>>>>
>>>>>>>>>>> I'm running this topology on 2 different workers, located on two
>>>>>>>>>>> different supervisors. In total I'm using something like 160 
>>>>>>>>>>> executors.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I would greatly appreciate any help or hints on how to
>>>>>>>>>>> fix/investigate this problem!
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Maxime
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>>
>>>
>>>
>>
>
