I tried increasing the number of Kafka messages and made the tasks heavier
weight. As a result, the total experiment took around 13-14 minutes
to complete. I also increased the number of bolts not by creating
separate bolt components but by raising the parallelism hint (e.g. 4 bolts
with a parallelism hint of 4 instead of 16 distinct bolts; roughly the
difference sketched below). That helped a bit: at least increasing the
total number of workers is no longer slower. But the system's scalability
is still not good. For example, with 4 spouts/4 bolts I get ~14 min,
8/8 gives ~14 min, 16/16 gives ~12 min, 20/20 gives ~13 min.
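To illustrate the difference (this is just a rough sketch with placeholder
names, not my actual topology code; input groupings are omitted):

import backtype.storm.topology.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();

// Before: 16 separate bolt components, one executor each.
for (int i = 1; i <= 16; i++) {
    builder.setBolt("worker" + i, new WorkerBolt(), 1);
}

// Now: 4 bolt components, each with a parallelism hint of 4
// (still 16 executors in total). In practice it's one approach or the
// other; both are shown here only for comparison.
for (int i = 1; i <= 4; i++) {
    builder.setBolt("workerP" + i, new WorkerBolt(), 4);
}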
Any ideas what else I could check and experiment with?
On 25/07/2015 08:46 PM, Enno Shioji wrote:
Oh and one last thing.. You may want to consider keeping a counter in
the bolt and only log.INFO every 10K or 100K times or something, as
writing to log files could become an artificial bottleneck.
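Roughly something like this (the "skippedCount" field name is made up):

// A counter field in the bolt:
private long skippedCount = 0;

// ...then, in the else branch of execute(), instead of logging every tuple:
skippedCount++;
if (skippedCount % 100000 == 0) {
    LOG.info("Already processed records so far: " + skippedCount);
}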
On Sat, Jul 25, 2015 at 6:42 PM, Enno Shioji <[email protected]> wrote:
I see.
One thing: you probably want to make the else branch ("Already
processed record") heavy weight too, as that branch is currently
*extremely* light weight.
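E.g. something along these lines (just a sketch; the loop count is arbitrary):

else {
    // Dummy work so this branch is comparably heavy to the "if" branch.
    Random ran = new Random();
    long dummy = 0;
    for (int i = 0; i < 10000; i++) {
        dummy += ran.nextInt(1000);
    }
    LOG.info("Already processed record: " + tuple.getString(0) + " " + dummy);
}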
On Sat, Jul 25, 2015 at 6:33 PM, Dimitris Sarlis
<[email protected]> wrote:
I've already tried large queue sizes; specifically, I've set them
to 32768.
I'll also try to make the tasks more heavy weight as you
suggest and see how things go.
On 25/07/2015 08:00 PM, Enno Shioji wrote:
Forgot to note; re: queue sizes, in addition to what the
article mentions you may also need to increase the
MAX_PENDING config.
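I.e. something along these lines (the numbers are arbitrary, just a sketch):

import backtype.storm.Config;

Config conf = new Config();
// Allow more tuples in flight per spout task:
conf.setMaxSpoutPending(10000);
// The buffer sizes from the article go into the same Config, e.g.:
conf.put("topology.executor.receive.buffer.size", 32768);
conf.put("topology.executor.send.buffer.size", 32768);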
On Sat, Jul 25, 2015 at 5:58 PM, Enno Shioji
<[email protected]> wrote:
I see....
A few suggestions:
- Pump enough messages into Kafka that the entire test takes at
least a few minutes
  - If the test is very short, results can often be misleading
- Use sufficiently large queue sizes
  - Generally, the lighter the tasks are, the longer the queues
you need to avoid worker starvation. Since your task looks very
light weight, the default queue sizes may not be sufficient.
There are a few places to tune. Have a look at this article:
http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
- Make the task a bit more heavy weight (see the sketch after
this list)
  - Getting higher throughput through parallelising is generally
hard when the tasks are very light weight. If you make each task
a bit more heavy weight (e.g. by generating more random numbers
per invocation), it may be easier to observe the performance
increase (this way you may also be able to avoid the message
buffer tuning mentioned above)
- Try batched topologies
  - The rationale is similar to the above: batching effectively
makes the tasks more "heavy weight".
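For the "heavier task" point, I mean something roughly like this
inside execute() (sketch only; the count is arbitrary):

// Draw a bunch of extra random numbers per tuple, just to add CPU work:
Random ran = new Random();
long acc = 0;
for (int i = 0; i < 100000; i++) {
    acc += ran.nextInt(1000);
}
// ...then continue with the existing emit/ack logic.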
On Sat, Jul 25, 2015 at 5:23 PM, Dimitris Sarlis
<[email protected]> wrote:
Inside my bolts I ack every tuple I receive for
processing. Furthermore, I've set the
TOPOLOGY_MAX_SPOUT_PENDING to 1000 to throttle how
many tuples remain pending at a given moment. The
bolts don't have any HashMap or other data structure.
In fact, my code is pretty simple:
public void execute(Tuple tuple) {
    if (!tuple.getString(0).contains("!")) {
        Random ran = new Random();
        int worker = ran.nextInt(boltNo) + 1;
        List<Integer> l = _topo.getComponentTasks("worker" + worker);
        String out = tuple.getString(0) + "!";
        _collector.emitDirect(l.get(0), new Values(out));
    } else {
        LOG.info("Already processed record: " + tuple.getString(0));
    }
    _collector.ack(tuple);
}
As far as the slowdown is concerned, the rate of data
processing is constant from start to end. But when I
increase the number of spouts/bolts, the system takes
more time to process the same number of records
(500,000 records, to be exact). So the throughput
drops because the rate at which data is consumed is lower.
This is consistent as I increase the number of
workers; I have tested up to 20 spouts/20 bolts.
I set the number of Kafka partitions exactly equal to
the number of spouts each time to take advantage of
the parallelism offered.
I don't know if it helps, but I also tried a simpler
topology where the bolts are not connected with each
other (so it's basically a tree-like topology), and I
observed the same drop in throughput, but not to the
same extent; the increase in processing time is
almost negligible.
On 25/07/2015 07:08 PM, Enno Shioji wrote:
My first guess would be that there is something wrong with
the topology itself, e.g. the number of pending messages
increasing abnormally because tuples are not being
acked or too many tuples are generated, or a JVM heap
shortage on the workers because something is being retained
in the bolts (like an ever-growing HashMap), etc.
Does the slowdown happen gradually, or is it
immediately slow?
Another random guess is that you don't have enough
Kafka partitions and hence aren't using your new
workers, but I wouldn't expect that to make things
significantly slower.
On Sat, Jul 25, 2015 at 4:48 PM, Dimitris Sarlis
<[email protected]>
wrote:
Morgan,
I don't think this is the problem.
The topology is deployed over a 14-node cluster
with 56 total cores and 96GB RAM, so when I jump
from 8 to 16 workers I think I am still far
below my hardware limits.
On 25/07/2015 06:45 PM, Morgan W09 wrote:
It could be possible that you're reaching a
hardware limitation. The jump from 8 to 16
total bolts/workers could be more than your
hardware can handle efficiently, so it starts
having to swap processes and their memory in
and out, which can add substantial overhead
and cause your program to slow down.
On Sat, Jul 25, 2015 at 10:36 AM, Dimitris
Sarlis <[email protected]> wrote:
Yes, it listens to its own output. For
example, if I have two bolts (bolt1 and
bolt2), I perform the following:
bolt1.directGrouping("bolt1");
bolt1.directGrouping("bolt2");
bolt2.directGrouping("bolt1");
bolt2.directGrouping("bolt2");
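(Roughly, these declarations sit in a topology definition like
the following sketch; the class names and parallelism hints are
placeholders, not my actual code:)

import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();
// Kafka spout declaration omitted; the bolts read from it via shuffleGrouping.
BoltDeclarer bolt1 = builder.setBolt("bolt1", new WorkerBolt(), 1)
        .shuffleGrouping("spout");
BoltDeclarer bolt2 = builder.setBolt("bolt2", new WorkerBolt(), 1)
        .shuffleGrouping("spout");
bolt1.directGrouping("bolt1");
bolt1.directGrouping("bolt2");
bolt2.directGrouping("bolt1");
bolt2.directGrouping("bolt2");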
I know that this could possibly lead to a
cycle, but right now the bolts I'm trying
to run perform the following:
if the inputRecord doesn't contain a "!" {
    append a "!"
    emit to a random node
} else {
    do nothing with the record
}
Dimitris
On 25/07/2015 06:03 PM, Enno Shioji wrote:
> Each bolt is connected with itself as
well as with each one of the other bolts
You mean the bolt listens to its own output?
On Sat, Jul 25, 2015 at 1:29 PM, Dimitris
Sarlis <[email protected]> wrote:
Hi all,
I'm trying to run a topology in Storm
and I am facing some scalability
issues. Specifically, I have a
topology where KafkaSpouts read from a
Kafka queue and emit messages to bolts,
which are connected with each other
through directGrouping. (Each bolt is
connected with itself as well as with
each one of the other bolts.) The bolts
subscribe to the spouts with
shuffleGrouping. I observe that when I
increase the number of spouts and
bolts proportionally, I don't get the
speedup I'm expecting. In fact, my
topology seems to run slower: for the
same amount of data, it takes more
time to complete. For example, when I
increase the spouts from 4 to 8 and the
bolts from 4 to 8, it takes longer to
process the same number of Kafka messages.
Any ideas why this is happening?
Thanks in advance.
Best,
Dimitris Sarlis