I increased the number of Kafka messages and made the tasks heavier. As a result, the whole experiment now takes around 13-14 minutes to complete. I also increased the number of bolts, not by creating different ones but by raising the parallelism hint (e.g. 4 bolts with a parallelism hint of 4 instead of 16 separate bolts). That helped a bit: at least increasing the total number of workers no longer makes things slower. But the system still does not scale well. For example, 4 spouts/4 bolts take ~14 min, 8/8 take ~14 min, 16/16 take ~12 min, and 20/20 take ~13 min.

Any ideas what else I could check and experiment with?
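
To put those timings in perspective, here is a minimal sketch that computes speedup and parallel efficiency relative to the 4/4 run. The class and method names are made up for illustration, and the minute values are the approximate figures quoted above.

```java
import java.util.Locale;

final class ScalingReport {
    /** Speedup relative to the baseline run: baselineMinutes / minutes. */
    static double speedup(double baselineMinutes, double minutes) {
        return baselineMinutes / minutes;
    }

    /** Parallel efficiency: speedup divided by the factor by which parallelism grew. */
    static double efficiency(double speedup, int baselineParallelism, int parallelism) {
        return speedup / ((double) parallelism / baselineParallelism);
    }

    public static void main(String[] args) {
        // Approximate figures from the message above (spouts == bolts in every run).
        int[] parallelism = {4, 8, 16, 20};
        double[] minutes = {14, 14, 12, 13};
        for (int i = 0; i < parallelism.length; i++) {
            double s = speedup(minutes[0], minutes[i]);
            double e = efficiency(s, parallelism[0], parallelism[i]);
            System.out.printf(Locale.ROOT, "%2d/%-2d speedup=%.2f efficiency=%.2f%n",
                    parallelism[i], parallelism[i], s, e);
        }
    }
}
```

With these numbers the 16/16 run has a speedup of about 1.17 and an efficiency of about 0.29, i.e. four times the parallelism buys roughly 17% more throughput.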

On 25/07/2015 08:46 PM, Enno Shioji wrote:
Oh, and one last thing: you may want to keep a counter in the bolt and only log at INFO level every 10K or 100K tuples or so, as writing to log files could become an artificial bottleneck.
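
The counter idea could look roughly like the sketch below. SampledLogCounter is a hypothetical helper, not part of Storm; in a bolt, the println would be the existing LOG.info call. An AtomicLong is used here only to stay on the safe side; a plain long field may be enough if the counter is only ever touched from one thread.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Counts events and reports when a sampled log line is due. */
final class SampledLogCounter {
    private final long every;
    private final AtomicLong seen = new AtomicLong();

    SampledLogCounter(long every) {
        if (every <= 0) {
            throw new IllegalArgumentException("every must be positive");
        }
        this.every = every;
    }

    /** Increments the counter; returns true on every 'every'-th call. */
    boolean shouldLog() {
        return seen.incrementAndGet() % every == 0;
    }

    /** Number of events counted so far. */
    long count() {
        return seen.get();
    }

    public static void main(String[] args) {
        SampledLogCounter counter = new SampledLogCounter(10_000);
        int logged = 0;
        for (int i = 0; i < 25_000; i++) {
            if (counter.shouldLog()) {
                // In the bolt this would be the LOG.info(...) call.
                System.out.println("Already processed records so far: " + counter.count());
                logged++;
            }
        }
        System.out.println("log lines written: " + logged);
    }
}
```

In this run, 25,000 events produce two log lines (at 10,000 and 20,000) instead of 25,000.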

On Sat, Jul 25, 2015 at 6:42 PM, Enno Shioji wrote:

    I see.

    One thing: you probably want to make the else branch ("Already
    processed record") heavyweight too, as that branch would
    otherwise be *extremely* lightweight.

    On Sat, Jul 25, 2015 at 6:33 PM, Dimitris Sarlis wrote:

        I've already tried large queue sizes; specifically, I've set
        them to 32768.

        I'll also try to make the tasks more heavy weight as you
        suggest and see how things go.


        On 25/07/2015 08:00 PM, Enno Shioji wrote:
        Forgot to note: re: queue sizes, in addition to what the
        article mentions you may also need to increase the
        MAX_PENDING config (TOPOLOGY_MAX_SPOUT_PENDING).
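
As a rough illustration of the queue-size and max-pending tuning discussed here, the sketch below collects the relevant settings in a plain map. The key names are the ones used in the linked article and by TOPOLOGY_MAX_SPOUT_PENDING; QueueTuning, tunedConf and the value 5000 are made up for the example. The power-of-two check is an assumption on my part about what the Disruptor-backed buffers expect, so verify it against your Storm version. In a real topology the same entries would go into Storm's Config object.

```java
import java.util.HashMap;
import java.util.Map;

final class QueueTuning {
    /** True if n is a positive power of two. */
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    /** Builds the topology settings for the given buffer size and pending limit. */
    static Map<String, Object> tunedConf(int bufferSize, int maxSpoutPending) {
        // Assumption: the internal buffers want power-of-two sizes.
        if (!isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException(
                    "buffer size should be a power of two: " + bufferSize);
        }
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.executor.receive.buffer.size", bufferSize);
        conf.put("topology.executor.send.buffer.size", bufferSize);
        conf.put("topology.transfer.buffer.size", bufferSize);
        conf.put("topology.max.spout.pending", maxSpoutPending);
        return conf;
    }

    public static void main(String[] args) {
        // 32768 is the queue size mentioned in this thread; 5000 is an arbitrary example.
        System.out.println(tunedConf(32768, 5000));
    }
}
```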

        On Sat, Jul 25, 2015 at 5:58 PM, Enno Shioji wrote:

            I see....

            A few suggestions:

             - Pump enough messages into Kafka that the entire test
            takes at least a few minutes
                - If the test is very short, results can often be
            misleading

             - Use sufficiently large queue sizes
                - Generally, the lighter the tasks are, the longer the
            queue you need to avoid worker starvation. Since your
            task looks very lightweight, the default queue size may
            not be sufficient. There are a few places to tune. Have a
            look at this article:
            http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/

             - Make the task a bit more heavy weight
                - Getting higher throughput through parallelisation is
            generally hard when the tasks are very lightweight. If
            you make each task a bit heavier (e.g. by
            generating more random numbers per invocation), it may be
            easier to observe the performance increase (this way you
            may also be able to avoid the message buffer tuning
            mentioned above)

             - Try batched topologies
                - Rationale is similar to above because batching will
            effectively make tasks more "heavy weight".
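
One way to make each invocation heavier, along the lines of the "more random numbers per invocation" suggestion, is sketched below. HeavierTask and burn are hypothetical names; the number of draws is a knob to tune until the per-tuple work clearly dominates the messaging overhead.

```java
import java.util.Random;

final class HeavierTask {
    /**
     * Simulates a heavier unit of work by drawing 'draws' random numbers
     * and folding them into a checksum, so the loop has an observable result.
     */
    static long burn(Random random, int draws) {
        long checksum = 0;
        for (int i = 0; i < draws; i++) {
            checksum += random.nextInt(1000);
        }
        return checksum;
    }

    public static void main(String[] args) {
        Random random = new Random(42); // fixed seed only to make runs repeatable
        long start = System.nanoTime();
        long checksum = burn(random, 1_000_000);
        long micros = (System.nanoTime() - start) / 1_000;
        System.out.println("checksum=" + checksum + " took " + micros + " us");
    }
}
```

Calling something like burn from both branches of the bolt would also cover the otherwise very cheap "already processed" path.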








            On Sat, Jul 25, 2015 at 5:23 PM, Dimitris Sarlis wrote:

                Inside my bolts I ack every tuple I receive for
                processing. Furthermore, I've set the
                TOPOLOGY_MAX_SPOUT_PENDING to 1000 to throttle how
                many tuples remain pending at a given moment. The
                bolts don't have any HashMap or other data structure.
                In fact, my code is pretty simple:

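                // Reuse one Random per bolt instance instead of
                // allocating a new one for every tuple.
                private final Random ran = new Random();

                @Override
                public void execute(Tuple tuple) {
                    String record = tuple.getString(0);
                    if (!record.contains("!")) {
                        // Unprocessed record: mark it and forward it to
                        // the first task of a randomly chosen worker bolt.
                        int worker = ran.nextInt(boltNo) + 1;
                        List<Integer> tasks =
                            _topo.getComponentTasks("worker" + worker);
                        _collector.emitDirect(tasks.get(0), new Values(record + "!"));
                    } else {
                        LOG.info("Already processed record: " + record);
                    }
                    // Ack every tuple, whether it was forwarded or not.
                    _collector.ack(tuple);
                }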

                As far as the slowdown is concerned, the processing
                rate is constant from start to end. But when I
                increase the number of spouts/bolts, the system takes
                more time to process the same number of records
                (500000 records to be exact). So the throughput, i.e.
                the rate at which records are ingested, drops.
                This is consistent as I increase the number of
                workers; I have tested up to 20 spouts/20 bolts.

                I set the number of Kafka partitions exactly equal to
                the number of spouts each time to take advantage of
                the parallelism offered.

                I don't know if it helps, but I also tried a simpler
                topology, where bolts are not connected with each
                other (so it's basically a tree-like topology). I
                observed the same drop in throughput, but to a much
                smaller extent: the increase in processing time is
                almost negligible.


                On 25/07/2015 07:08 PM, Enno Shioji wrote:
                My first guess would be that something is wrong with
                the topology: for example, the number of pending
                messages increasing abnormally because tuples are not
                acked or too many tuples are generated, or a JVM heap
                shortage on the workers because something is retained
                in the bolts (like an ever-growing HashMap).

                Does the slowdown happen gradually, or is it
                immediately slow?

                Another random guess is that you don't have enough
                Kafka partitions and hence are not using your new
                workers, but I wouldn't expect this to make things
                significantly slower.



                On Sat, Jul 25, 2015 at 4:48 PM, Dimitris Sarlis wrote:

                    Morgan,

                    I hardly think that this could be the problem.
                    The topology is deployed over a 14 node cluster
                    with 56 total cores and 96GB RAM. So when I jump
                    from 8 to 16 workers, I think I am still far
                    below my hardware limitations.


                    On 25/07/2015 06:45 PM, Morgan W09 wrote:
                    It's possible that you're reaching a
                    hardware limitation. The jump from 8 to 16
                    total bolts/workers could be more than your
                    hardware can handle efficiently, so it
                    starts having to swap out processes and
                    their memory, which can add substantial
                    overhead and slow your program down.



                    On Sat, Jul 25, 2015 at 10:36 AM, Dimitris Sarlis wrote:

                        Yes, it listens to its own output. For
                        example, if I have two bolts (bolt1 and
                        bolt2), I perform the following:

                        bolt1.directGrouping("bolt1");
                        bolt1.directGrouping("bolt2");
                        bolt2.directGrouping("bolt1");
                        bolt2.directGrouping("bolt2");

                        I know that this could possibly lead to a
                        cycle, but right now the bolts I'm trying
                        to run perform the following:
                        if the inputRecord doesn't contain a "!" {
                            append a "!"
                            emit to a random node
                        }
                        else {
                            do nothing with the record
                        }
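
The routing logic in the pseudocode above can be checked with a small stand-alone simulation that needs no Storm at all. HopSimulation is a hypothetical name, and random routing stands in for both the spout's shuffleGrouping and the bolts' direct emits. Under those assumptions it shows that the cycle is bounded: every record is executed exactly twice (once to be marked and forwarded, once to be dropped), regardless of how many bolts there are.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Random;

final class HopSimulation {
    /** A record in flight, together with the bolt it is addressed to. */
    private static final class Hop {
        final int bolt;
        final String record;

        Hop(int bolt, String record) {
            this.bolt = bolt;
            this.record = record;
        }
    }

    /** Returns executions per bolt after draining 'records' spout tuples. */
    static long[] run(int records, int bolts, long seed) {
        Random random = new Random(seed);
        long[] executions = new long[bolts];
        Deque<Hop> queue = new ArrayDeque<>();
        for (int i = 0; i < records; i++) {
            // Stand-in for the spout handing tuples to a random bolt.
            queue.add(new Hop(random.nextInt(bolts), "record-" + i));
        }
        while (!queue.isEmpty()) {
            Hop hop = queue.poll();
            executions[hop.bolt]++;
            if (!hop.record.contains("!")) {
                // First visit: mark the record and send it to a random bolt (possibly itself).
                queue.add(new Hop(random.nextInt(bolts), hop.record + "!"));
            }
            // Second visit: nothing is emitted, so the record leaves the system.
        }
        return executions;
    }

    public static void main(String[] args) {
        long total = 0;
        for (long n : run(500_000, 8, 1L)) {
            total += n;
        }
        System.out.println("total bolt executions: " + total); // prints 1000000
    }
}
```

So for the 500000 records mentioned in this thread, the bolts perform one million executions in total, and adding bolts only changes how that fixed amount of work is spread out.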

                        Dimitris


                        On 25/07/2015 06:03 PM, Enno Shioji wrote:
                        > Each bolt is connected with itself as
                        well as with each one of the other bolts
                        You mean the bolt listens to its own output?





                        On Sat, Jul 25, 2015 at 1:29 PM, Dimitris Sarlis wrote:

                            Hi all,

                            I'm trying to run a topology in Storm
                            and I am facing some scalability
                            issues. Specifically, I have a
                            topology where KafkaSpouts read from a
                            Kafka queue and emit messages to bolts
                            which are connected with each other
                            through directGrouping. (Each bolt is
                            connected with itself as well as with
                            each one of the other bolts.) The bolts
                            subscribe to the spouts with
                            shuffleGrouping. I observe that when I
                            increase the number of spouts and
                            bolts proportionally, I don't get the
                            speedup I'm expecting. In fact, my
                            topology seems to run slower: for
                            the same amount of data, it takes more
                            time to complete. For example, when I
                            increase spouts from 4->8 and bolts
                            from 4->8, it takes longer to process
                            the same number of Kafka messages.

                            Any ideas why this is happening?
                            Thanks in advance.

                            Best,
                            Dimitris Sarlis












