@Enno:
I tried to increase the number of Kafka servers but it didn't help.

My current experiments run with log messages disabled, so logging does not affect performance. Even if I did write to logs, as you said it shouldn't matter, since I'm also increasing the number of machines in the cluster.

I'll try to find out if any resource contention is a factor.

I've also checked that my tasks actually perform the heavy weight work I give them. Specifically, as I add more weight, the bolt capacity and the execute and process latencies all increase, so the tasks are actually performing the heavy work. I don't think the JVM dead code elimination you mentioned is happening.

The logs for the nimbus and supervisors seem normal; I didn't come across any errors or exceptions.

@Srinath:
I tested what you suggested but I get similar numbers as before.



On 26/07/2015 02:45 PM, Srinath C wrote:
Dimitris,

If I understand correctly, the bolt does nothing if it receives a tuple it has processed. So instead of randomly emitting the tuple, you could as well make sure that the bolt does not emit the tuple to itself. You can assert this by comparing the randomly selected component ID with its own ID which you can extract in the prepare() call from storm context.

This way you will considerably reduce the number of tuples that flow through the system even as you increase the workers.
Let me know if this works/fares better in your tests.
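
A minimal sketch of the self-exclusion idea above, assuming the bolts are numbered 1..boltNo as in Dimitris's "worker" + N naming. `TargetPicker` and `pickOther` are hypothetical names, not Storm APIs, and how a bolt learns its own index (for example from the context passed to prepare()) is left out here.

```java
import java.util.Random;

// Hypothetical helper, not part of Storm: chooses a target bolt index in
// [1, boltNo] that is guaranteed to differ from the caller's own index.
final class TargetPicker {
    private TargetPicker() {}

    // Draws from the boltNo - 1 "other" slots and shifts past selfIndex,
    // so every other bolt is equally likely and no retry loop is needed.
    static int pickOther(Random rnd, int boltNo, int selfIndex) {
        if (boltNo < 2) {
            throw new IllegalArgumentException("need at least two bolts");
        }
        if (selfIndex < 1 || selfIndex > boltNo) {
            throw new IllegalArgumentException("selfIndex out of range");
        }
        int candidate = rnd.nextInt(boltNo - 1) + 1; // 1 .. boltNo - 1
        return candidate >= selfIndex ? candidate + 1 : candidate;
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        // Bolt 2 of 4 picks one of bolts 1, 3 or 4.
        System.out.println(pickOther(rnd, 4, 2));
    }
}
```

The returned index would then replace the `ran.nextInt(boltNo) + 1` draw in execute().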


On Sun, Jul 26, 2015 at 4:51 PM, Enno Shioji wrote:

    Hmmmm

    The number looks quite similar, so I wonder if there is a common
    bottleneck. For example, it could be that your Kafka servers are
    the limiting factor (although I'd doubt this, at least if the
    Storm tasks take sufficient time as then comparatively the load
    on Kafka will be smaller).

    Another place to look would be the output (in your case log
    file?). But if you are increasing the number of machines in the
    Storm cluster, that doesn't explain it either as you'll have more
    disks.

    Resource contention can also be a factor; e.g. if you are using
    VMs and the physical machines have already tons of other VMs
    allocated, or something wrong with the network etc. then the
    results may be confusing.

    One thing to look out for when you fake a heavy weight task is
    JVM's dead code elimination. Depending on how you do it, the
    compiler can optimise the entire work away. It may be worth checking
    whether your tasks are actually taking significant time to run. A
    common trick to avoid this is to have this in your code:

    if (result_of_fake_work == very_unlikely) {
        System.out.println("");
    }

    If the task is taking something like 1 sec that would definitely
    be long enough.
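
A fuller sketch of the trick above, under the assumption that the fake work is a loop over pseudo-random numbers (as suggested elsewhere in this thread). `FakeWork` and `burn` are hypothetical names, and the sentinel value 42 is an arbitrary choice of "very unlikely" result.

```java
import java.util.Random;

// Hypothetical stand-in for a "heavy weight" task body.
final class FakeWork {
    private FakeWork() {}

    // Burns CPU by summing `iterations` pseudo-random ints in [0, 1000)
    // and returns the sum so that callers can consume it.
    static long burn(long seed, int iterations) {
        Random rnd = new Random(seed);
        long acc = 0;
        for (int i = 0; i < iterations; i++) {
            acc += rnd.nextInt(1000);
        }
        return acc;
    }

    public static void main(String[] args) {
        long result = burn(System.nanoTime(), 1_000_000);
        // The result feeds a branch with a visible side effect, so the
        // loop above cannot simply be discarded as unused work.
        if (result == 42L) {
            System.out.println("");
        }
    }
}
```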


    Otherwise I'd be examining the logs / console to see if there's
    some problem with the cluster. I've seen cases where supervisors
    report into nimbus but can't communicate to each other, in effect
    creating "dud" workers that make the topology run very slowly.




    On Sun, Jul 26, 2015 at 11:16 AM, Dimitris Sarlis wrote:

        I tried increasing the number of Kafka messages and made the
        tasks more heavy weight. As a result the total experiment took
        around 13-14 minutes to complete. I also increased the number
        of bolts not by creating different ones but by changing the
        parallelism hint (e.g. 16 bolts vs 4 bolts with 4 parallelism
        hint). That helped a bit and at least increasing the number of
        total workers is not slower. But still the system's
        scalability is not good. For example, with 4 spouts/ 4 bolts I
        get ~14min, 8/8 give ~14min, 16/16 give ~12min, 20/20 give ~13min.
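
To put numbers on the scaling described above: taking the 4/4 run as the baseline, speedup is baseline time divided by measured time, and parallel efficiency is that speedup divided by the factor by which the workers grew. For 16/16 that is 14/12, about 1.17x, on 4x the workers, so roughly 29% efficiency. A small sketch of the arithmetic follows; `Scaling` and its methods are hypothetical names, and the timings are the approximate ones quoted above.

```java
// Hypothetical helper for comparing runs against a baseline configuration.
final class Scaling {
    private Scaling() {}

    // How many times faster a run is than the baseline run.
    static double speedup(double baselineMinutes, double minutes) {
        return baselineMinutes / minutes;
    }

    // Speedup divided by the growth in worker count (1.0 = ideal scaling).
    static double efficiency(int baselineWorkers, double baselineMinutes,
                             int workers, double minutes) {
        double growth = (double) workers / baselineWorkers;
        return speedup(baselineMinutes, minutes) / growth;
    }

    public static void main(String[] args) {
        int[] workers = {4, 8, 16, 20};
        double[] minutes = {14, 14, 12, 13}; // approximate timings from the thread
        for (int i = 0; i < workers.length; i++) {
            System.out.printf("%d/%d: speedup %.2f, efficiency %.2f%n",
                    workers[i], workers[i],
                    speedup(minutes[0], minutes[i]),
                    efficiency(workers[0], minutes[0], workers[i], minutes[i]));
        }
    }
}
```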

        Any ideas what else I could check and experiment with?

        On 25/07/2015 08:46 PM, Enno Shioji wrote:
        Oh and one last thing.. You may want to consider keeping a
        counter in the bolt and only log.INFO every 10K or 100K times
        or something, as writing to log files could become an
        artificial bottleneck.
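
A minimal sketch of such a counter, assuming it is only ever touched from the single thread that calls the bolt's execute(). `SampledLogger` and `shouldLog` are hypothetical names, not part of Storm or any logging library.

```java
// Hypothetical helper: tells the caller to log only on every N-th event.
final class SampledLogger {
    private final long every;
    private long seen = 0;

    SampledLogger(long every) {
        if (every <= 0) {
            throw new IllegalArgumentException("every must be positive");
        }
        this.every = every;
    }

    // Counts one event and returns true on the every-th, 2*every-th, ... call.
    boolean shouldLog() {
        seen++;
        return seen % every == 0;
    }

    // Total number of events counted so far.
    long seen() {
        return seen;
    }

    public static void main(String[] args) {
        SampledLogger sampler = new SampledLogger(10_000);
        for (int i = 0; i < 25_000; i++) {
            if (sampler.shouldLog()) {
                System.out.println("Already processed records so far: " + sampler.seen());
            }
        }
    }
}
```

In the bolt, the else branch would call LOG.info only when shouldLog() returns true.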

        On Sat, Jul 25, 2015 at 6:42 PM, Enno Shioji wrote:

            I see.

            One thing, you prob. want to make the else branch
            ("Already processed record") heavy weight too, as that
            branch would otherwise be *extremely* light weight.

            On Sat, Jul 25, 2015 at 6:33 PM, Dimitris Sarlis wrote:

                I've already tried large queue sizes, specifically
                I've set it to 32768.

                I'll also try to make the tasks more heavy weight as
                you suggest and see how things go.


                On 25/07/2015 08:00 PM, Enno Shioji wrote:
                Forgot to note; re: queue sizes, in addition to what
                the article mentions you may also need to increase
                the MAX_PENDING config.

                On Sat, Jul 25, 2015 at 5:58 PM, Enno Shioji wrote:

                    I see....

                    A few suggestions:

                     - Pump enough messages into Kafka that the
                    entire test takes at least a few minutes
                        - If the test is very short, results can
                    often be misleading

                     - Use sufficiently large queue sizes
                        - Generally the lighter the tasks are, the
                    longer queue you need to avoid worker
                    starvation. Since your task looks very light
                    weight, the default queue size may not be
                    sufficient. There are a few places to tune. Have
                    a look at this article:
                    
                    http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/

                     - Make the task a bit more heavy weight
                        - Getting higher throughput through
                    parallelising is generally hard when the tasks
                    are very light weight. If you make each task a
                    bit more heavy weight (e.g. by generating more
                    random numbers per invocation), it may be easier
                    to observe the performance increase (this way
                    you may also be able to avoid the message buffer
                    tuning thing mentioned above)

                     - Try batched topologies
                        - Rationale is similar to above because
                    batching will effectively make tasks more "heavy
                    weight".








                    On Sat, Jul 25, 2015 at 5:23 PM, Dimitris Sarlis wrote:

                        Inside my bolts I ack every tuple I receive
                        for processing. Furthermore, I've set the
                        TOPOLOGY_MAX_SPOUT_PENDING to 1000 to
                        throttle how many tuples remain pending at a
                        given moment. The bolts don't have any
                        HashMap or other data structure. In fact, my
                        code is pretty simple:

                        public void execute(Tuple tuple) {
                            if (!tuple.getString(0).contains("!")) {
                                // First visit: mark the record and send it to a random worker bolt.
                                Random ran = new Random();
                                int worker = ran.nextInt(boltNo) + 1;
                                List<Integer> l = _topo.getComponentTasks("worker" + worker);
                                String out = tuple.getString(0) + "!";
                                _collector.emitDirect(l.get(0), new Values(out));
                            } else {
                                // The record already carries the "!" marker, so only log it.
                                LOG.info("Already processed record: " + tuple.getString(0));
                            }
                            _collector.ack(tuple);
                        }

                        As far as the slowdown is concerned, the
                        rate of data processing is constant from
                        start to end. But when I increase the number
                        of spouts/bolts, the system takes more time
                        to process the same amount of records
                        (500000 records to be exact). So, the
                        throughput drops because the rate of data
                        digestion is smaller. This is consistent as
                        I increase the number of workers; I have
                        tested up to 20 spouts / 20 bolts.

                        I set the number of Kafka partitions exactly
                        equal to the number of spouts each time to
                        take advantage of the parallelism offered.

                        I don't know if it helps, but I also tried a
                        simpler topology, where bolts are not
                        connected with each other (so it's basically
                        a tree like topology) and I observed the
                        same drop in throughput but not to the same
                        extent. The increase in processing time is
                        almost negligible.


                        On 25/07/2015 07:08 PM, Enno Shioji wrote:
                        My first guess would be that there is
                        something with the topology, like number of
                        pending messages increasing abnormally due
                        to tuples not being acked/too many tuples
                        generated, JVM heap shortage caused on
                        workers due to something being retained in
                        bolts (like an ever growing HashMap) etc.

                        Does the slowdown happen gradually, or is
                        it immediately slow?

                        Another random guess is that you don't have
                        enough Kafka partitions and hence aren't
                        using your new workers, but I wouldn't
                        expect this to make things significantly
                        slower.



                        On Sat, Jul 25, 2015 at 4:48 PM, Dimitris Sarlis wrote:

                            Morgan,

                            I hardly think that this could be the
                            problem. The topology is deployed over
                            a 14 node cluster with 56 total cores
                            and 96GB RAM. So when I jump from 8 to
                            16 workers, I think I am still far
                            below my hardware limitations.


                            On 25/07/2015 06:45 PM, Morgan W09 wrote:
                            It could be possible that you're
                            reaching a hardware limitation. The
                            jump from 8 to 16 total bolt/workers
                            could be more than your hardware can
                            handle efficiently. So it's starting
                            to have to switch out processes and
                            their memory, which can have
                            substantial overhead causing your
                            program to slow down.



                            On Sat, Jul 25, 2015 at 10:36 AM, Dimitris Sarlis wrote:

                                Yes, it listens to its own output.
                                For example, if I have two bolts
                                (bolt1 and bolt2), I perform the
                                following:

                                bolt1.directGrouping("bolt1");
                                bolt1.directGrouping("bolt2");
                                bolt2.directGrouping("bolt1");
                                bolt2.directGrouping("bolt2");

                                I know that this could possibly
                                lead to a cycle, but right now the
                                bolts I'm trying to run perform
                                the following:
                                if the inputRecord doesn't contain
                                a "!" {
                                    append a "!"
                                    emit to a random node
                                }
                                else {
                                    do nothing with the record
                                }

                                Dimitris


                                On 25/07/2015 06:03 PM, Enno Shioji wrote:
                                > Each bolt is connected with
                                itself as well as with each one
                                of the other bolts
                                You mean the bolt listens to its
                                own output?





                                On Sat, Jul 25, 2015 at 1:29 PM, Dimitris Sarlis wrote:

                                    Hi all,

                                    I'm trying to run a topology
                                    in Storm and I am facing some
                                    scalability issues.
                                    Specifically, I have a
                                    topology where KafkaSpouts
                                    read from a Kafka queue and
                                    emit messages to bolts which
                                    are connected with each other
                                    through directGrouping. (Each
                                    bolt is connected with itself
                                    as well as with each one of
                                    the other bolts). Spouts
                                    subscribe to bolts with
                                    shuffleGrouping. I observe
                                    that when I increase the
                                    number of spouts and bolts
                                    proportionally, I don't get
                                    the speedup I'm expecting.
                                    In fact, my topology seems to
                                    run slower and for the same
                                    amount of data, it takes more
                                    time to complete. For
                                    example, when I increase
                                    spouts from 4->8 and bolts
                                    from 4->8, it takes longer to
                                    process the same amount of
                                    kafka messages.

                                    Any ideas why this is
                                    happening? Thanks in advance.

                                    Best,
                                    Dimitris Sarlis















