I searched briefly and couldn't find how to attach or read a timestamp on a tuple emitted by a KafkaSpout. As Sam suggested, I will add the timestamp in a bolt right after the spout and see if that's enough to get something stable. I will keep you posted on how it goes.
Thanks!
Maxime

On Tue, Nov 4, 2014 at 2:30 PM, Nathan Leung <[email protected]> wrote:

> If possible it is preferable to timestamp in the spout. If you emit when
> the spout output queue is nearly full, you will have to wait for it to
> drain down to the tuple you emitted before that tuple is sent to the
> timestamp bolt, which could substantially skew the difference between the
> timestamp and the actual emit time.
>
> On Nov 4, 2014 5:17 PM, "Sam Mati" <[email protected]> wrote:
>
>> Maxime,
>>
>> I'm glad I could help. This issue with Storm not ignoring timed-out
>> tuples caught me by surprise as well; it's neither well documented nor
>> expected.
>>
>> Adding a timestamp should be easy. I've not used KafkaSpout before, so
>> I don't know if it supports it, but you can always create your own Bolt
>> (immediately after the Spout) that simply tacks on a field called
>> "timestamp-ms" -- use (new Date()).getTime(). Then, in each bolt, you do
>> something like this:
>>
>>     Date expire_date = new Date(input.getLongByField("timestamp-ms")
>>             + this.tuple_timeout_s * 1000);
>>     if ((new Date()).after(expire_date)) {
>>         logger.warn("Skipping tuple because it is too old: {}",
>>                 input.getValues());
>>         return;
>>     }
>>
>> The problem here is ensuring all of your workers have clocks
>> synchronized with the Spout's; otherwise you'll run into trouble.
>>
>> Regarding choosing a max_pending value:
>>
>> Your topology's bottleneck is definitely the Bolt that does the API
>> call, so max_pending needn't be set very high -- just enough to give each
>> executor a buffer large enough to last the time it takes for more tuples
>> to be emitted from the spout to the worker. Consider what happens with a
>> low value: with max_pending equal to the number of executors, your
>> executors will execute and then wait for the time it takes the Spout to
>> grab a tuple and emit it to the executor (probably on the order of a few
>> ms). A few ms of inefficiency isn't that bad!
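Sam's expiry check earlier in this message is, stripped of the Storm Tuple plumbing, just a millisecond comparison. A minimal dependency-free sketch of that check (the "timestamp-ms" semantics and the 25s timeout mentioned later in the thread are from the discussion; the class and method names here are illustrative):

```java
import java.util.Date;

public class TupleExpiry {
    // True if a tuple stamped at timestampMs (epoch millis) has outlived
    // timeoutS seconds as of nowMs -- the same logic as the snippet above,
    // minus the Storm Tuple/logger plumbing.
    public static boolean isExpired(long timestampMs, long timeoutS, long nowMs) {
        Date expireDate = new Date(timestampMs + timeoutS * 1000L);
        return new Date(nowMs).after(expireDate);
    }

    public static void main(String[] args) {
        long stamped = new Date().getTime();
        // 30s after stamping, with a 25s timeout: expired.
        System.out.println(isExpired(stamped, 25, stamped + 30_000)); // true
        // 10s after stamping: still fresh.
        System.out.println(isExpired(stamped, 25, stamped + 10_000)); // false
    }
}
```

As Sam notes, this only works if all workers' clocks are synchronized with the machine that stamped the tuple.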
>> Now, if you set max_pending to 2x that, you should be just fine... your
>> Bolts will always have something in their buffer to execute, so they'll
>> never be waiting on the Spout.
>>
>> The above math may change with different numbers of spouts and
>> configurations, but you get the idea.
>>
>> Best,
>> -Sam
>>
>> From: Maxime Nay <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Tuesday, November 4, 2014 1:49 PM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: KafkaSpout stops pulling data after a few hours
>>
>> Hi,
>>
>> Thanks Vladi, Sam and Nathan for your advice.
>> The problem here is that the third-party API we're using can only
>> tolerate X concurrent requests, so the parallelism hint of my API bolt is
>> limited.
>> When I get a spike in my traffic, it's okay for my topology to start
>> lagging as long as it can recover afterwards. That's why I set the max
>> spout pending originally: so my spouts do not flood my bolts by trying to
>> keep up with the increased traffic. I also need to handle cases where the
>> API has issues and crashes or hangs. If the API is unavailable for a few
>> minutes, my topology should be able to recover.
>>
>> I was not aware of what Sam mentioned (a failed tuple is processed
>> anyway). It fits perfectly with what I was observing.
>> Sam, do you know how I can configure my KafkaSpouts to attach a
>> timestamp to my tuples?
>>
>> Vladi, I have 120 executors for my API bolt, and 8 spouts in total. Each
>> API call takes on average 2.5s, and it can take maybe one more second for
>> the tuple to go through the rest of the topology. My calls to the API are
>> configured with a 6s timeout.
>> I guess that means that in the worst-case scenario (API hanging), I can
>> still process 17 tuples per second. If I set maxSpoutPending to 1k, since
>> I have 8 spouts, I can buffer up to 8k tuples. Hence, I should set the
>> message timeout to something > 480s.
>> Am I right?
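Maxime's worst-case arithmetic above can be checked directly. A quick sketch with his numbers (120 API-bolt executors, ~7s per tuple worst case counting the 6s API timeout plus roughly 1s for the rest of the topology, 8 spouts at 1k pending each -- all figures are his estimates, not measurements):

```java
public class DrainTime {
    public static void main(String[] args) {
        int apiExecutors = 120;       // parallelism of the API bolt
        double worstTupleS = 7.0;     // 6s API timeout + ~1s for the rest of the topology
        int spouts = 8;
        int maxSpoutPending = 1000;   // per spout

        double tuplesPerSec = apiExecutors / worstTupleS;  // ~17 tuples/s worst case
        int inFlight = spouts * maxSpoutPending;           // up to 8000 buffered tuples
        double drainS = inFlight / tuplesPerSec;           // ~467s to drain them all

        System.out.printf("%.1f tuples/s, %.0fs to drain%n", tuplesPerSec, drainS);
        // prints: 17.1 tuples/s, 467s to drain
    }
}
```

So a message timeout comfortably above ~470s -- the > 480s Maxime proposes -- does look consistent with his own figures.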
>>
>> Thanks for your help!
>>
>> Maxime
>>
>> On Tue, Nov 4, 2014 at 8:28 AM, Vladi Feigin <[email protected]> wrote:
>>
>>> <<Is there a way to let the KafkaSpout "slow down" before the overload?>>
>>> Yes. This mechanism is called the max_spout_pending parameter. This is
>>> the way you can control the volume of tuples a spout feeds into a
>>> topology.
>>>
>>> On Tue, Nov 4, 2014 at 5:27 PM, Niels Basjes <[email protected]> wrote:
>>>
>>>> Last week I noticed something similar in the solution I'm trying to
>>>> build.
>>>> I have a topology that simulates web traffic and puts those
>>>> "measurements" in Kafka.
>>>> That has been running for a few weeks now, and with the retention in
>>>> Kafka that means a few hundred million events are available in Kafka.
>>>>
>>>> I then start a topology with the Kafka Spout that must read
>>>> "everything" from the start.
>>>> After a while it simply seems to stop working (I see a LOT of errors).
>>>> It is not surprising that the spout is capable of feeding those
>>>> events into the topology much faster than the topology can process them.
>>>> It seemed to me that the topology 'died' somewhere because of this
>>>> 'overload'.
>>>>
>>>> I'm thinking "too many events in the topology".
>>>>
>>>> Simply increasing parallelism would only delay the effect a while.
>>>>
>>>> What is the general pattern for actually solving this kind of
>>>> situation?
>>>> Is there a way to let the KafkaSpout "slow down" before the overload?
>>>>
>>>> On Tue, Nov 4, 2014 at 12:48 PM, Nathan Leung <[email protected]> wrote:
>>>>
>>>>> Definitely increase parallelism if possible.
>>>>> If you have 1 API bolt per spout, it takes on average 1 second per
>>>>> API request, and your max spout pending is 1000, then it would take
>>>>> 1000 seconds to drain a full spout output queue, which would definitely
>>>>> be enough to time out your tuples and cause any subsequent tuples to
>>>>> time out as well. If you have 10 API bolts per spout, it still takes
>>>>> 100s to drain a full spout output queue. So care needs to be taken.
>>>>>
>>>>> My understanding is that the Kafka spout will automatically replay
>>>>> failed tuples. Is it possible that you get into a situation where your
>>>>> topology simply cannot keep up and you get into a replay loop?
>>>>>
>>>>> On Mon, Nov 3, 2014 at 11:49 PM, Vladi Feigin <[email protected]> wrote:
>>>>>
>>>>>> Set max spout pending to 2-3K. Try with these values.
>>>>>> It also sounds to me like you should try to increase the parallelism.
>>>>>> How many executors do you have for the API bolts?
>>>>>> One more thing: don't do too many changes at once. Try change by
>>>>>> change, otherwise you will get lost :-)
>>>>>> Vladi
>>>>>>
>>>>>> On 4 Nov 2014 00:49, "Maxime Nay" <[email protected]> wrote:
>>>>>>
>>>>>>> I have a default timeout on my HttpClient (10s for socket and 10s
>>>>>>> for connect), and I'm not overriding this value anywhere, so I guess
>>>>>>> none of the API calls should be blocking.
>>>>>>> I allocated 5GB of memory to each of my workers. I doubt the issue
>>>>>>> is a GC issue, but just in case I will take a look at it.
>>>>>>> What do you think would be a good value for the max spout pending?
>>>>>>> I usually use 2 executors per type of spout, so 8 executors in
>>>>>>> total for my spouts.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> Maxime
>>>>>>>
>>>>>>> On Mon, Nov 3, 2014 at 12:41 PM, Vladi Feigin <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Yes, you probably fail because of timeouts.
>>>>>>>> Check that none of your API calls is blocking; make sure you have
>>>>>>>> a timeout for all of them.
>>>>>>>> Check your GC; if you have many full GCs you should increase your
>>>>>>>> Java heap.
>>>>>>>> It seems to me that you shouldn't set max spout pending too high.
>>>>>>>> How many spouts (executors) do you have?
>>>>>>>> Vladi
>>>>>>>>
>>>>>>>> On Mon, Nov 3, 2014 at 10:20 PM, Maxime Nay <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Vladi,
>>>>>>>>>
>>>>>>>>> I will put log statements in each bolt.
>>>>>>>>> The processing time per tuple is high due to a third-party API
>>>>>>>>> queried through HTTP requests in one of our bolts. It can take up
>>>>>>>>> to 3 seconds to get an answer from this service.
>>>>>>>>>
>>>>>>>>> I've tried multiple values for max spout pending: 400, 800,
>>>>>>>>> 2000... It doesn't really seem to change anything. I'm also setting
>>>>>>>>> messageTimeoutSecs to 25s.
>>>>>>>>>
>>>>>>>>> I also noticed that at some point I'm getting failed tuples, even
>>>>>>>>> though I'm never throwing any FailedException manually. So I guess
>>>>>>>>> the only way for a tuple to fail is to exceed messageTimeoutSecs?
>>>>>>>>>
>>>>>>>>> Anyway, I restarted the topology and I will take a look at the
>>>>>>>>> debug statements when it crashes again.
>>>>>>>>>
>>>>>>>>> Thanks for your help!
>>>>>>>>>
>>>>>>>>> Maxime
>>>>>>>>>
>>>>>>>>> On Sat, Nov 1, 2014 at 9:49 PM, Vladi Feigin <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> We have a similar problem with v0.8.2.
>>>>>>>>>> We suspect a slow bolt in the topology hangs and this causes the
>>>>>>>>>> entire topology to hang.
>>>>>>>>>> It can be a database bolt, for example.
>>>>>>>>>> Put logging at each bolt's entry and exit that prints the bolt
>>>>>>>>>> name, thread id, and time. This will help you find out which bolt
>>>>>>>>>> hangs.
>>>>>>>>>> A few seconds of processing per tuple sounds too long.
>>>>>>>>>> Maybe you should profile your code as well.
>>>>>>>>>> What's your max spout pending value?
>>>>>>>>>> Vladi
>>>>>>>>>>
>>>>>>>>>> On 31 Oct 2014 20:09, "Maxime Nay" <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> For some reason, after a few hours of processing, my topology
>>>>>>>>>>> starts hanging. In the UI's 'Topology Stats' the emitted and
>>>>>>>>>>> transferred counts are equal to 0, and I can't see anything
>>>>>>>>>>> coming out of the topology (usually inserting into some database).
>>>>>>>>>>>
>>>>>>>>>>> I can't see anything unusual in the Storm workers' logs, nor in
>>>>>>>>>>> Kafka's and ZooKeeper's logs.
>>>>>>>>>>> The ZkCoordinator keeps refreshing, but nothing happens:
>>>>>>>>>>>
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Deleted partition managers: []
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] New partition managers: []
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Finished refreshing
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{...
>>>>>>>>>>>
>>>>>>>>>>> I don't really understand why this is hanging, or how I could
>>>>>>>>>>> fix it.
>>>>>>>>>>>
>>>>>>>>>>> I'm using Storm 0.9.2-incubating with Kafka 0.8.1.1 and
>>>>>>>>>>> storm-kafka 0.9.2-incubating.
>>>>>>>>>>>
>>>>>>>>>>> My topology pulls data from 4 different topics in Kafka and has
>>>>>>>>>>> 9 different bolts. Each bolt implements IBasicBolt. I'm not doing
>>>>>>>>>>> any acking manually (Storm should take care of this for me,
>>>>>>>>>>> right?).
>>>>>>>>>>> It takes a few seconds for a tuple to go through the entire
>>>>>>>>>>> topology.
>>>>>>>>>>> I'm setting MaxSpoutPending to limit the number of tuples in
>>>>>>>>>>> the topology.
>>>>>>>>>>> My tuples shouldn't exceed the max size limit (set to the
>>>>>>>>>>> default on my Kafka brokers and in my SpoutConfig, and I think
>>>>>>>>>>> the default is rather high and should easily handle a few lines
>>>>>>>>>>> of text).
>>>>>>>>>>> The tuples don't necessarily go to each bolt.
>>>>>>>>>>>
>>>>>>>>>>> I'm defining my spouts like this:
>>>>>>>>>>>
>>>>>>>>>>>     ZkHosts zkHosts = new ZkHosts("zk1.example.com:2181,zk2.example.com:2181"...);
>>>>>>>>>>>     zkHosts.refreshFreqSecs = 120;
>>>>>>>>>>>
>>>>>>>>>>>     SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts(),
>>>>>>>>>>>             "TOPIC_NAME",
>>>>>>>>>>>             "/consumers",
>>>>>>>>>>>             "CONSUMER_ID");
>>>>>>>>>>>     kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>>>>>>>     KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
>>>>>>>>>>>
>>>>>>>>>>> I'm running this topology on 2 different workers, located on
>>>>>>>>>>> two different supervisors. In total I'm using something like 160
>>>>>>>>>>> executors.
>>>>>>>>>>>
>>>>>>>>>>> I would greatly appreciate any help or hints on how to
>>>>>>>>>>> fix/investigate this problem!
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Maxime
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
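For reference, the two knobs discussed throughout this thread are set on the topology's Config in Storm 0.9.x. A fragment with the values from the discussion above (the numbers are Maxime's estimates, not verified here):

```java
import backtype.storm.Config;   // Storm 0.9.x (pre-Apache package name)

// Inside your topology setup:
Config conf = new Config();
conf.setMaxSpoutPending(1000);    // per spout task; 8 spout executors => up to ~8k tuples in flight
conf.setMessageTimeoutSecs(490);  // must exceed the ~470s worst-case drain time estimated above
```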
