Last week I noticed something similar in the solution I'm trying to build.
I have a topology that simulates webtraffic and puts those "measurements"
in Kafka.
That has been running for a few weeks now and with the retention in Kafka
this is a few hundred million events available in Kafka.

I then start a topology with the Kafka Spout that must read "everything"
from the start.
After a while it simply seems to stop working (I see a LOT of errors).
It is not surprising that the spout is capable of feeding those events into
the topology much faster than it can process.
It seemed to me that the topology 'died' somewhere because of this
'overload'.

I'm thinking "too many events in the topology"

Simply "increase parallelism" would only delay the effect a while.

What is the general pattern for me to actually solve this kind of situation?
Is there a way to let the KafkaSpout "slow down" before the overload?


On Tue, Nov 4, 2014 at 12:48 PM, Nathan Leung <[email protected]> wrote:

> definitely increase parallelism if possible.  If you have 1 API bolt per
> spout, and it takes avg 1 second / api request, and your max spout pending
> is 1000, then it would take 1000 seconds to drain a full spout output
> queue, which would definitely be enough to timeout your tuples, and cause
> any subsequent tuples to timeout as well,  if you have 10 api bolts per
> spout then it still takes 100s to drain a full spout output queue.  So care
> needs to be taken.
>
> My understanding is that kafka spout will automatically replay failed
> tuples.  Is it possible that you get into a situation where you topology
> simply cannot keep up and you get into a replay loop?
>
> On Mon, Nov 3, 2014 at 11:49 PM, Vladi Feigin <[email protected]> wrote:
>
>> Set max pending spout to 2-3K . Try with these values.
>> To me it sounds also that you should try to increase the parallelism .
>> Home many executors do you have for Api bolts?
>> One more thing. Don't do too many changes at once. Try change by change
>> otherwise you will get lost
>> .-:)
>> Vladi
>>  On 4 Nov 2014 00:49, "Maxime Nay" <[email protected]> wrote:
>>
>>> I have a default timeout on my HttpClient (10sc for socket and 10sc for
>>> connect), and I'm not overriding this value anywhere. So I guess none of
>>> the API calls should be blocking.
>>> I allocated 5GB of memory to each of my worker. I doubt the issue is a
>>> GC issue. But just in case I will take a look at it.
>>> What do you think would be a good value for the max pending spout? I
>>> usually use 2 executors per type of spout. So 8 executors in total for my
>>> spouts.
>>>
>>> Thanks!
>>>
>>> Maxime
>>>
>>> On Mon, Nov 3, 2014 at 12:41 PM, Vladi Feigin <[email protected]>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Yes, you probably fail because of timeouts.
>>>> Check that none of your APIs is not blocking , make sure you have a
>>>> timeout for all of them
>>>> Check your GC, if you have many full GCs you should increase your Java
>>>> heap
>>>> Seems to me that you shouldn't put too high max pending spout.
>>>> How many spouts (executors) do you have?
>>>> Vladi
>>>>
>>>>
>>>>
>>>> On Mon, Nov 3, 2014 at 10:20 PM, Maxime Nay <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Vladi,
>>>>>
>>>>> I will put log statements in each bolt.
>>>>> The processing time per tuple is high due to a third party API queried
>>>>> through http requests in one of our bolts. It can take up to 3 seconds to
>>>>> get an answer from this service.
>>>>>
>>>>> I've tried multiple values for max pending spout. 400, 800, 2000... It
>>>>> doesn't really seem to change anything. I'm also setting 
>>>>> messageTimeoutSecs
>>>>> to 25sc.
>>>>>
>>>>> I also noticed that at some point I'm getting failed tuples, even
>>>>> though I'm never throwing any FailedException manually. So I guess the 
>>>>> only
>>>>> way for a tuple to fail is to exceed the messageTimeoutSecs?
>>>>>
>>>>> Anyway, I restarted the topology and I will take a look at the debug
>>>>> statements when it crashes again.
>>>>>
>>>>> Thanks for your help!
>>>>>
>>>>>
>>>>> Maxime
>>>>>
>>>>> On Sat, Nov 1, 2014 at 9:49 PM, Vladi Feigin <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi
>>>>>> We have the similar problem with v. 0.82.
>>>>>> We suspect some slowest bolt in the topology hangs and this causes
>>>>>> the entire topology being hanged.
>>>>>> It can be database bolt for example.
>>>>>> Put logging in each bolt enter and exit print out the bolt
>>>>>> name,thread id and time. This will help you to find out which bolt hangs
>>>>>> Few seconds proccesing per tuple sound too long. Maybe you should to
>>>>>> profile your code as well
>>>>>> What's your max pending spout value?
>>>>>> Vladi
>>>>>>  On 31 Oct 2014 20:09, "Maxime Nay" <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> For some reason, after a few hours of processing, my topology starts
>>>>>>> hanging. In the UI's 'Topology Stats' the emitted and transferred counts
>>>>>>> are equal to 0, and I can't see anything coming out of the topology
>>>>>>> (usually inserting in some database).
>>>>>>>
>>>>>>> I can't see anything unusual in the storm workers logs, nor in kafka
>>>>>>> and zookeeper's logs.
>>>>>>> The zkCoordinator keeps refreshing, but nothing happens :
>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Deleted
>>>>>>> partition managers: []
>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] New
>>>>>>> partition managers: []
>>>>>>> 2014-10-31 17:00:13 s.k.ZkCoordinator [INFO] Task [2/2] Finished
>>>>>>> refreshing
>>>>>>> 2014-10-31 17:00:13 s.k.DynamicBrokersReader [INFO] Read partition
>>>>>>> info from zookeeper: GlobalPartitionInformation{...
>>>>>>>
>>>>>>> I don't really understand why this is hanging, and how I could fix
>>>>>>> this.
>>>>>>>
>>>>>>>
>>>>>>> I'm using storm 0.9.2-incubating with Kafka 0.8.1.1 and storm-kafka
>>>>>>> 0.9.2-incubating.
>>>>>>>
>>>>>>> My topology pulls data from 4 different topics in Kafka, and has 9
>>>>>>> different bolts. Each bolt implements IBasicBolt. I'm not doing any 
>>>>>>> acking
>>>>>>> manually (storm should take care of this for me, right?)
>>>>>>> It takes a few second for a tuple to go through the entire topology.
>>>>>>> I'm setting a MaxSpoutPending to limit the number of tuples in the
>>>>>>> topology.
>>>>>>> My tuples shouldn't exceed the max size limit (set to default on my
>>>>>>> kafka brokers and in my SpoutConfig. And I think the default is rather 
>>>>>>> high
>>>>>>> and should easily handle a few lines of text)
>>>>>>> The tuples don't necessarily go to each bolt.
>>>>>>>
>>>>>>> I'm defining my spouts like this:
>>>>>>>         ZkHosts zkHosts = new ZkHosts("zk1.example.com:2181", "
>>>>>>> zk2.example.com:2181"...);
>>>>>>>         zkHosts.refreshFreqSecs = 120;
>>>>>>>
>>>>>>>         SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts(),
>>>>>>>                 "TOPIC_NAME",
>>>>>>>                 "/consumers",
>>>>>>>                 "CONSUMER_ID");
>>>>>>>         kafkaConfig.scheme = new SchemeAsMultiScheme(new
>>>>>>> StringScheme());
>>>>>>>         KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig)
>>>>>>>
>>>>>>> I'm running this topology on 2 different workers, located on two
>>>>>>> different supervisors. In total I'm using something like 160 executors.
>>>>>>>
>>>>>>>
>>>>>>> I would greatly appreciate any help or hints on how to
>>>>>>> fix/investigate this problem!
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Maxime
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Reply via email to