I've already tried large queue sizes; specifically, I've set them to 32768.

I'll also try to make the tasks more heavyweight, as you suggest, and see how things go.

On 25/07/2015 08:00 PM, Enno Shioji wrote:
Forgot to note: re queue sizes, in addition to what the article mentions, you may also need to increase the MAX_PENDING config.
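
Assuming the MAX_PENDING config refers to topology.max.spout.pending (an assumption; the setting is referred to as TOPOLOGY_MAX_SPOUT_PENDING further down the thread), a minimal sketch of raising it would look like this:

import backtype.storm.Config;

Config conf = new Config();
// Illustrative value only; the thread below mentions a current setting of 1000.
conf.setMaxSpoutPending(10000);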

On Sat, Jul 25, 2015 at 5:58 PM, Enno Shioji <[email protected]> wrote:

    I see....

    A few suggestions:

     - Pump enough messages into Kafka so that the entire test takes at
    least a few minutes
        - If the test is very short, results can often be misleading

     - Use sufficiently large queue sizes
        - Generally, the lighter the tasks are, the longer the queues you
    need to avoid worker starvation. Since your task looks very
    lightweight, the default queue sizes may not be sufficient. There are
    a few places to tune; have a look at this article (a quick config
    sketch follows this list):
    http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/

     - Make the task a bit more heavyweight
        - Getting higher throughput through parallelising is generally
    hard when the tasks are very lightweight. If you make each task
    a bit more heavyweight (e.g. by generating more random numbers
    per invocation), it may be easier to observe the performance
    increase (this way you may also be able to avoid the message
    buffer tuning mentioned above).

     - Try batched topologies
        - The rationale is similar to the above: batching effectively
    makes each task more "heavyweight".
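
    As a minimal sketch of where those queue settings live (assuming
    Storm 0.9.x's backtype.storm.Config; the values are just the
    article's illustrative examples, not recommendations):

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;

    Config conf = new Config();
    conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);             // intra-worker receive batch size
    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);            // worker outbound transfer queue
    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); // per-executor incoming queue (power of 2)
    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);    // per-executor outgoing queue (power of 2)
    // "builder" stands in for whatever TopologyBuilder constructs your topology.
    StormSubmitter.submitTopology("test-topology", conf, builder.createTopology());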








    On Sat, Jul 25, 2015 at 5:23 PM, Dimitris Sarlis <[email protected]> wrote:

        Inside my bolts I ack every tuple I receive for processing.
        Furthermore, I've set TOPOLOGY_MAX_SPOUT_PENDING to 1000 to
        throttle how many tuples remain pending at a given moment.
        The bolts don't hold any HashMap or other data structure. In
        fact, my code is pretty simple:

        public void execute(Tuple tuple) {
            if (!tuple.getString(0).contains("!")) {
                Random ran = new Random();
                int worker = ran.nextInt(boltNo) + 1;
                List<Integer> l = _topo.getComponentTasks("worker" + worker);
                String out = tuple.getString(0) + "!";
                _collector.emitDirect(l.get(0), new Values(out));
            } else {
                LOG.info("Already processed record: " + tuple.getString(0));
            }
            _collector.ack(tuple);
        }

        As far as the slowdown is concerned, the rate of data
        processing is constant from start to end. But when I increase
        the number of spouts/bolts, the system takes more time to
        process the same amount of records (500000 records, to be
        exact). So the throughput drops because the rate of data
        ingestion is lower. This is consistent as I increase the
        number of workers; I have tested up to 20 spouts/20 bolts.

        I set the number of Kafka partitions exactly equal to the
        number of spouts each time to take advantage of the
        parallelism offered.

        I don't know if it helps, but I also tried a simpler topology
        where the bolts are not connected with each other (so it's
        basically a tree-like topology), and I observed the same drop
        in throughput, though not to the same extent; the increase in
        processing time is almost negligible.


        On 25/07/2015 07:08 PM, Enno Shioji wrote:
        My first guess would be that there is something wrong with the
        topology, like the number of pending messages increasing
        abnormally due to tuples not being acked or too many tuples
        being generated, or a JVM heap shortage on the workers due to
        something being retained in the bolts (like an ever-growing
        HashMap), etc.

        Does the slowdown happen gradually, or is it immediately slow?

        Another random guess is that you don't have enough Kafka
        partitions, and hence aren't using your new workers, but I
        wouldn't expect this to make things significantly slower.



        On Sat, Jul 25, 2015 at 4:48 PM, Dimitris Sarlis <[email protected]> wrote:

            Morgan,

            I don't think this is the problem. The topology is
            deployed on a 14-node cluster with 56 total cores and
            96GB of RAM, so when I jump from 8 to 16 workers, I think
            I am still far below my hardware limitations.


            On 25/07/2015 06:45 PM, Morgan W09 wrote:
            It's possible that you're reaching a hardware
            limitation. The jump from 8 to 16 total bolts/workers
            could be more than your hardware can handle efficiently,
            so it's starting to have to swap processes and their
            memory in and out, which can have substantial overhead
            and cause your program to slow down.



            On Sat, Jul 25, 2015 at 10:36 AM, Dimitris Sarlis <[email protected]> wrote:

                Yes, it listens to its own output. For example, if I
                have two bolts (bolt1 and bolt2), I perform the
                following:

                bolt1.directGrouping("bolt1");
                bolt1.directGrouping("bolt2");
                bolt2.directGrouping("bolt1");
                bolt2.directGrouping("bolt2");

                I know that this could lead to a cycle, but right now
                the bolts I'm trying to run perform the following:
                if the inputRecord doesn't contain a "!" {
                    append a "!"
                    emit to a random node
                }
                else {
                    do nothing with the record
                }
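
                For reference, a minimal sketch of how that wiring would
                typically be declared with TopologyBuilder (the spout
                config, the WorkerBolt class and the parallelism values
                are illustrative placeholders, not the actual code):

                TopologyBuilder builder = new TopologyBuilder();
                builder.setSpout("spout", new KafkaSpout(spoutConfig), 2);
                builder.setBolt("bolt1", new WorkerBolt(), 1)
                        .shuffleGrouping("spout")
                        .directGrouping("bolt1")   // the bolt listens to its own output
                        .directGrouping("bolt2");
                builder.setBolt("bolt2", new WorkerBolt(), 1)
                        .shuffleGrouping("spout")
                        .directGrouping("bolt1")
                        .directGrouping("bolt2");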

                Dimitris


                On 25/07/2015 06:03 PM, Enno Shioji wrote:
                > Each bolt is connected with itself as well as with
                > each one of the other bolts
                You mean the bolt listens to its own output?





                On Sat, Jul 25, 2015 at 1:29 PM, Dimitris Sarlis <[email protected]> wrote:

                    Hi all,

                    I'm trying to run a topology in Storm and I'm
                    facing some scalability issues. Specifically, I
                    have a topology where KafkaSpouts read from a
                    Kafka queue and emit messages to bolts which
                    are connected with each other through
                    directGrouping (each bolt is connected with
                    itself as well as with each one of the other
                    bolts). The bolts subscribe to the spouts with
                    shuffleGrouping. I observe that when I increase
                    the number of spouts and bolts proportionally,
                    I don't get the speedup I'm expecting. In fact,
                    my topology seems to run slower: for the same
                    amount of data, it takes more time to complete.
                    For example, when I increase the spouts from
                    4->8 and the bolts from 4->8, it takes longer
                    to process the same amount of Kafka messages.

                    Any ideas why this is happening? Thanks in advance.

                    Best,
                    Dimitris Sarlis









