<<Is there a way to let the KafkaSpout "slow down" before the overload?>> Yes. The mechanism is the max spout pending parameter (topology.max.spout.pending): it caps how many tuples a spout may have pending (emitted but not yet acked or failed) at any time, and that is how you control the volume of tuples a spout feeds into a topology.
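A minimal sketch of setting it, against Storm 0.9.x (`backtype.storm` package); the numeric values are illustrative, not recommendations:

```java
import backtype.storm.Config;

public class BackpressureConfig {
    public static Config build() {
        Config conf = new Config();
        // Cap the number of unacked tuples per spout task
        // (topology.max.spout.pending); tune empirically.
        conf.setMaxSpoutPending(1000);
        // Tuples not acked within this window are failed and replayed
        // (topology.message.timeout.secs).
        conf.setMessageTimeoutSecs(30);
        return conf;
    }
}
```

The spout stops emitting once the pending count reaches the cap and resumes as tuples are acked, which is what lets a slow topology throttle a fast KafkaSpout.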
On Tue, Nov 4, 2014 at 5:27 PM, Niels Basjes <[email protected]> wrote:
> Last week I noticed something similar in the solution I'm trying to build.
> I have a topology that simulates web traffic and puts those "measurements"
> into Kafka. That has been running for a few weeks now, and with the Kafka
> retention settings there are a few hundred million events available in Kafka.
>
> I then start a topology with the KafkaSpout that must read "everything"
> from the start. After a while it simply seems to stop working (I see a LOT
> of errors). It is not surprising that the spout can feed those events into
> the topology much faster than the topology can process them. It seemed to
> me that the topology 'died' somewhere because of this 'overload'.
>
> I'm thinking "too many events in the topology".
>
> Simply increasing parallelism would only delay the effect a while.
>
> What is the general pattern for actually solving this kind of situation?
> Is there a way to let the KafkaSpout "slow down" before the overload?
>
> On Tue, Nov 4, 2014 at 12:48 PM, Nathan Leung <[email protected]> wrote:
>
>> Definitely increase parallelism if possible, but care needs to be taken.
>> If you have 1 API bolt per spout, each API request takes on average
>> 1 second, and your max spout pending is 1000, then it takes 1000 seconds
>> to drain a full spout output queue. That is definitely enough to time out
>> your tuples, and cause subsequent tuples to time out as well. With 10 API
>> bolts per spout it still takes 100 seconds to drain a full spout output
>> queue.
>>
>> My understanding is that the Kafka spout will automatically replay failed
>> tuples. Is it possible that you get into a situation where your topology
>> simply cannot keep up and you end up in a replay loop?
>>
>> On Mon, Nov 3, 2014 at 11:49 PM, Vladi Feigin <[email protected]> wrote:
>>
>>> Set max spout pending to 2-3K. Try with these values.
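Nathan's drain-time arithmetic above can be made concrete with a small standalone calculation (illustrative only, not Storm API; the class and method names are made up for this sketch):

```java
public class DrainTime {
    /**
     * Worst-case seconds to drain a full spout output queue:
     * maxSpoutPending tuples, each costing secondsPerCall of API time,
     * spread over apiBolts parallel bolt executors.
     */
    public static double drainSeconds(int maxSpoutPending, int apiBolts,
                                      double secondsPerCall) {
        return maxSpoutPending * secondsPerCall / apiBolts;
    }

    public static void main(String[] args) {
        // 1 bolt, 1 s per call, 1000 pending -> well past any sane timeout.
        System.out.println(drainSeconds(1000, 1, 1.0));   // 1000.0
        // Even 10 bolts still need 100 s to drain the same queue.
        System.out.println(drainSeconds(1000, 10, 1.0));  // 100.0
    }
}
```

The takeaway is that max spout pending and the message timeout have to be sized together: the queue must be drainable well within the timeout, or every tuple behind a slow one fails and gets replayed.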
>>> It also sounds to me like you should try to increase the parallelism.
>>> How many executors do you have for the API bolts?
>>> One more thing: don't make too many changes at once. Try change by
>>> change, otherwise you will get lost :-)
>>> Vladi
>>> On 4 Nov 2014 00:49, "Maxime Nay" <[email protected]> wrote:
>>>
>>>> I have a default timeout on my HttpClient (10 s for socket and 10 s for
>>>> connect), and I'm not overriding this value anywhere. So I guess none
>>>> of the API calls should be blocking.
>>>> I allocated 5 GB of memory to each of my workers. I doubt the issue is
>>>> a GC issue, but just in case I will take a look at it.
>>>> What do you think would be a good value for max spout pending? I
>>>> usually use 2 executors per type of spout, so 8 executors in total for
>>>> my spouts.
>>>>
>>>> Thanks!
>>>>
>>>> Maxime
>>>>
>>>> On Mon, Nov 3, 2014 at 12:41 PM, Vladi Feigin <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Yes, you probably fail because of timeouts.
>>>>> Check that none of your API calls is blocking; make sure you have a
>>>>> timeout for all of them.
>>>>> Check your GC; if you see many full GCs you should increase your Java
>>>>> heap.
>>>>> It seems to me that you shouldn't set max spout pending too high.
>>>>> How many spout executors do you have?
>>>>> Vladi
>>>>>
>>>>> On Mon, Nov 3, 2014 at 10:20 PM, Maxime Nay <[email protected]> wrote:
>>>>>
>>>>>> Hi Vladi,
>>>>>>
>>>>>> I will put log statements in each bolt.
>>>>>> The processing time per tuple is high due to a third-party API
>>>>>> queried through HTTP requests in one of our bolts. It can take up to
>>>>>> 3 seconds to get an answer from this service.
>>>>>>
>>>>>> I've tried multiple values for max spout pending: 400, 800, 2000...
>>>>>> It doesn't really seem to change anything. I'm also setting
>>>>>> messageTimeoutSecs to 25 s.
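The HttpClient timeouts discussed above can be set explicitly rather than relying on defaults; a sketch assuming Apache HttpClient 4.3+ (`RequestConfig` and `HttpClients` are that library's API; the factory wrapper is hypothetical):

```java
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class ApiClientFactory {
    // Explicit timeouts so no API call can block a bolt indefinitely;
    // both must be well below topology.message.timeout.secs.
    public static CloseableHttpClient newClient() {
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(10_000)  // 10 s to establish a connection
                .setSocketTimeout(10_000)   // 10 s max between data packets
                .build();
        return HttpClients.custom()
                .setDefaultRequestConfig(requestConfig)
                .build();
    }
}
```

Note that with 10 s socket plus 10 s connect timeouts and a 25 s message timeout, a single retried request can already push a tuple over the line, which matches the unexplained tuple failures described in the thread.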
>>>>>>
>>>>>> I also noticed that at some point I'm getting failed tuples, even
>>>>>> though I never throw any FailedException manually. So I guess the
>>>>>> only way for a tuple to fail is to exceed messageTimeoutSecs?
>>>>>>
>>>>>> Anyway, I restarted the topology and will take a look at the debug
>>>>>> statements when it crashes again.
>>>>>>
>>>>>> Thanks for your help!
>>>>>>
>>>>>> Maxime
>>>>>>
>>>>>> On Sat, Nov 1, 2014 at 9:49 PM, Vladi Feigin <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> We have a similar problem with v0.8.2.
>>>>>>> We suspect the slowest bolt in the topology hangs and this causes
>>>>>>> the entire topology to hang. It can be a database bolt, for example.
>>>>>>> Put logging at each bolt's entry and exit, printing the bolt name,
>>>>>>> thread id and time. This will help you find out which bolt hangs.
>>>>>>> A few seconds of processing per tuple sounds too long; maybe you
>>>>>>> should profile your code as well.
>>>>>>> What's your max spout pending value?
>>>>>>> Vladi
>>>>>>> On 31 Oct 2014 20:09, "Maxime Nay" <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> For some reason, after a few hours of processing, my topology
>>>>>>>> starts hanging. In the UI's 'Topology Stats' the emitted and
>>>>>>>> transferred counts are equal to 0, and I can't see anything coming
>>>>>>>> out of the topology (which usually inserts into a database).
>>>>>>>>
>>>>>>>> I can't see anything unusual in the Storm worker logs, nor in the
>>>>>>>> Kafka and ZooKeeper logs.
>>>>>>>> The ZkCoordinator keeps refreshing, but nothing happens:
>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Deleted partition managers: []
>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] New partition managers: []
>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Finished refreshing
>>>>>>>> 2014-10-31 17:00:13 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{...
>>>>>>>>
>>>>>>>> I don't really understand why this is hanging, or how I could fix
>>>>>>>> it.
>>>>>>>>
>>>>>>>> I'm using Storm 0.9.2-incubating with Kafka 0.8.1.1 and storm-kafka
>>>>>>>> 0.9.2-incubating.
>>>>>>>>
>>>>>>>> My topology pulls data from 4 different Kafka topics and has 9
>>>>>>>> different bolts. Each bolt implements IBasicBolt, and I'm not doing
>>>>>>>> any acking manually (Storm should take care of this for me, right?).
>>>>>>>> It takes a few seconds for a tuple to go through the entire
>>>>>>>> topology.
>>>>>>>> I'm setting MaxSpoutPending to limit the number of tuples in the
>>>>>>>> topology.
>>>>>>>> My tuples shouldn't exceed the max message size limit (set to the
>>>>>>>> default on my Kafka brokers and in my SpoutConfig, and I think the
>>>>>>>> default is rather high and should easily handle a few lines of
>>>>>>>> text).
>>>>>>>> The tuples don't necessarily go to each bolt.
>>>>>>>>
>>>>>>>> I'm defining my spouts like this:
>>>>>>>>
>>>>>>>> ZkHosts zkHosts = new ZkHosts("zk1.example.com:2181,zk2.example.com:2181"...);
>>>>>>>> zkHosts.refreshFreqSecs = 120;
>>>>>>>>
>>>>>>>> SpoutConfig kafkaConfig = new SpoutConfig(zkHosts,
>>>>>>>>         "TOPIC_NAME",
>>>>>>>>         "/consumers",
>>>>>>>>         "CONSUMER_ID");
>>>>>>>> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>>>> KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
>>>>>>>>
>>>>>>>> I'm running this topology on 2 different workers, located on two
>>>>>>>> different supervisors. In total I'm using something like 160
>>>>>>>> executors.
>>>>>>>>
>>>>>>>> I would greatly appreciate any help or hints on how to
>>>>>>>> fix/investigate this problem!
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Maxime
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
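The spout definition quoted in the thread can be completed into a minimal submission sketch against storm-kafka 0.9.2-incubating; the topic, consumer id, ZooKeeper hosts, component names, and the commented-out `ApiBolt` are placeholders, and the tuning values are illustrative:

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class ExampleTopology {
    public static void main(String[] args) throws Exception {
        // ZkHosts takes one comma-separated connect string.
        ZkHosts zkHosts = new ZkHosts("zk1.example.com:2181,zk2.example.com:2181");
        zkHosts.refreshFreqSecs = 120;

        SpoutConfig kafkaConfig =
                new SpoutConfig(zkHosts, "TOPIC_NAME", "/consumers", "CONSUMER_ID");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        // 2 executors per spout, as described in the thread.
        builder.setSpout("kafka-spout", new KafkaSpout(kafkaConfig), 2);
        // Downstream bolts attach here, e.g.:
        // builder.setBolt("api-bolt", new ApiBolt(), 10).shuffleGrouping("kafka-spout");

        Config conf = new Config();
        conf.setNumWorkers(2);
        conf.setMaxSpoutPending(1000);  // size together with the timeout below
        conf.setMessageTimeoutSecs(25);

        StormSubmitter.submitTopology("example-topology", conf, builder.createTopology());
    }
}
```

With bolts extending BaseBasicBolt (the IBasicBolt path used in the thread), Storm acks each input tuple automatically when execute() returns, so the only remaining failure mode without an explicit FailedException is indeed the message timeout.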
