Inside my bolts I ack every tuple I receive for processing. Furthermore,
I've set the TOPOLOGY_MAX_SPOUT_PENDING to 1000 to throttle how many
tuples remain pending at a given moment. The bolts don't have any
HashMap or other data structure. In fact, my code is pretty simple:
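    public void execute(Tuple tuple) {
        if (!tuple.getString(0).contains("!")) {
            // First visit: pick one of the boltNo bolts ("worker1".."workerN") at random
            Random ran = new Random();
            int worker = ran.nextInt(boltNo) + 1;
            List<Integer> l = _topo.getComponentTasks("worker" + worker);
            // Mark the record as processed and send it to the first task of that bolt
            String out = tuple.getString(0) + "!";
            _collector.emitDirect(l.get(0), new Values(out));
        }
        else {
            // Second visit: the record already carries the "!" marker
            LOG.info("Already processed record: " + tuple.getString(0));
        }
        _collector.ack(tuple);
    }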
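To make the tuple flow of this logic concrete, here is a minimal Storm-free sketch. The class and method names are made up for illustration, and the random initial target is only an assumed stand-in for shuffleGrouping from the spouts. It suggests that every record is executed exactly twice (once to append the "!" and once to be recognised as processed), so N records should cause 2 * N execute() calls no matter how many bolts there are.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Random;

public class BoltHopSimulation {

    /** Replays the execute() logic and returns the number of executions per bolt. */
    public static long[] run(int records, int boltNo, long seed) {
        Random ran = new Random(seed);
        long[] executions = new long[boltNo];
        Deque<String> payloads = new ArrayDeque<>();
        Deque<Integer> targets = new ArrayDeque<>();

        // Assumption: the spout hands each record to a random bolt.
        for (int i = 0; i < records; i++) {
            payloads.add("record-" + i);
            targets.add(ran.nextInt(boltNo));
        }

        while (!payloads.isEmpty()) {
            String in = payloads.poll();
            int bolt = targets.poll();
            executions[bolt]++;
            if (!in.contains("!")) {
                // Same branch as in execute(): mark and forward to a random bolt.
                payloads.add(in + "!");
                targets.add(ran.nextInt(boltNo));
            }
            // Otherwise the record is only logged and nothing is emitted.
        }
        return executions;
    }

    /** Sums the per-bolt execution counts. */
    public static long total(long[] counts) {
        long sum = 0;
        for (long c : counts) {
            sum += c;
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] perBolt = run(500000, 8, 42L);
        System.out.println("total executions: " + total(perBolt));
    }
}
```

The sketch ignores acking, serialization and network transfer, so it says nothing about timing; it only checks how many times each record is handled.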
As far as the slowdown is concerned, the rate of data processing is
constant from start to end. But when I increase the number of
spouts/bolts, the system takes more time to process the same number of
records (500000 records to be exact). So the throughput drops, because
records are ingested at a lower rate. This holds consistently as I
increase the number of workers; I have tested up to 20 spouts / 20 bolts.
I set the number of Kafka partitions exactly equal to the number of
spouts each time to take advantage of the parallelism offered.
I don't know if it helps, but I also tried a simpler topology, where
bolts are not connected with each other (so it's basically a tree-like
topology). I observed a drop in throughput there too, but a much smaller
one: the increase in processing time is almost negligible.
On 25/07/2015 07:08 PM, Enno Shioji wrote:
My first guess would be that there is something wrong with the topology,
like the number of pending messages increasing abnormally due to tuples
not being acked or too many tuples being generated, a JVM heap shortage
on the workers due to something being retained in bolts (like an
ever-growing HashMap), etc.
Does the slowdown happen gradually, or is it immediately slow?
Another random guess is that you don't have enough Kafka
partitions, and hence are not using your new workers, but I wouldn't
expect this to make things significantly slower.
On Sat, Jul 25, 2015 at 4:48 PM, Dimitris Sarlis <[email protected]> wrote:
Morgan,
I hardly think that this could be the problem. The topology is
deployed over a 14 node cluster with 56 total cores and 96GB RAM.
So when I jump from 8 to 16 workers, I think I am still far below
my hardware limitations.
On 25/07/2015 06:45 PM, Morgan W09 wrote:
It could be possible that you're reaching a hardware limitation.
The jump from 8 to 16 total bolts/workers could be more than your
hardware can handle efficiently. So it may be starting to
swap out processes and their memory, which can have substantial
overhead, causing your program to slow down.
On Sat, Jul 25, 2015 at 10:36 AM, Dimitris Sarlis
<[email protected]> wrote:
Yes, it listens to its own output. For example, if I have two
bolts (bolt1 and bolt2), I perform the following:
bolt1.directGrouping("bolt1");
bolt1.directGrouping("bolt2");
bolt2.directGrouping("bolt1");
bolt2.directGrouping("bolt2");
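For a larger number of bolts the same all-to-all pattern can be enumerated with a small sketch like the one below. It only builds the wiring statements as strings and does not touch the Storm API; the class name, the method name and the "bolt" + index naming are assumptions for illustration. It also makes visible that the number of subscriptions grows with the square of the number of bolts.

```java
import java.util.ArrayList;
import java.util.List;

public class DirectGroupingWiring {

    /** Returns one statement per (subscriber, source) pair, self-loops included. */
    public static List<String> wiring(int boltNo) {
        List<String> lines = new ArrayList<>();
        for (int subscriber = 1; subscriber <= boltNo; subscriber++) {
            for (int source = 1; source <= boltNo; source++) {
                lines.add("bolt" + subscriber + ".directGrouping(\"bolt" + source + "\");");
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // Two bolts reproduce the four statements listed above.
        wiring(2).forEach(System.out::println);
        // Eight bolts already need 8 * 8 = 64 subscriptions.
        System.out.println(wiring(8).size() + " subscriptions for 8 bolts");
    }
}
```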
I know that this could possibly lead to a cycle, but right
now the bolts I'm trying to run perform the following:
    if the inputRecord doesn't contain a "!" {
        append a "!"
        emit to a random node
    }
    else {
        do nothing with the record
    }
Dimitris
On 25/07/2015 06:03 PM, Enno Shioji wrote:
> Each bolt is connected with itself as well as with each
one of the other bolts
You mean the bolt listens to its own output?
On Sat, Jul 25, 2015 at 1:29 PM, Dimitris Sarlis
<[email protected]> wrote:
Hi all,
I'm trying to run a topology in Storm and I am facing
some scalability issues. Specifically, I have a topology
where KafkaSpouts read from a Kafka queue and emit
messages to bolts which are connected with each other
through directGrouping. (Each bolt is connected with
itself as well as with each one of the other bolts).
Bolts subscribe to the spouts with shuffleGrouping. I
observe that when I increase the number of spouts and
bolts proportionally, I don't get the speedup I
expect. In fact, my topology seems to run slower:
for the same amount of data, it takes more time to
complete. For example, when I increase spouts from 4->8
and bolts from 4->8, it takes longer to process the same
number of Kafka messages.
Any ideas why this is happening? Thanks in advance.
Best,
Dimitris Sarlis