I would consider reducing max spout pending, since your topology's end-to-end latency is fairly high, and keeping timeouts relatively low. That way you don't have to wait 8 minutes for error messages to find out whether there are actually problems in the topology. I would try a max spout pending of 100 or 200 and see whether it affects your throughput at all. I'm guessing it probably will not: 800-1600 tuples in flight (100-200 pending x 8 spouts), with a max bolt parallelism of 120, is probably enough to keep things fed.
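[The two knobs under discussion are ordinary topology settings. A minimal sketch, assuming Storm 0.9.x config key names; the 60 s timeout is an illustrative value, not one from the thread, and in topology code you would normally call Config#setMaxSpoutPending and Config#setMessageTimeoutSecs, which set exactly these entries:]

```java
import java.util.HashMap;
import java.util.Map;

public class TopologyTuning {
    // Builds the conf entries a topology would be submitted with,
    // shown by their literal Storm config keys.
    static Map<String, Object> tuningConf(int maxSpoutPending, int messageTimeoutSecs) {
        Map<String, Object> conf = new HashMap<>();
        // Applied per spout task: caps the un-acked tuples each spout may have in flight.
        conf.put("topology.max.spout.pending", maxSpoutPending);
        // Tuples not fully acked within this window are failed and replayed.
        conf.put("topology.message.timeout.secs", messageTimeoutSecs);
        return conf;
    }

    public static void main(String[] args) {
        // Nathan's suggestion: try 100-200 pending with a lower timeout.
        System.out.println(tuningConf(200, 60));
    }
}
```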
Also, how do you handle API failure?

On Tue, Nov 4, 2014 at 1:49 PM, Maxime Nay <[email protected]> wrote:

> Hi,
>
> Thanks Vladi, Sam and Nathan for your advice.
> The problem here is that the third-party API we're using can only tolerate X concurrent requests, so the parallelism hint of my API bolt is limited. When I get a spike in my traffic, it's okay for my topology to start lagging, as long as it can recover afterwards. That's why I set max spout pending originally: so my spouts do not flood my bolts by trying to keep up with the increased traffic. I also need to handle cases where the API has issues and crashes or hangs. If the API is unavailable for a few minutes, my topology should be able to recover.
>
> I was not aware of what Sam mentioned (a failed tuple is processed anyway). It fits perfectly with what I was observing. Sam, do you know how I can configure my KafkaSpouts to attach a timestamp to my tuples?
>
> Vladi, I have 120 executors for my API bolt, and 8 spouts in total. Each API call takes 2.5 s on average, and it can take maybe one more second for the tuple to go through the rest of the topology. My calls to the API are configured with a 6 s timeout.
> I guess that means in the worst-case scenario (API hanging), I can still process 17 tuples per second. If I set maxSpoutPending to 1k, since I have 8 spouts, I can buffer up to 8k tuples. Hence, I should set the message timeout to something > 480 s.
> Am I right?
>
> Thanks for your help!
>
> Maxime

On Tue, Nov 4, 2014 at 8:28 AM, Vladi Feigin <[email protected]> wrote:

> << Is there a way to let the KafkaSpout "slow down" before the overload? >>
> Yes. This mechanism is called the max_spout_pending parameter. It is the way you control the volume of tuples a spout feeds into a topology.

On Tue, Nov 4, 2014 at 5:27 PM, Niels Basjes <[email protected]> wrote:

> Last week I noticed something similar in the solution I'm trying to build.
> I have a topology that simulates web traffic and puts those "measurements" in Kafka. That has been running for a few weeks now, and with the retention in Kafka that is a few hundred million events available in Kafka.
>
> I then start a topology with the Kafka spout that must read "everything" from the start. After a while it simply seems to stop working (I see a LOT of errors). It is not surprising that the spout is capable of feeding those events into the topology much faster than it can process them. It seemed to me that the topology 'died' somewhere because of this 'overload'.
>
> I'm thinking "too many events in the topology". Simply "increase parallelism" would only delay the effect a while.
>
> What is the general pattern to actually solve this kind of situation? Is there a way to let the KafkaSpout "slow down" before the overload?

On Tue, Nov 4, 2014 at 12:48 PM, Nathan Leung <[email protected]> wrote:

> Definitely increase parallelism if possible. If you have 1 API bolt per spout, it takes on average 1 second per API request, and your max spout pending is 1000, then it would take 1000 seconds to drain a full spout output queue, which would definitely be enough to time out your tuples, and cause any subsequent tuples to time out as well. If you have 10 API bolts per spout, it still takes 100 s to drain a full spout output queue. So care needs to be taken.
>
> My understanding is that the Kafka spout will automatically replay failed tuples. Is it possible that you get into a situation where your topology simply cannot keep up and you get into a replay loop?
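[The drain-time arithmetic in the two messages above can be made explicit. A small helper for sanity-checking message-timeout choices; the class and method names are mine, not from the thread:]

```java
public class TimeoutBudget {
    // Worst-case seconds needed to drain everything the spouts can have in
    // flight, given the slowest plausible per-tuple latency. The message
    // timeout must comfortably exceed this, or replays snowball.
    static long drainSecs(int boltExecutors, double worstSecsPerTuple,
                          int spouts, int maxSpoutPending) {
        double throughput = boltExecutors / worstSecsPerTuple; // tuples/sec, worst case
        int inFlight = spouts * maxSpoutPending;               // max buffered tuples
        return (long) Math.ceil(inFlight / throughput);
    }

    public static void main(String[] args) {
        // Maxime's numbers: 120 API executors, ~7 s worst case per tuple
        // (6 s API timeout + ~1 s for the rest), 8 spouts, maxSpoutPending = 1000.
        System.out.println(drainSecs(120, 7.0, 8, 1000)); // ~467 s, so "timeout > 480 s" fits
        // Nathan's example: 10 API bolts per spout, 1 s/request, 1000 pending.
        System.out.println(drainSecs(10, 1.0, 1, 1000));  // 100 s
    }
}
```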
On Mon, Nov 3, 2014 at 11:49 PM, Vladi Feigin <[email protected]> wrote:

> Set max spout pending to 2-3K and try with those values.
> It also sounds to me like you should try to increase the parallelism. How many executors do you have for the API bolts?
> One more thing: don't make too many changes at once. Try change by change, otherwise you will get lost :-)
> Vladi

On 4 Nov 2014 00:49, "Maxime Nay" <[email protected]> wrote:

> I have a default timeout on my HttpClient (10 s for socket and 10 s for connect), and I'm not overriding this value anywhere, so I guess none of the API calls should be blocking.
> I allocated 5 GB of memory to each of my workers. I doubt the issue is a GC issue, but just in case I will take a look at it.
> What do you think would be a good value for max spout pending? I usually use 2 executors per type of spout, so 8 executors in total for my spouts.
>
> Thanks!
>
> Maxime

On Mon, Nov 3, 2014 at 12:41 PM, Vladi Feigin <[email protected]> wrote:

> Hi,
>
> Yes, you probably fail because of timeouts.
> Check that none of your API calls is blocking; make sure you have a timeout for all of them.
> Check your GC; if you have many full GCs you should increase your Java heap.
> It seems to me that you shouldn't set max spout pending too high. How many spouts (executors) do you have?
> Vladi

On Mon, Nov 3, 2014 at 10:20 PM, Maxime Nay <[email protected]> wrote:

> Hi Vladi,
>
> I will put log statements in each bolt.
> The processing time per tuple is high due to a third-party API queried through HTTP requests in one of our bolts. It can take up to 3 seconds to get an answer from this service.
> I've tried multiple values for max spout pending: 400, 800, 2000... It doesn't really seem to change anything. I'm also setting messageTimeoutSecs to 25 s.
>
> I also noticed that at some point I'm getting failed tuples, even though I'm never throwing any FailedException manually. So I guess the only way for a tuple to fail is to exceed messageTimeoutSecs?
>
> Anyway, I restarted the topology and I will take a look at the debug statements when it crashes again.
>
> Thanks for your help!
>
> Maxime

On Sat, Nov 1, 2014 at 9:49 PM, Vladi Feigin <[email protected]> wrote:

> Hi,
> We have a similar problem with v0.82. We suspect the slowest bolt in the topology hangs, and this causes the entire topology to hang. It can be a database bolt, for example.
> Put logging at each bolt's entry and exit, printing the bolt name, thread id and time. This will help you find out which bolt hangs.
> A few seconds of processing per tuple sounds too long; maybe you should profile your code as well.
> What's your max spout pending value?
> Vladi

On 31 Oct 2014 20:09, "Maxime Nay" <[email protected]> wrote:

> Hi,
>
> For some reason, after a few hours of processing, my topology starts hanging. In the UI's 'Topology Stats' the emitted and transferred counts are equal to 0, and I can't see anything coming out of the topology (usually inserting into some database).
>
> I can't see anything unusual in the Storm workers' logs, nor in Kafka's and ZooKeeper's logs.
> The ZkCoordinator keeps refreshing, but nothing happens:
>
>     2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Deleted partition managers: []
>     2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] New partition managers: []
>     2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Finished refreshing
>     2014-10-31 17:00:13 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{...
>
> I don't really understand why this is hanging, or how I could fix it.
>
> I'm using storm 0.9.2-incubating with Kafka 0.8.1.1 and storm-kafka 0.9.2-incubating.
>
> My topology pulls data from 4 different topics in Kafka, and has 9 different bolts. Each bolt implements IBasicBolt. I'm not doing any acking manually (Storm should take care of this for me, right?).
> It takes a few seconds for a tuple to go through the entire topology.
> I'm setting MaxSpoutPending to limit the number of tuples in the topology.
> My tuples shouldn't exceed the max message size limit (set to the default on my Kafka brokers and in my SpoutConfig, and I think the default is rather high and should easily handle a few lines of text).
> The tuples don't necessarily go to each bolt.
> I'm defining my spouts like this:
>
>     ZkHosts zkHosts = new ZkHosts("zk1.example.com:2181,zk2.example.com:2181"...); // single comma-separated connect string
>     zkHosts.refreshFreqSecs = 120;
>
>     SpoutConfig kafkaConfig = new SpoutConfig(zkHosts,
>                                               "TOPIC_NAME",
>                                               "/consumers",
>                                               "CONSUMER_ID");
>     kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>     KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
>
> I'm running this topology on 2 different workers, located on two different supervisors. In total I'm using something like 160 executors.
>
> I would greatly appreciate any help or hints on how to fix/investigate this problem!
>
> Thanks,
> Maxime

--
Best regards / Met vriendelijke groeten,

Niels Basjes
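[Two recurring themes in the thread — explicit HTTP timeouts and recovering from API outages — can be combined in the bolt's call path. A minimal sketch using the JDK's HttpURLConnection rather than Maxime's HttpClient; the helper names and retry counts are illustrative, and on total failure the exception should propagate so the bolt fails the tuple and the KafkaSpout replays it:]

```java
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.Callable;

public class ApiBoltHelper {

    // Open a connection with explicit timeouts so a hung API fails fast
    // instead of stalling an executor for the whole message timeout.
    static HttpURLConnection openWithTimeouts(String url) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setConnectTimeout(10_000); // 10 s to establish the connection
        conn.setReadTimeout(6_000);     // 6 s socket timeout, per Maxime's setup
        return conn;
    }

    // Retry with exponential backoff; if every attempt fails, rethrow so the
    // bolt can fail the tuple and let the spout replay it later.
    static <T> T withRetries(Callable<T> call, int maxAttempts, long baseDelayMs)
            throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                Thread.sleep(baseDelayMs << attempt); // 100 ms, 200 ms, 400 ms, ...
            }
        }
        throw last;
    }
}
```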
