I see.
One thing: you probably want to make the else branch ("Already processed
record") heavy weight too, as that branch would otherwise be *extremely*
light weight.
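
For example (purely a sketch: burnWork() and the loop count are made up, and
the else branch just mirrors your execute() snippet), something like this
would keep both branches comparably expensive:

else {
    burnWork(100000);   // arbitrary amount of dummy work
    LOG.info("Already processed record: " + tuple.getString(0));
}

private void burnWork(int n) {
    // Deliberately burn some CPU so this branch isn't near-free.
    Random ran = new Random();
    long acc = 0;
    for (int i = 0; i < n; i++) {
        acc += ran.nextInt();
    }
    // Use the result so the JIT can't drop the loop entirely.
    LOG.debug("burned: " + acc);
}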
On Sat, Jul 25, 2015 at 6:33 PM, Dimitris Sarlis <[email protected]>
wrote:
> I've already tried large queue sizes; specifically, I've set them to 32768.
>
> I'll also try to make the tasks more heavy weight as you suggest and see
> how things go.
>
>
> On 25/07/2015 08:00 PM, Enno Shioji wrote:
>
> Forgot to note: re queue sizes, in addition to what the article mentions,
> you may also need to increase the MAX_PENDING config.
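>
> E.g. (just a sketch; the value and the "my-topology"/builder names are
> placeholders, and the right value depends on your latency vs. throughput
> trade-off):
>
> Config conf = new Config();
> conf.setMaxSpoutPending(10000);   // i.e. topology.max.spout.pending
> StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());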
>
> On Sat, Jul 25, 2015 at 5:58 PM, Enno Shioji <[email protected]> wrote:
>
>> I see....
>>
>> A few suggestions:
>>
>> - Pump enough messages into Kafka that the entire test takes at least
>> a few minutes
>>   - If the test is very short, results can often be misleading
>>
>> - Use sufficiently large queue sizes
>>   - Generally, the lighter the tasks are, the longer the queues you need
>>   to avoid worker starvation. Since your task looks very light weight, the
>>   default queue sizes may not be sufficient. There are a few places to
>>   tune; have a look at this article (and the config sketch after this list):
>>   http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
>>
>> - Make the task a bit more heavy weight
>>   - Getting higher throughput through parallelising is generally hard
>>   when the tasks are very light weight. If you make each task a bit more
>>   heavy weight (e.g. by generating more random numbers per invocation), it
>>   may be easier to observe the performance increase (this way you may also
>>   be able to avoid the message buffer tuning mentioned above)
>>
>> - Try batched topologies
>>   - The rationale is similar to the above: batching effectively makes
>>   tasks more "heavy weight".
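>>
>> For the queue sizes, the knobs are the ones the article walks through; e.g.
>> (only a sketch, the values are the article's examples rather than
>> recommendations):
>>
>> Config conf = new Config();
>> // The executor buffers are Disruptor ring buffers, so sizes are powers of two.
>> conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>> conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>> conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>> conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);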
>>
>> On Sat, Jul 25, 2015 at 5:23 PM, Dimitris Sarlis <[email protected]>
>> wrote:
>>
>>> Inside my bolts I ack every tuple I receive for processing.
>>> Furthermore, I've set the TOPOLOGY_MAX_SPOUT_PENDING to 1000 to throttle
>>> how many tuples remain pending at a given moment. The bolts don't have any
>>> HashMap or other data structure. In fact, my code is pretty simple:
>>>
>>> public void execute(Tuple tuple) {
>>>     if (!tuple.getString(0).contains("!")) {
>>>         Random ran = new Random();
>>>         int worker = ran.nextInt(boltNo) + 1;
>>>         List<Integer> l = _topo.getComponentTasks("worker" + worker);
>>>         String out = tuple.getString(0) + "!";
>>>         _collector.emitDirect(l.get(0), new Values(out));
>>>     } else {
>>>         LOG.info("Already processed record: " + tuple.getString(0));
>>>     }
>>>     _collector.ack(tuple);
>>> }
>>>
>>> As far as the slowdown is concerned, the rate of data processing is
>>> constant from start to end. But when I increase the number of spouts/bolts,
>>> the system takes more time to process the same number of records (500000
>>> records, to be exact). So the throughput drops because the rate of data
>>> ingestion is lower. This is consistent as I keep increasing the number of
>>> workers; I have tested up to 20 spouts/20 bolts.
>>>
>>> I set the number of Kafka partitions exactly equal to the number of
>>> spouts each time to take advantage of the parallelism offered.
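>>>
>>> Concretely, that just means the spout parallelism hint is kept equal to the
>>> partition count (names below are placeholders; spoutConfig is the usual
>>> storm-kafka SpoutConfig):
>>>
>>> int numPartitions = 8;   // always equal to the Kafka partition count
>>> builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), numPartitions);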
>>>
>>> I don't know if it helps, but I also tried a simpler topology where
>>> bolts are not connected to each other (so it's basically a tree-like
>>> topology), and I observed the same drop in throughput, but not to the same
>>> extent; the increase in processing time is almost negligible.
>>>
>>>
>>> On 25/07/2015 07:08 PM, Enno Shioji wrote:
>>>
>>> My first guess would be that there is something wrong with the topology:
>>> for example, the number of pending messages increasing abnormally due to
>>> tuples not being acked or too many tuples being generated, or JVM heap
>>> pressure on workers due to something being retained in bolts (like an
>>> ever-growing HashMap), etc.
>>>
>>> Does the slowdown happen gradually, or is it immediately slow?
>>>
>>> Another random guess is that you don't have enough Kafka partitions and
>>> hence aren't using your new workers, but I wouldn't expect this to
>>> make things significantly slower.
>>>
>>> On Sat, Jul 25, 2015 at 4:48 PM, Dimitris Sarlis <[email protected]>
>>> wrote:
>>>
>>>> Morgan,
>>>>
>>>> I hardly think this could be the problem. The topology is deployed
>>>> over a 14-node cluster with 56 total cores and 96GB RAM. So when I jump
>>>> from 8 to 16 workers, I think I am still far below my hardware limitations.
>>>>
>>>>
>>>> On 25/07/2015 06:45 PM, Morgan W09 wrote:
>>>>
>>>> It could be possible that you're reaching a hardware limitation. The
>>>> jump from 8 to 16 total bolts/workers could be more than your hardware can
>>>> handle efficiently, so it starts having to swap out processes and
>>>> their memory, which can add substantial overhead and cause your program to
>>>> slow down.
>>>>
>>>>
>>>>
>>>> On Sat, Jul 25, 2015 at 10:36 AM, Dimitris Sarlis <[email protected]> wrote:
>>>>
>>>>> Yes, it listens to its own output. For example, if I have two bolts
>>>>> (bolt1 and bolt2), I perform the following:
>>>>>
>>>>> bolt1.directGrouping("bolt1");
>>>>> bolt1.directGrouping("bolt2");
>>>>> bolt2.directGrouping("bolt1");
>>>>> bolt2.directGrouping("bolt2");
>>>>>
>>>>> I know that this could possibly lead to a cycle, but right now the
>>>>> bolts I'm trying to run perform the following:
>>>>> if the inputRecord doesn't contain a "!" {
>>>>>     append a "!"
>>>>>     emit to a random node
>>>>> }
>>>>> else {
>>>>>     do nothing with the record
>>>>> }
>>>>>
>>>>> Dimitris
>>>>>
>>>>>
>>>>> On 25/07/2015 06:03 PM, Enno Shioji wrote:
>>>>>
>>>>> > Each bolt is connected with itself as well as with each one of the
>>>>> other bolts
>>>>> You mean the bolt listens to its own output?
>>>>>
>>>>> On Sat, Jul 25, 2015 at 1:29 PM, Dimitris Sarlis <[email protected]> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm trying to run a topology in Storm and I am facing some
>>>>>> scalability issues. Specifically, I have a topology where KafkaSpouts read
>>>>>> from a Kafka queue and emit messages to bolts, which are connected with
>>>>>> each other through directGrouping (each bolt is connected with itself as
>>>>>> well as with each one of the other bolts). Bolts subscribe to the spouts
>>>>>> with shuffleGrouping. I observe that when I increase the number of spouts
>>>>>> and bolts proportionally, I don't get the speedup I'm expecting. In fact,
>>>>>> my topology seems to run slower, and for the same amount of data it takes
>>>>>> more time to complete. For example, when I increase spouts from 4->8 and
>>>>>> bolts from 4->8, it takes longer to process the same amount of Kafka
>>>>>> messages.
>>>>>>
>>>>>> Any ideas why this is happening? Thanks in advance.
>>>>>>
>>>>>> Best,
>>>>>> Dimitris Sarlis
>>>>>>