Ok Nathan. I'm going to try with max spout pending = 200 and timeout = 120. In case of API failure (an exception thrown by the HttpClient), I catch the exception and throw a FailedException so I can reprocess the tuple.
Thanks,
Maxime

On Tue, Nov 4, 2014 at 11:01 AM, Nathan Leung <[email protected]> wrote:

> I would consider reducing max spout pending, since your topology's
> end-to-end latency is kind of high, and keeping timeouts relatively low.
> This way you don't have to wait 8 minutes for error messages to determine
> whether there are actually problems in the topology. I would try a max
> spout pending of 100 or 200 and see if it affects your throughput at all.
> I'm guessing that it probably will not. 800/1600 tuples in flight, with a
> max bolt parallelism of 120, is probably enough to keep things fed.
>
> Also, how do you handle API failure?
>
> On Tue, Nov 4, 2014 at 1:49 PM, Maxime Nay <[email protected]> wrote:
>
>> Hi,
>>
>> Thanks Vladi, Sam and Nathan for your advice.
>> The problem here is that the third-party API we're using can only
>> tolerate X concurrent requests, so the parallelism hint of my API bolt
>> is limited.
>> When I get a spike in my traffic, it's okay for my topology to start
>> lagging as long as it can recover after that. That's why I set the max
>> spout pending originally: so my spouts do not flood my bolts by trying
>> to keep up with the increased traffic. I also need to handle cases where
>> the API has issues and crashes or hangs. If the API is unavailable for a
>> few minutes, my topology should be able to recover.
>>
>> I was not aware of what Sam mentioned (a failed tuple is processed
>> anyway). It fits perfectly with what I was observing.
>> Sam, do you know how I can configure my KafkaSpouts to attach a
>> timestamp to my tuples?
>>
>> Vladi, I have 120 executors for my API bolt, and 8 spouts in total. Each
>> API call takes 2.5 s on average, and it can take maybe one more second
>> for the tuple to go through the rest of the topology. My calls to the
>> API are configured with a 6 s timeout.
>> I guess that means that in the worst-case scenario (the API hanging), I
>> can still process 17 tuples per second.
>> If I set maxSpoutPending to 1k, since I have 8 spouts, it means I can
>> buffer up to 8k tuples. Hence, I should set the message timeout to
>> something > 480 s.
>> Am I right?
>>
>> Thanks for your help!
>>
>> Maxime
>>
>> On Tue, Nov 4, 2014 at 8:28 AM, Vladi Feigin <[email protected]> wrote:
>>
>>> <<Is there a way to let the KafkaSpout "slow down" before the overload?>>
>>> Yes. This mechanism is the max spout pending parameter. It is how you
>>> control the volume of tuples a spout feeds into a topology.
>>>
>>> On Tue, Nov 4, 2014 at 5:27 PM, Niels Basjes <[email protected]> wrote:
>>>
>>>> Last week I noticed something similar in the solution I'm trying to
>>>> build.
>>>> I have a topology that simulates web traffic and puts those
>>>> "measurements" in Kafka. That has been running for a few weeks now,
>>>> and with the retention in Kafka that is a few hundred million events
>>>> available in Kafka.
>>>>
>>>> I then start a topology with the Kafka spout that must read
>>>> "everything" from the start. After a while it simply seems to stop
>>>> working (I see a LOT of errors). It is not surprising that the spout
>>>> is capable of feeding those events into the topology much faster than
>>>> they can be processed. It seemed to me that the topology 'died'
>>>> somewhere because of this 'overload'.
>>>>
>>>> I'm thinking "too many events in the topology".
>>>>
>>>> Simply "increasing parallelism" would only delay the effect a while.
>>>>
>>>> What is the general pattern for actually solving this kind of
>>>> situation? Is there a way to let the KafkaSpout "slow down" before
>>>> the overload?
>>>>
>>>> On Tue, Nov 4, 2014 at 12:48 PM, Nathan Leung <[email protected]> wrote:
>>>>
>>>>> Definitely increase parallelism if possible.
>>>>> If you have 1 API bolt per spout, it takes 1 second per API request
>>>>> on average, and your max spout pending is 1000, then it would take
>>>>> 1000 seconds to drain a full spout output queue, which would
>>>>> definitely be enough to time out your tuples, and cause any
>>>>> subsequent tuples to time out as well. If you have 10 API bolts per
>>>>> spout, it still takes 100 s to drain a full spout output queue. So
>>>>> care needs to be taken.
>>>>>
>>>>> My understanding is that the Kafka spout will automatically replay
>>>>> failed tuples. Is it possible that you get into a situation where
>>>>> your topology simply cannot keep up and you get into a replay loop?
>>>>>
>>>>> On Mon, Nov 3, 2014 at 11:49 PM, Vladi Feigin <[email protected]> wrote:
>>>>>
>>>>>> Set max spout pending to 2-3K. Try with these values.
>>>>>> It also sounds to me like you should try to increase the
>>>>>> parallelism. How many executors do you have for the API bolts?
>>>>>> One more thing: don't make too many changes at once. Try one change
>>>>>> at a time, otherwise you will get lost :-)
>>>>>> Vladi
>>>>>>
>>>>>> On 4 Nov 2014 00:49, "Maxime Nay" <[email protected]> wrote:
>>>>>>
>>>>>>> I have the default timeouts on my HttpClient (10 s for socket and
>>>>>>> 10 s for connect), and I'm not overriding these values anywhere.
>>>>>>> So I guess none of the API calls should be blocking.
>>>>>>> I allocated 5 GB of memory to each of my workers. I doubt the
>>>>>>> issue is a GC issue, but just in case I will take a look at it.
>>>>>>> What do you think would be a good value for max spout pending? I
>>>>>>> usually use 2 executors per type of spout, so 8 executors in total
>>>>>>> for my spouts.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> Maxime
>>>>>>>
>>>>>>> On Mon, Nov 3, 2014 at 12:41 PM, Vladi Feigin <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Yes, you probably fail because of timeouts.
>>>>>>>> Check that none of your API calls is blocking; make sure you have
>>>>>>>> a timeout for all of them.
>>>>>>>> Check your GC; if you have many full GCs you should increase your
>>>>>>>> Java heap.
>>>>>>>> It seems to me that you shouldn't set max spout pending too high.
>>>>>>>> How many spouts (executors) do you have?
>>>>>>>> Vladi
>>>>>>>>
>>>>>>>> On Mon, Nov 3, 2014 at 10:20 PM, Maxime Nay <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Vladi,
>>>>>>>>>
>>>>>>>>> I will put log statements in each bolt.
>>>>>>>>> The processing time per tuple is high due to a third-party API
>>>>>>>>> queried through HTTP requests in one of our bolts. It can take
>>>>>>>>> up to 3 seconds to get an answer from this service.
>>>>>>>>>
>>>>>>>>> I've tried multiple values for max spout pending: 400, 800,
>>>>>>>>> 2000... It doesn't really seem to change anything. I'm also
>>>>>>>>> setting messageTimeoutSecs to 25 s.
>>>>>>>>>
>>>>>>>>> I also noticed that at some point I'm getting failed tuples,
>>>>>>>>> even though I'm never throwing any FailedException manually. So
>>>>>>>>> I guess the only way for a tuple to fail is to exceed the
>>>>>>>>> messageTimeoutSecs?
>>>>>>>>>
>>>>>>>>> Anyway, I restarted the topology and I will take a look at the
>>>>>>>>> debug statements when it crashes again.
>>>>>>>>>
>>>>>>>>> Thanks for your help!
>>>>>>>>>
>>>>>>>>> Maxime
>>>>>>>>>
>>>>>>>>> On Sat, Nov 1, 2014 at 9:49 PM, Vladi Feigin <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> We have a similar problem with v0.8.2.
>>>>>>>>>> We suspect a slow bolt in the topology hangs and this causes
>>>>>>>>>> the entire topology to hang. It can be a database bolt, for
>>>>>>>>>> example.
>>>>>>>>>> Put logging at each bolt's entry and exit, printing the bolt
>>>>>>>>>> name, thread id and time. This will help you find out which
>>>>>>>>>> bolt hangs.
>>>>>>>>>> A few seconds of processing per tuple sounds too long.
>>>>>>>>>> Maybe you should profile your code as well.
>>>>>>>>>> What's your max spout pending value?
>>>>>>>>>> Vladi
>>>>>>>>>>
>>>>>>>>>> On 31 Oct 2014 20:09, "Maxime Nay" <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> For some reason, after a few hours of processing, my topology
>>>>>>>>>>> starts hanging. In the UI's 'Topology Stats' the emitted and
>>>>>>>>>>> transferred counts are equal to 0, and I can't see anything
>>>>>>>>>>> coming out of the topology (usually inserting into some
>>>>>>>>>>> database).
>>>>>>>>>>>
>>>>>>>>>>> I can't see anything unusual in the Storm worker logs, nor in
>>>>>>>>>>> Kafka's and ZooKeeper's logs.
>>>>>>>>>>> The ZkCoordinator keeps refreshing, but nothing happens:
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2]
>>>>>>>>>>> Deleted partition managers: []
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] New
>>>>>>>>>>> partition managers: []
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2]
>>>>>>>>>>> Finished refreshing
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.DynamicBrokersReader [INFO] Read
>>>>>>>>>>> partition info from zookeeper: GlobalPartitionInformation{...
>>>>>>>>>>>
>>>>>>>>>>> I don't really understand why this is hanging, or how I could
>>>>>>>>>>> fix it.
>>>>>>>>>>>
>>>>>>>>>>> I'm using Storm 0.9.2-incubating with Kafka 0.8.1.1 and
>>>>>>>>>>> storm-kafka 0.9.2-incubating.
>>>>>>>>>>>
>>>>>>>>>>> My topology pulls data from 4 different topics in Kafka, and
>>>>>>>>>>> has 9 different bolts. Each bolt implements IBasicBolt. I'm
>>>>>>>>>>> not doing any acking manually (Storm should take care of this
>>>>>>>>>>> for me, right?).
>>>>>>>>>>> It takes a few seconds for a tuple to go through the entire
>>>>>>>>>>> topology.
>>>>>>>>>>> I'm setting MaxSpoutPending to limit the number of tuples in
>>>>>>>>>>> the topology.
>>>>>>>>>>> My tuples shouldn't exceed the max message size limit (set to
>>>>>>>>>>> the default on my Kafka brokers and in my SpoutConfig, and I
>>>>>>>>>>> think the default is rather high and should easily handle a
>>>>>>>>>>> few lines of text).
>>>>>>>>>>> The tuples don't necessarily go through every bolt.
>>>>>>>>>>>
>>>>>>>>>>> I'm defining my spouts like this:
>>>>>>>>>>>
>>>>>>>>>>> ZkHosts zkHosts =
>>>>>>>>>>>     new ZkHosts("zk1.example.com:2181,zk2.example.com:2181,...");
>>>>>>>>>>> zkHosts.refreshFreqSecs = 120;
>>>>>>>>>>> SpoutConfig kafkaConfig = new SpoutConfig(zkHosts,
>>>>>>>>>>>     "TOPIC_NAME", "/consumers", "CONSUMER_ID");
>>>>>>>>>>> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>>>>>>> KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
>>>>>>>>>>>
>>>>>>>>>>> I'm running this topology on 2 different workers, located on
>>>>>>>>>>> two different supervisors. In total I'm using something like
>>>>>>>>>>> 160 executors.
>>>>>>>>>>>
>>>>>>>>>>> I would greatly appreciate any help or hints on how to
>>>>>>>>>>> fix/investigate this problem!
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Maxime
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
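[Editor's note] The catch-and-replay pattern Maxime describes at the top of the thread (catch the HttpClient exception, throw a FailedException so the tuple is failed and replayed) can be sketched as below. This is a minimal stand-alone sketch: `ApiClient` and `callApi` are hypothetical names, and `FailedException` here is a local stand-in for Storm's `backtype.storm.topology.FailedException` so the sketch compiles without Storm on the classpath.

```java
import java.io.IOException;

public class ApiCallSketch {
    // Stand-in for backtype.storm.topology.FailedException (Storm 0.9.x).
    static class FailedException extends RuntimeException {
        FailedException(Throwable cause) { super(cause); }
    }

    // Hypothetical client interface, for illustration only.
    interface ApiClient {
        String get(String url) throws IOException;
    }

    // Any client failure becomes a FailedException; in a real IBasicBolt this
    // makes Storm fail the tuple, and the Kafka spout replays it later.
    static String callApi(ApiClient client, String url) {
        try {
            return client.get(url);
        } catch (IOException e) {
            throw new FailedException(e);
        }
    }

    public static void main(String[] args) {
        try {
            callApi(u -> { throw new IOException("503 from third-party API"); },
                    "https://api.example.com/enrich");
        } catch (FailedException e) {
            System.out.println("tuple failed, will be replayed: " + e.getCause().getMessage());
        }
    }
}
```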
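[Editor's note] Maxime's sizing argument (120 API executors, a 6 s API timeout plus roughly 1 s for the rest of the topology, 8 spouts, maxSpoutPending = 1000) checks out as back-of-the-envelope arithmetic. All figures come from the thread; the method names are just for this sketch.

```java
public class CapacityMath {
    // Throughput when every API call hits its timeout (the worst case).
    static double worstCaseThroughput(int apiExecutors, double secondsPerTuple) {
        return apiExecutors / secondsPerTuple;
    }

    // Seconds needed to drain all in-flight tuples at that throughput; the
    // message timeout must exceed this or replays cascade.
    static double minMessageTimeoutSecs(int spouts, int maxSpoutPending, double tuplesPerSecond) {
        return spouts * maxSpoutPending / tuplesPerSecond;
    }

    public static void main(String[] args) {
        double tput = worstCaseThroughput(120, 7.0);           // ~17.1 tuples/s
        double timeout = minMessageTimeoutSecs(8, 1000, tput); // ~467 s
        System.out.printf("worst-case throughput: %.1f tuples/s%n", tput);
        System.out.printf("minimum message timeout: %.0f s%n", timeout);
    }
}
```

So a message timeout somewhat above 480 s, as Maxime concludes, leaves a small safety margin over the computed ~467 s.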
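[Editor's note] Nathan's drain-time estimate is a one-liner: with p tuples pending per spout, b API bolts serving that spout, and s seconds per request, draining a full spout queue takes p * s / b seconds. A sketch, using only numbers from his message:

```java
public class DrainTime {
    static double drainSeconds(int maxSpoutPending, int boltsPerSpout, double secondsPerRequest) {
        // Each bolt retires one request every secondsPerRequest, so b bolts
        // retire b tuples per secondsPerRequest interval.
        return maxSpoutPending * secondsPerRequest / boltsPerSpout;
    }

    public static void main(String[] args) {
        System.out.println(drainSeconds(1000, 1, 1.0));  // 1000 s: far past any sane tuple timeout
        System.out.println(drainSeconds(1000, 10, 1.0)); // 100 s: still long, hence "care needs to be taken"
    }
}
```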
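[Editor's note] Vladi's debugging suggestion (log each bolt's entry and exit with the bolt name, thread id and time, so a hanging bolt shows an ENTER with no matching EXIT) could be sketched as a small wrapper. Plain Java here so the sketch runs on its own; in a real bolt these two log lines would sit at the top and bottom of execute().

```java
import java.util.function.Supplier;

public class BoltTracing {
    // Runs the bolt's work between ENTER/EXIT log lines; the EXIT line is
    // emitted even if the work throws, so only a true hang leaves it missing.
    static <T> T traced(String boltName, Supplier<T> work) {
        long start = System.currentTimeMillis();
        System.out.printf("ENTER %s thread=%d time=%d%n",
                boltName, Thread.currentThread().getId(), start);
        try {
            return work.get();
        } finally {
            System.out.printf("EXIT %s thread=%d tookMs=%d%n",
                    boltName, Thread.currentThread().getId(),
                    System.currentTimeMillis() - start);
        }
    }

    public static void main(String[] args) {
        String out = traced("ApiBolt", () -> "enriched-tuple");
        System.out.println(out);
    }
}
```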
