Dimitris,

If I understand correctly, the bolt does nothing when it receives a tuple it
has already processed. So instead of emitting the tuple to a purely random
target, you could make sure that the bolt never emits the tuple to itself.
You can do this by comparing the randomly selected component ID with the
bolt's own ID, which you can obtain from the topology context in the
prepare() call.
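
A minimal sketch of the selection step, in plain Java so it runs without
Storm. The class and method names (TargetPicker, pickOther) are made up for
illustration, and it assumes the bolt knows its own zero-based index among
the bolts, for example by parsing the component ID it gets from
TopologyContext.getThisComponentId() in prepare():

```java
import java.util.Random;

final class TargetPicker {
    // Picks a uniformly random index in [0, n) that is never equal to self.
    // Requires n >= 2 and 0 <= self < n.
    static int pickOther(int n, int self, Random rnd) {
        if (n < 2 || self < 0 || self >= n) {
            throw new IllegalArgumentException("need n >= 2 and 0 <= self < n");
        }
        int candidate = rnd.nextInt(n - 1); // one slot fewer, since self is excluded
        return candidate >= self ? candidate + 1 : candidate; // skip over self
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        int self = 2; // hypothetical: this bolt is "worker3", i.e. index 2 of 4
        for (int i = 0; i < 5; i++) {
            System.out.println("emit to worker" + (pickOther(4, self, rnd) + 1));
        }
    }
}
```

Drawing from n - 1 slots and shifting past the bolt's own index avoids a
retry loop and keeps the choice uniform over the remaining bolts.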

This way you will considerably reduce the number of tuples that flow
through the system even as you increase the workers.
Let me know if this works/fares better in your tests.


On Sun, Jul 26, 2015 at 4:51 PM, Enno Shioji <[email protected]> wrote:

> Hmmmm
>
> The numbers look quite similar, so I wonder if there is a common
> bottleneck. For example, it could be that your Kafka servers are the
> limiting factor (although I doubt this, at least if the Storm tasks take
> sufficient time, as the load on Kafka will then be comparatively smaller).
>
> Another place to look would be the output (in your case log file?). But if
> you are increasing the number of machines in the Storm cluster, that
> doesn't explain it either as you'll have more disks.
>
> Resource contention can also be a factor; e.g. if you are using VMs and
> the physical machines have already tons of other VMs allocated, or
> something wrong with the network etc. then the results may be confusing.
>
> One thing to look out for when you fake a heavy weight task is the JVM's
> dead code elimination. Depending on how you do it, the compiler can optimise
> the entire work away. It may be worth checking whether your tasks actually
> take significant time to run. A common trick to avoid this is to have
> something like this in your code:
>
> if (resultOfFakeWork == VERY_UNLIKELY_VALUE) {
>     System.out.println(resultOfFakeWork);
> }
>
> If the task is taking something like 1 sec that would definitely be long
> enough.
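
A small self-contained sketch of that trick, under the assumption that the
fake work is a loop over pseudo-random numbers. FakeWork and burn are
hypothetical names, and the iteration count is arbitrary. The comparison
against System.nanoTime() is almost never true, but the JIT cannot prove
that, so the result has to be computed:

```java
import java.util.Random;

final class FakeWork {
    // Burns CPU by summing pseudo-random numbers and returns the sum,
    // so that the caller can consume it.
    static long burn(long seed, int iterations) {
        Random rnd = new Random(seed);
        long acc = 0;
        for (int i = 0; i < iterations; i++) {
            acc += rnd.nextInt(1000);
        }
        return acc;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        long result = burn(42L, 5_000_000);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // Very unlikely branch whose only purpose is to make `result` observable.
        if (result == System.nanoTime()) {
            System.out.println(result);
        }
        System.out.println("fake work took " + elapsedMs + " ms");
    }
}
```

Timing the call this way also shows directly whether the task really takes
as long as intended.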
>
>
> Otherwise I'd examine the logs / console to see if there's some
> problem with the cluster. I've seen cases where supervisors report to
> nimbus but can't communicate with each other, in effect creating "dud"
> workers that make the topology run very slowly.
>
>
>
>
> On Sun, Jul 26, 2015 at 11:16 AM, Dimitris Sarlis <[email protected]>
> wrote:
>
>>  I tried increasing the number of Kafka messages and made the tasks more
>> heavy weight. As a result the total experiment took around 13-14 minutes to
>> complete. I also increased the number of bolts not by creating different
>> ones but by changing the parallelism hint (e.g. 16 bolts vs 4 bolts with a
>> parallelism hint of 4). That helped a bit, and at least increasing the total
>> number of workers no longer makes things slower. But the system's
>> scalability is still not good. For example, with 4 spouts / 4 bolts I get
>> ~14min, 8/8 gives ~14min, 16/16 gives ~12min, 20/20 gives ~13min.
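
To put those timings in terms of speedup, here is a rough calculation using
the approximate run times quoted above, with the 4/4 run as the baseline.
The Scaling class and its method names are only illustrative. Speedup is
baseline time divided by measured time, and efficiency divides that by the
factor by which the worker count grew:

```java
final class Scaling {
    // Speedup relative to the baseline run (1.0 means no improvement).
    static double speedup(double baselineMinutes, double minutes) {
        return baselineMinutes / minutes;
    }

    // Parallel efficiency: speedup divided by the growth in worker count.
    static double efficiency(double speedup, int baselineWorkers, int workers) {
        return speedup / ((double) workers / baselineWorkers);
    }

    public static void main(String[] args) {
        int[] workers = {8, 16, 20};          // spouts/bolts per run
        double[] minutes = {14.0, 12.0, 13.0}; // approximate times from the thread
        for (int i = 0; i < workers.length; i++) {
            double s = speedup(14.0, minutes[i]);
            double e = efficiency(s, 4, workers[i]);
            System.out.printf("%d/%d: speedup %.2f, efficiency %.2f%n",
                    workers[i], workers[i], s, e);
        }
    }
}
```

With these figures the 16/16 run is only about 1.17 times faster than 4/4
despite having four times the workers, an efficiency of roughly 29%.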
>>
>> Any ideas what else I could check and experiment with?
>>
>> On 25/07/2015 08:46 PM, Enno Shioji wrote:
>>
>> Oh and one last thing.. You may want to consider keeping a counter in the
>> bolt and only log.INFO every 10K or 100K times or something, as writing to
>> log files could become an artificial bottleneck.
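
One way such a counter could look, as a plain Java sketch. SampledLogger is
a hypothetical helper, not a Storm or logging API. It assumes each bolt
task calls it from a single thread, so the counter is a plain field:

```java
final class SampledLogger {
    private final long every;
    private long seen = 0;

    SampledLogger(long every) {
        if (every <= 0) {
            throw new IllegalArgumentException("every must be positive");
        }
        this.every = every;
    }

    // Counts one event and returns true on every `every`-th call.
    boolean shouldLog() {
        seen++;
        return seen % every == 0;
    }

    long seen() {
        return seen;
    }

    public static void main(String[] args) {
        SampledLogger sampler = new SampledLogger(10_000);
        for (int i = 0; i < 25_000; i++) {
            if (sampler.shouldLog()) {
                // In the bolt this would be the LOG.info(...) call.
                System.out.println("Processed " + sampler.seen() + " tuples so far");
            }
        }
    }
}
```

In the bolt, the LOG.info call in the else branch would then be wrapped in
an if (sampler.shouldLog()) check.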
>>
>> On Sat, Jul 25, 2015 at 6:42 PM, Enno Shioji <[email protected]> wrote:
>>
>>> I see.
>>>
>>>  One thing, you prob. want to make the else branch ("Already processed
>>> record") heavy weight, too as that branch would be *extremely* light
>>> weight.
>>>
>>> On Sat, Jul 25, 2015 at 6:33 PM, Dimitris Sarlis <[email protected]>
>>> wrote:
>>>
>>>>  I've already tried large queue sizes, specifically I've set it to
>>>> 32768.
>>>>
>>>> I'll also try to make the tasks more heavy weight as you suggest and
>>>> see how things go.
>>>>
>>>>
>>>> On 25/07/2015 08:00 PM, Enno Shioji wrote:
>>>>
>>>> Forgot to note; re: queue sizes, in addition to what the article
>>>> mentions you may also need to increase the max pending config
>>>> (TOPOLOGY_MAX_SPOUT_PENDING).
>>>>
>>>> On Sat, Jul 25, 2015 at 5:58 PM, Enno Shioji <[email protected]> wrote:
>>>>
>>>>> I see....
>>>>>
>>>>>  A few suggestions:
>>>>>
>>>>>   - Pump enough messages into Kafka that the entire test takes at
>>>>> least a few minutes
>>>>>     - If the test is very short, results can often be misleading
>>>>>
>>>>>   - Use sufficiently large queue sizes
>>>>>     - Generally the lighter the tasks are, the longer queue you need
>>>>> to avoid worker starvation. Since your task looks very light weight, the
>>>>> default queue size may not be sufficient. There are a few places to tune.
>>>>> Have a look at this article:
>>>>> http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
>>>>>
>>>>>   - Make the task a bit more heavy weight
>>>>>     - Getting higher throughput through parallelising is generally
>>>>> hard when the tasks are very light weight. If you make each task a bit
>>>>> more heavy weight (e.g. by generating more random numbers per invocation),
>>>>> it may be easier to observe the performance increase (this way you may
>>>>> also be able to avoid the message buffer tuning mentioned above)
>>>>>
>>>>>   - Try batched topologies
>>>>>     - Rationale is similar to above because batching will effectively
>>>>> make tasks more "heavy weight".
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Jul 25, 2015 at 5:23 PM, Dimitris Sarlis <[email protected]>
>>>>> wrote:
>>>>>
>>>>>>  Inside my bolts I ack every tuple I receive for processing.
>>>>>> Furthermore, I've set the TOPOLOGY_MAX_SPOUT_PENDING to 1000 to throttle
>>>>>> how many tuples remain pending at a given moment. The bolts don't have
>>>>>> any HashMap or other data structure. In fact, my code is pretty simple:
>>>>>>
>>>>>> public void execute(Tuple tuple) {
>>>>>>     if (!tuple.getString(0).contains("!")) {
>>>>>>         // First visit: pick one of the bolts ("worker1".."workerN") at random.
>>>>>>         Random ran = new Random();
>>>>>>         int worker = ran.nextInt(boltNo) + 1;
>>>>>>         List<Integer> l = _topo.getComponentTasks("worker" + worker);
>>>>>>         // Mark the record as processed and send it to that bolt's first task.
>>>>>>         String out = tuple.getString(0) + "!";
>>>>>>         _collector.emitDirect(l.get(0), new Values(out));
>>>>>>     } else {
>>>>>>         // The record already carries the "!" marker: just log it.
>>>>>>         LOG.info("Already processed record: " + tuple.getString(0));
>>>>>>     }
>>>>>>     _collector.ack(tuple);
>>>>>> }
>>>>>>
>>>>>> As far as the slowdown is concerned, the rate of data processing is
>>>>>> constant from start to end. But when I increase the number of
>>>>>> spouts/bolts, the system takes more time to process the same amount of
>>>>>> records (500000 records to be exact). So, the throughput drops because
>>>>>> the data processing rate is lower. This is consistent as I increase the
>>>>>> number of workers; I have tested up to 20 spouts / 20 bolts.
>>>>>>
>>>>>> I set the number of Kafka partitions exactly equal to the number of
>>>>>> spouts each time to take advantage of the parallelism offered.
>>>>>>
>>>>>> I don't know if it helps, but I also tried a simpler topology, where
>>>>>> bolts are not connected with each other (so it's basically a tree like
>>>>>> topology) and I observed the same drop in throughput but not to the same
>>>>>> extent. The increase in processing time is almost negligible.
>>>>>>
>>>>>>
>>>>>> On 25/07/2015 07:08 PM, Enno Shioji wrote:
>>>>>>
>>>>>>  My first guess would be that there is something wrong with the
>>>>>> topology, like the number of pending messages increasing abnormally due
>>>>>> to tuples not being acked/too many tuples being generated, JVM heap
>>>>>> shortage on workers due to something being retained in bolts (like an
>>>>>> ever growing HashMap) etc.
>>>>>>
>>>>>>  Does the slowdown happen gradually, or is it immediately slow?
>>>>>>
>>>>>>  Another random guess is that you don't have enough Kafka
>>>>>> partitions, hence not using your new workers, but I wouldn't expect this
>>>>>> to make things significantly slower.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Jul 25, 2015 at 4:48 PM, Dimitris Sarlis <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>>  Morgan,
>>>>>>>
>>>>>>> I hardly think that this could be the problem. The topology is
>>>>>>> deployed over a 14 node cluster with 56 total cores and 96GB RAM. So
>>>>>>> when I jump from 8 to 16 workers, I think I am still far below my
>>>>>>> hardware limitations.
>>>>>>>
>>>>>>>
>>>>>>> On 25/07/2015 06:45 PM, Morgan W09 wrote:
>>>>>>>
>>>>>>> It could be possible that you're reaching a hardware limitation. The
>>>>>>> jump from 8 to 16 total bolts/workers could be more than your hardware
>>>>>>> can handle efficiently. So it's starting to have to switch out processes
>>>>>>> and their memory, which can have substantial overhead, causing your
>>>>>>> program to slow down.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Jul 25, 2015 at 10:36 AM, Dimitris Sarlis <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>>  Yes, it listens to its own output. For example, if I have two
>>>>>>>> bolts (bolt1 and bolt2), I perform the following:
>>>>>>>>
>>>>>>>> bolt1.directGrouping("bolt1");
>>>>>>>> bolt1.directGrouping("bolt2");
>>>>>>>> bolt2.directGrouping("bolt1");
>>>>>>>> bolt2.directGrouping("bolt2");
>>>>>>>>
>>>>>>>> I know that this could possibly lead to a cycle, but right now the
>>>>>>>> bolts I'm trying to run perform the following:
>>>>>>>> if the inputRecord doesn't contain a "!" {
>>>>>>>>     append a "!"
>>>>>>>>     emit to a random node
>>>>>>>> }
>>>>>>>> else {
>>>>>>>>     do nothing with the record
>>>>>>>> }
>>>>>>>>
>>>>>>>> Dimitris
>>>>>>>>
>>>>>>>>
>>>>>>>> On 25/07/2015 06:03 PM, Enno Shioji wrote:
>>>>>>>>
>>>>>>>> > Each bolt is connected with itself as well as with each one of
>>>>>>>> the other bolts
>>>>>>>> You mean the bolt listens to its own output?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Jul 25, 2015 at 1:29 PM, Dimitris Sarlis <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I'm trying to run a topology in Storm and I am facing some
>>>>>>>>> scalability issues. Specifically, I have a topology where KafkaSpouts
>>>>>>>>> read from a Kafka queue and emit messages to bolts which are connected
>>>>>>>>> with each other through directGrouping. (Each bolt is connected with
>>>>>>>>> itself as well as with each one of the other bolts.) The bolts
>>>>>>>>> subscribe to the spouts with shuffleGrouping. I observe that when I
>>>>>>>>> increase the number of spouts and bolts proportionally, I don't get
>>>>>>>>> the speedup I'm expecting. In fact, my topology seems to run slower,
>>>>>>>>> and for the same amount of data it takes more time to complete. For
>>>>>>>>> example, when I increase spouts from 4->8 and bolts from 4->8, it
>>>>>>>>> takes longer to process the same amount of Kafka messages.
>>>>>>>>>
>>>>>>>>> Any ideas why this is happening? Thanks in advance.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Dimitris Sarlis
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>
