@Enno:
I tried to increase the number of Kafka servers but it didn't help.
My current experiments are executed with log messages disabled, so logging
does not affect the performance. Even if I did write to logs, as you said it
shouldn't matter, since I'm also increasing the number of machines in the
cluster.
I'll try to find out if any resource contention is a factor.
I've also checked that my tasks actually perform the heavyweight work I
give them. Specifically, as I add more weight I see that bolt capacity,
as well as execute and process latency, also increase, so the tasks are
actually performing the heavy work. I don't think there is any JVM dead
code elimination, as you suggested.
The logs for the nimbus and supervisors seem normal; I didn't come
across any errors or exceptions.
@Srinath:
I tested what you suggested but I get similar numbers as before.
On 26/07/2015 02:45 PM, Srinath C wrote:
Dimitris,
If I understand correctly, the bolt does nothing if it receives a
tuple it has already processed. So instead of emitting the tuple to a
randomly chosen component, you could make sure that the bolt does not
emit the tuple to itself. You can ensure this by comparing the randomly
selected component ID with the bolt's own ID, which you can extract from
the Storm context in the prepare() call.
This way you will considerably reduce the number of tuples that flow
through the system even as you increase the workers.
Let me know if this works/fares better in your tests.
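For illustration, a minimal sketch of that check (the class and field names are
hypothetical; the "workerN" naming and boltNo come from your earlier snippet,
and the pre-1.0 backtype.storm packages are assumed):

import java.util.Map;
import java.util.Random;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;

public abstract class SelfAwareBolt extends BaseRichBolt {
    protected String myComponentId;  // this bolt's own component ID, e.g. "worker3"
    protected Random random;
    protected int boltNo;            // total number of bolts, as in your snippet

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        myComponentId = context.getThisComponentId();
        random = new Random();
    }

    // Pick a random target component, re-drawing whenever we would pick
    // ourselves (assumes boltNo > 1).
    protected String pickOtherWorker() {
        String target;
        do {
            target = "worker" + (random.nextInt(boltNo) + 1);
        } while (target.equals(myComponentId));
        return target;
    }
}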
On Sun, Jul 26, 2015 at 4:51 PM, Enno Shioji <[email protected]> wrote:
Hmmmm
The numbers look quite similar, so I wonder if there is a common
bottleneck. For example, it could be that your Kafka servers are
the limiting factor (although I'd doubt this, at least if the
Storm tasks take sufficient time, as then the load on Kafka will
be comparatively smaller).
Another place to look would be the output (in your case log
file?). But if you are increasing the number of machines in the
Storm cluster, that doesn't explain it either as you'll have more
disks.
Resource contention can also be a factor; e.g. if you are using
VMs and the physical machines already have tons of other VMs
allocated, or if something is wrong with the network, etc., then
the results may be confusing.
One thing to look out for when you fake a heavyweight task is the
JVM's dead code elimination. Depending on how you do it, the
compiler can optimise the entire piece of work away. It may be worth
checking whether your tasks are actually taking significant time to
complete. A common trick to avoid this is to have something like this
in your code:

if (resultOfFakeWork == veryUnlikelyValue) {
    System.out.println("");
}
If the task is taking something like 1 sec that would definitely
be long enough.
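For illustration, here is a minimal, self-contained sketch of such a faked
heavyweight task with the guard above (the class, method, and iteration count
are all hypothetical):

import java.util.Random;

// Sketch only: busy work that the JIT cannot eliminate, because an
// (extremely unlikely) branch still depends on its result.
public class FakeWork {
    private static final Random RANDOM = new Random();

    // Burn CPU by generating 'iterations' random numbers and folding them
    // into an accumulator that the caller must consume.
    public static long doHeavyWork(int iterations) {
        long acc = 0;
        for (int i = 0; i < iterations; i++) {
            acc += RANDOM.nextInt();
        }
        return acc;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        long result = doHeavyWork(50_000_000);  // arbitrary weight
        if (result == Long.MIN_VALUE) {         // the "very unlikely" guard
            System.out.println("");
        }
        System.out.println("Fake work took " + (System.currentTimeMillis() - start) + " ms");
    }
}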
Otherwise I'd be examining the logs / console to see if there's
some problem with the cluster. I've seen cases where supervisors
report into nimbus but can't communicate with each other, in effect
creating "dud" workers that make the topology run very slowly.
On Sun, Jul 26, 2015 at 11:16 AM, Dimitris Sarlis <[email protected]> wrote:
I tried increasing the number of Kafka messages and made the
tasks more heavyweight. As a result the total experiment took
around 13-14 minutes to complete. I also increased the number
of bolts, not by creating different ones but by changing the
parallelism hint (e.g. 16 bolts vs. 4 bolts with a parallelism
hint of 4). That helped a bit; at least increasing the total
number of workers is no longer slower. But the system's
scalability is still not good. For example, with 4 spouts / 4 bolts I
get ~14 min, 8/8 gives ~14 min, 16/16 gives ~12 min, 20/20 gives ~13 min.
Any ideas what else I could check and experiment with?
On 25/07/2015 08:46 PM, Enno Shioji wrote:
Oh and one last thing.. You may want to consider keeping a
counter in the bolt and only log.INFO every 10K or 100K times
or something, as writing to log files could become an
artificial bottleneck.
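Something along these lines, for example (just a sketch to slot into your
else branch; the field names and the 100K interval are illustrative):

// Extra fields in the bolt:
private static final int LOG_INTERVAL = 100_000;
private long alreadyProcessedCount = 0;

// ...and in execute(), instead of logging every "already processed" tuple:
else {
    alreadyProcessedCount++;
    if (alreadyProcessedCount % LOG_INTERVAL == 0) {
        LOG.info("Already processed records so far: " + alreadyProcessedCount);
    }
}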
On Sat, Jul 25, 2015 at 6:42 PM, Enno Shioji <[email protected]> wrote:
I see.
One thing: you prob. want to make the else branch
("Already processed record") heavyweight too, as that
branch would be *extremely* lightweight.
On Sat, Jul 25, 2015 at 6:33 PM, Dimitris Sarlis <[email protected]> wrote:
I've already tried large queue sizes; specifically,
I've set them to 32768.
I'll also try to make the tasks more heavy weight as
you suggest and see how things go.
On 25/07/2015 08:00 PM, Enno Shioji wrote:
Forgot to note; re: queue sizes, in addition to what
the article mentions you may also need to increase
the MAX_PENDING config.
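For example (a sketch only; the values are arbitrary, the buffer-size keys
are the ones the linked article discusses, and the pre-1.0
backtype.storm.Config class is assumed):

import backtype.storm.Config;

Config conf = new Config();
// Cap on in-flight (not-yet-acked) tuples per spout task.
conf.setMaxSpoutPending(10000);
// Internal queue sizes (should be powers of two).
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 32768);
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 32768);
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 2048);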
On Sat, Jul 25, 2015 at 5:58 PM, Enno Shioji <[email protected]> wrote:
I see....
A few suggestions:
- Pump enough messages into Kafka that the entire test takes at least more than a few minutes
  - If the test is very short, results can often be misleading
- Use sufficiently large queue sizes
  - Generally, the lighter the tasks are, the longer the queues you need to avoid worker starvation. Since your task looks very lightweight, the default queue size may not be sufficient. There are a few places to tune. Have a look at this article:
    http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
- Make the task a bit more heavyweight
  - Getting higher throughput through parallelising is generally hard when the tasks are very lightweight. If you make each task a bit more heavyweight (e.g. by generating more random numbers per invocation), it may be easier to observe the performance increase (this way you may also be able to avoid the message buffer tuning mentioned above)
- Try batched topologies
  - The rationale is similar to the above, because batching will effectively make tasks more "heavyweight".
On Sat, Jul 25, 2015 at 5:23 PM, Dimitris Sarlis <[email protected]> wrote:
Inside my bolts I ack every tuple I receive
for processing. Furthermore, I've set the
TOPOLOGY_MAX_SPOUT_PENDING to 1000 to
throttle how many tuples remain pending at a
given moment. The bolts don't have any
HashMap or other data structure. In fact, my
code is pretty simple:
public void execute(Tuple tuple) {
    if (!tuple.getString(0).contains("!")) {
        Random ran = new Random();
        int worker = ran.nextInt(boltNo) + 1;
        List<Integer> l = _topo.getComponentTasks("worker" + worker);
        String out = tuple.getString(0) + "!";
        _collector.emitDirect(l.get(0), new Values(out));
    } else {
        LOG.info("Already processed record: " + tuple.getString(0));
    }
    _collector.ack(tuple);
}
As far as the slowdown is concerned, the rate of data processing is
constant from start to end. But when I increase the number of
spouts/bolts, the system takes more time to process the same number
of records (500,000 records, to be exact). So the throughput drops
because the rate of data ingestion is lower. This is consistent as
I increase the number of workers; I have tested up to 20 spouts / 20 bolts.
I set the number of Kafka partitions exactly equal to the number of
spouts each time, to take advantage of the parallelism offered.
I don't know if it helps, but I also tried a simpler topology where
the bolts are not connected with each other (so it's basically a
tree-like topology), and I observed the same drop in throughput, but
not to the same extent; the increase in processing time is almost
negligible.
On 25/07/2015 07:08 PM, Enno Shioji wrote:
My first guess would be that there is something wrong with the
topology, like the number of pending messages increasing abnormally
due to tuples not being acked or too many tuples being generated, a
JVM heap shortage on workers due to something being retained in
bolts (like an ever-growing HashMap), etc.
Does the slowdown happen gradually, or is
it immediately slow?
Another random guess is that you don't have enough Kafka partitions
and hence aren't using your new workers, but I wouldn't expect this
to make things significantly slower.
On Sat, Jul 25, 2015 at 4:48 PM, Dimitris Sarlis <[email protected]> wrote:
Morgan,
I doubt that this is the problem. The topology is deployed over
a 14-node cluster with 56 total cores and 96 GB RAM, so when I
jump from 8 to 16 workers, I think I am still far below my
hardware limits.
On 25/07/2015 06:45 PM, Morgan W09 wrote:
It could be possible that you're reaching a hardware limitation.
The jump from 8 to 16 total bolts/workers could be more than your
hardware can handle efficiently, so it's starting to have to
switch out processes and their memory, which can have substantial
overhead, causing your program to slow down.
On Sat, Jul 25, 2015 at 10:36 AM, Dimitris Sarlis <[email protected]> wrote:
Yes, it listens to its own output.
For example, if I have two bolts
(bolt1 and bolt2), I perform the
following:
bolt1.directGrouping("bolt1");
bolt1.directGrouping("bolt2");
bolt2.directGrouping("bolt1");
bolt2.directGrouping("bolt2");
I know that this could possibly lead to a cycle, but right now the
bolts I'm trying to run perform the following:

if the inputRecord doesn't contain a "!" {
    append a "!"
    emit to a random node
} else {
    do nothing with the record
}
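For reference, the wiring above would look roughly like this in
TopologyBuilder form (a sketch only: the bolt class, parallelism values, and
Kafka/ZooKeeper details are hypothetical placeholders, assuming the pre-1.0
backtype.storm / storm.kafka packages):

import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

// One KafkaSpout feeding two bolts; each bolt listens to the spout via
// shuffleGrouping and to itself plus the other bolt via directGrouping.
TopologyBuilder builder = new TopologyBuilder();
SpoutConfig spoutConfig = new SpoutConfig(
        new ZkHosts("zkhost:2181"),   // placeholder ZooKeeper address
        "my-topic", "/kafka", "my-consumer-id");
builder.setSpout("spout", new KafkaSpout(spoutConfig), 4);

BoltDeclarer bolt1 = builder.setBolt("bolt1", new MyBolt(), 1);  // MyBolt is hypothetical
BoltDeclarer bolt2 = builder.setBolt("bolt2", new MyBolt(), 1);

bolt1.shuffleGrouping("spout").directGrouping("bolt1").directGrouping("bolt2");
bolt2.shuffleGrouping("spout").directGrouping("bolt1").directGrouping("bolt2");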
Dimitris
On 25/07/2015 06:03 PM, Enno Shioji wrote:
> Each bolt is connected with itself as well as with each one of the other bolts
You mean the bolt listens to its
own output?
On Sat, Jul 25, 2015 at 1:29 PM, Dimitris Sarlis <[email protected]> wrote:
Hi all,
I'm trying to run a topology in Storm and I am facing some
scalability issues. Specifically, I have a topology where
KafkaSpouts read from a Kafka queue and emit messages to bolts
which are connected with each other through directGrouping (each
bolt is connected with itself as well as with each one of the
other bolts). The bolts subscribe to the spouts with
shuffleGrouping. I observe that when I increase the number of
spouts and bolts proportionally, I don't get the speedup I'm
expecting. In fact, my topology seems to run slower, and for the
same amount of data it takes more time to complete. For example,
when I increase spouts from 4->8 and bolts from 4->8, it takes
longer to process the same amount of Kafka messages.
Any ideas why this is happening? Thanks in advance.
Best,
Dimitris Sarlis