I searched briefly and couldn't find how to attach/get the timestamp from a
tuple emitted by a KafkaSpout.
As suggested by Sam I will add the timestamp in a bolt right after the
spout and see if it's enough to have something stable. I will keep you
posted on how it goes.
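
For reference, a minimal sketch of the expiry check I have in mind, kept free of Storm classes so it can be run on its own. The class and method names (TupleExpiry, stamp, isExpired) are hypothetical, not part of Storm or storm-kafka; in a real bolt the timestamp would be read with input.getLongByField("timestamp-ms") as in Sam's snippet, and the 25 s value is just the message timeout mentioned earlier in this thread.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of Storm or storm-kafka): decides whether a
// tuple stamped on entry to the topology has outlived the message timeout.
final class TupleExpiry {
    private final long timeoutMs;

    TupleExpiry(long timeoutSecs) {
        this.timeoutMs = TimeUnit.SECONDS.toMillis(timeoutSecs);
    }

    // Value for the "timestamp-ms" field, taken where the tuple enters the topology.
    static long stamp() {
        return System.currentTimeMillis();
    }

    // True when the tuple is strictly older than the timeout, which matches
    // the (new Date()).after(expire_date) comparison in Sam's snippet.
    boolean isExpired(long timestampMs, long nowMs) {
        return nowMs - timestampMs > timeoutMs;
    }

    public static void main(String[] args) {
        TupleExpiry expiry = new TupleExpiry(25); // assumed timeout, in seconds
        long emittedAt = stamp();
        // A tuple seen 10 s after it was stamped is still within the timeout.
        System.out.println(expiry.isExpired(emittedAt, emittedAt + 10_000));
        // A tuple seen 30 s after it was stamped should be skipped.
        System.out.println(expiry.isExpired(emittedAt, emittedAt + 30_000));
    }
}
```

Passing the current time in as a parameter keeps the check easy to test; it does nothing about clock drift between workers, which still has to be handled separately.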

Thanks!

Maxime

On Tue, Nov 4, 2014 at 2:30 PM, Nathan Leung <[email protected]> wrote:

> If possible, it is preferable to timestamp in the spout. If you emit when
> the spout output queue is nearly full, the tuple has to wait for the queue
> to drain before it is sent to the timestamp bolt, which could substantially
> skew the difference between the timestamp and the actual emit time.
> On Nov 4, 2014 5:17 PM, "Sam Mati" <[email protected]> wrote:
>
>>  Maxime,
>>
>>  I'm glad I could help.  This issue with Storm not ignoring timed-out
>> tuples caught me by surprise as well, and it's not well documented, nor is
>> it expected.
>>
>>  Adding a timestamp should be easy.  I've not used KafkaSpout before so
>> I don't know if it supports it, but you can always create your own Bolt
>> (immediately after the Spout) that simply tacks on a field called
>> "timestamp-ms" -- use (new Date()).getTime().  Then, in each bolt, you do
>> something like this:
>>
>>         Date expire_date = new Date(input.getLongByField("timestamp-ms") + this.tuple_timeout_s * 1000);
>>         if ((new Date()).after(expire_date)) {
>>             logger.warn("Skipping tuple because it is too old: {}", input.getValues());
>>             return;
>>         }
>>
>>  The problem here is ensuring all of your workers have clocks
>> synchronized with the Spout, otherwise you'll run into trouble.
>>
>>  Regarding choosing a max_pending value:
>>
>>  Your topology's bottleneck is definitely your Bolt that does the API
>> call... so max-pending needn't be set so high.. just enough to have a
>> buffer in each executor large enough to last the time it takes for more
>> tuples to be emitted from the spout to the worker.  Consider what happens
>> with a low value:  with max_pending equal to the number of executors, your
>> executors will execute and then wait for the time it takes the Spout to
>> grab a tuple and emit it to the executor (probably on the order of a few
>> ms).  A few ms of inefficiency isn't that bad!  Now, if you set max_pending
>> to 2x that, you should be just fine… your Bolts will always have something
>> in their buffer to execute, so they'll never be waiting on the Spout.
>>
>>  The above math may change with different numbers of spouts and
>> configurations, but you get the idea.
>>
>>  Best,
>> -Sam
>>
>>   From: Maxime Nay <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Tuesday, November 4, 2014 1:49 PM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: KafkaSpout stops pulling data after a few hours
>>
>>      Hi,
>>
>>  Thanks Vladi, Sam and Nathan for your advice.
>>  The problem here is that the third party API we're using can only
>> tolerate X concurrent requests. So the parallelism hint of my API bolt is
>> limited.
>> When I get a spike in my traffic, it's okay for my topology to start
>> lagging as long as it can recover after that. That's why I set the max
>> spout pending originally: so my spouts do not flood my bolts by trying to
>> keep up with the increased traffic. I also need to handle cases when the
>> API is having issues and crashes or hangs. If the API is unavailable for a
>> few minutes, my topology should be able to recover.
>>
>>  I was not aware of what Sam mentioned (a failed tuple is processed
>> anyway). It fits perfectly with what I was observing.
>>  Sam, do you know how I can configure my KafkaSpouts to attach a
>> timestamp to my tuples?
>>
>>  Vladi, I have 120 executors for my API bolt, and 8 spouts in total. Each
>> API call takes on average 2.5 s, and it can take maybe one more second for
>> the tuple to go through the rest of the topology. My calls to the API are
>> configured with a 6 s timeout.
>>  I guess it means that in the worst case (API hanging, so about 7 s per
>> tuple), I can still process 17 tuples per second. If I set the
>> maxSpoutPending to 1k, since I have 8 spouts, it means I can buffer up to
>> 8k tuples. Hence, I should set the message timeout to something > 480 s.
>> Am I right?
>>
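The estimate above can be checked with a few lines of arithmetic. This is only a rough sketch under the assumptions stated in the message (every pending tuple has to pass through the API bolt, and each call takes the full 6 s timeout plus about 1 s for the rest of the topology); the class and method names are made up for illustration.

```java
// Hypothetical helper for the back-of-the-envelope estimate; the numbers come
// from this thread, the class and method names are made up.
final class DrainEstimate {
    // Tuples per second the bottleneck bolt can complete when `executors`
    // calls run concurrently and each one takes secondsPerTuple.
    static double throughput(int executors, double secondsPerTuple) {
        return executors / secondsPerTuple;
    }

    // Seconds needed to work through everything the spouts may have pending.
    static double drainSeconds(int spouts, int maxSpoutPending,
                               int executors, double secondsPerTuple) {
        double pending = (double) spouts * maxSpoutPending;
        return pending / throughput(executors, secondsPerTuple);
    }

    public static void main(String[] args) {
        // Worst case: 6 s API timeout plus about 1 s for the rest of the topology.
        double worstCaseSeconds = 6.0 + 1.0;
        System.out.printf("throughput: %.1f tuples/s%n",
                throughput(120, worstCaseSeconds));
        System.out.printf("drain time: %.0f s%n",
                drainSeconds(8, 1000, 120, worstCaseSeconds));
    }
}
```

With 120 executors and 7 s per tuple the throughput is about 17.1 tuples per second, and 8000 pending tuples drain in roughly 467 s, so a message timeout above 480 s leaves only a small margin. The same helper reproduces Nathan's figures further down the thread: drainSeconds(1, 1000, 1, 1.0) gives 1000 s and drainSeconds(1, 1000, 10, 1.0) gives 100 s.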
>>  Thanks for your help!
>>
>>
>>  Maxime
>>
>> On Tue, Nov 4, 2014 at 8:28 AM, Vladi Feigin <[email protected]> wrote:
>>
>>> <<Is there a way to let the KafkaSpout "slow down" before the overload?>>
>>>  Yes. This mechanism is the max_spout_pending parameter. This
>>> is how you control the volume of tuples a spout feeds into a topology.
>>>
>>> On Tue, Nov 4, 2014 at 5:27 PM, Niels Basjes <[email protected]> wrote:
>>>
>>>> Last week I noticed something similar in the solution I'm trying to
>>>> build.
>>>> I have a topology that simulates webtraffic and puts those
>>>> "measurements" in Kafka.
>>>> That has been running for a few weeks now and with the retention in
>>>> Kafka this is a few hundred million events available in Kafka.
>>>>
>>>>  I then start a topology with the Kafka Spout that must read
>>>> "everything" from the start.
>>>> After a while it simply seems to stop working (I see a LOT of errors).
>>>>  It is not surprising that the spout is capable of feeding those
>>>> events into the topology much faster than it can process.
>>>> It seemed to me that the topology 'died' somewhere because of this
>>>> 'overload'.
>>>>
>>>>  I'm thinking "too many events in the topology"
>>>>
>>>>  Simply "increase parallelism" would only delay the effect a while.
>>>>
>>>>  What is the general pattern for me to actually solve this kind of
>>>> situation?
>>>>  Is there a way to let the KafkaSpout "slow down" before the overload?
>>>>
>>>>
>>>> On Tue, Nov 4, 2014 at 12:48 PM, Nathan Leung <[email protected]>
>>>> wrote:
>>>>
>>>>> Definitely increase parallelism if possible.  If you have 1 API bolt
>>>>> per spout, and it takes avg 1 second / API request, and your max spout
>>>>> pending is 1000, then it would take 1000 seconds to drain a full spout
>>>>> output queue, which would definitely be enough to time out your tuples
>>>>> and cause any subsequent tuples to time out as well.  If you have 10 API
>>>>> bolts per spout then it still takes 100s to drain a full spout output
>>>>> queue.  So care needs to be taken.
>>>>>
>>>>>  My understanding is that kafka spout will automatically replay
>>>>> failed tuples.  Is it possible that you get into a situation where your
>>>>> topology simply cannot keep up and you get into a replay loop?
>>>>>
>>>>> On Mon, Nov 3, 2014 at 11:49 PM, Vladi Feigin <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Set max pending spout to 2-3K. Try with these values.
>>>>>> It also sounds to me like you should try to increase the parallelism.
>>>>>> How many executors do you have for the API bolts?
>>>>>> One more thing: don't make too many changes at once. Go change by
>>>>>> change, otherwise you will get lost :-)
>>>>>> Vladi
>>>>>>   On 4 Nov 2014 00:49, "Maxime Nay" <[email protected]> wrote:
>>>>>>
>>>>>>>   I have a default timeout on my HttpClient (10 s for socket and
>>>>>>> 10 s for connect), and I'm not overriding this value anywhere. So I
>>>>>>> guess none of the API calls should be blocking.
>>>>>>>  I allocated 5GB of memory to each of my workers. I doubt the issue
>>>>>>> is a GC issue. But just in case I will take a look at it.
>>>>>>>  What do you think would be a good value for the max pending spout?
>>>>>>> I usually use 2 executors per type of spout. So 8 executors in total
>>>>>>> for my spouts.
>>>>>>>
>>>>>>>  Thanks!
>>>>>>>
>>>>>>>  Maxime
>>>>>>>
>>>>>>> On Mon, Nov 3, 2014 at 12:41 PM, Vladi Feigin <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>>  Yes, you probably fail because of timeouts.
>>>>>>>> Check that none of your API calls is blocking; make sure you have a
>>>>>>>> timeout for all of them.
>>>>>>>> Check your GC; if you have many full GCs you should increase your
>>>>>>>> Java heap.
>>>>>>>> It seems to me that you shouldn't set max pending spout too high.
>>>>>>>> How many spouts (executors) do you have?
>>>>>>>>  Vladi
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Nov 3, 2014 at 10:20 PM, Maxime Nay <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>    Hi Vladi,
>>>>>>>>>
>>>>>>>>>  I will put log statements in each bolt.
>>>>>>>>>  The processing time per tuple is high due to a third party API
>>>>>>>>> queried through http requests in one of our bolts. It can take up to 3
>>>>>>>>> seconds to get an answer from this service.
>>>>>>>>>
>>>>>>>>>  I've tried multiple values for max pending spout: 400, 800,
>>>>>>>>> 2000... It doesn't really seem to change anything. I'm also setting
>>>>>>>>> messageTimeoutSecs to 25 s.
>>>>>>>>>
>>>>>>>>>  I also noticed that at some point I'm getting failed tuples, even
>>>>>>>>> though I'm never throwing any FailedException manually. So I guess
>>>>>>>>> the only way for a tuple to fail is to exceed the messageTimeoutSecs?
>>>>>>>>>
>>>>>>>>>  Anyway, I restarted the topology and I will take a look at the
>>>>>>>>> debug statements when it crashes again.
>>>>>>>>>
>>>>>>>>>  Thanks for your help!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  Maxime
>>>>>>>>>
>>>>>>>>> On Sat, Nov 1, 2014 at 9:49 PM, Vladi Feigin <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi
>>>>>>>>>> We have a similar problem with v. 0.82.
>>>>>>>>>> We suspect that the slowest bolt in the topology hangs and this
>>>>>>>>>> causes the entire topology to hang.
>>>>>>>>>> It can be a database bolt, for example.
>>>>>>>>>> Put logging at the entry and exit of each bolt and print out the
>>>>>>>>>> bolt name, thread id and time. This will help you find out which
>>>>>>>>>> bolt hangs.
>>>>>>>>>> A few seconds of processing per tuple sounds too long. Maybe you
>>>>>>>>>> should profile your code as well.
>>>>>>>>>> What's your max pending spout value?
>>>>>>>>>> Vladi
>>>>>>>>>>   On 31 Oct 2014 20:09, "Maxime Nay" <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>>  Hi,
>>>>>>>>>>>
>>>>>>>>>>>  For some reason, after a few hours of processing, my topology
>>>>>>>>>>> starts hanging. In the UI's 'Topology Stats' the emitted and
>>>>>>>>>>> transferred counts are equal to 0, and I can't see anything coming
>>>>>>>>>>> out of the topology (usually inserting in some database).
>>>>>>>>>>>
>>>>>>>>>>>  I can't see anything unusual in the storm workers logs, nor in
>>>>>>>>>>> kafka and zookeeper's logs.
>>>>>>>>>>>  The zkCoordinator keeps refreshing, but nothing happens :
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Deleted
>>>>>>>>>>> partition managers: []
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] New
>>>>>>>>>>> partition managers: []
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Finished
>>>>>>>>>>> refreshing
>>>>>>>>>>> 2014-10-31 17:00:13 s.k.DynamicBrokersReader [INFO] Read
>>>>>>>>>>> partition info from zookeeper: GlobalPartitionInformation{...
>>>>>>>>>>>
>>>>>>>>>>>  I don't really understand why this is hanging, and how I could
>>>>>>>>>>> fix this.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  I'm using storm 0.9.2-incubating with Kafka 0.8.1.1 and
>>>>>>>>>>> storm-kafka 0.9.2-incubating.
>>>>>>>>>>>
>>>>>>>>>>>  My topology pulls data from 4 different topics in Kafka, and
>>>>>>>>>>> has 9 different bolts. Each bolt implements IBasicBolt. I'm not
>>>>>>>>>>> doing any acking manually (storm should take care of this for me,
>>>>>>>>>>> right?)
>>>>>>>>>>> It takes a few seconds for a tuple to go through the entire
>>>>>>>>>>> topology.
>>>>>>>>>>> I'm setting a MaxSpoutPending to limit the number of tuples in
>>>>>>>>>>> the topology.
>>>>>>>>>>> My tuples shouldn't exceed the max size limit (set to default on
>>>>>>>>>>> my kafka brokers and in my SpoutConfig, and I think the default is
>>>>>>>>>>> rather high and should easily handle a few lines of text).
>>>>>>>>>>> The tuples don't necessarily go to each bolt.
>>>>>>>>>>>
>>>>>>>>>>>  I'm defining my spouts like this:
>>>>>>>>>>>         ZkHosts zkHosts = new ZkHosts("zk1.example.com:2181", "zk2.example.com:2181"...);
>>>>>>>>>>>         zkHosts.refreshFreqSecs = 120;
>>>>>>>>>>>
>>>>>>>>>>>         SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts(),
>>>>>>>>>>>                 "TOPIC_NAME",
>>>>>>>>>>>                 "/consumers",
>>>>>>>>>>>                 "CONSUMER_ID");
>>>>>>>>>>>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>>>>>>>         KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
>>>>>>>>>>>
>>>>>>>>>>>  I'm running this topology on 2 different workers, located on
>>>>>>>>>>> two different supervisors. In total I'm using something like 160
>>>>>>>>>>> executors.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  I would greatly appreciate any help or hints on how to
>>>>>>>>>>> fix/investigate this problem!
>>>>>>>>>>>
>>>>>>>>>>>  Thanks,
>>>>>>>>>>>   Maxime
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>>
>>>>   --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>>
>>>
>>>
>>
