Hi guys,
Retrying here after no luck on SO (I'm not sure whether this is a question
about Storm or Kafka, or both).
Anyway:
After developing and running my Storm (1.0.1) topology with a
KafkaSpout and a couple of bolts, I noticed heavy network traffic even
when the topology is idle (no messages in Kafka, no processing done in the
bolts). So I started commenting out my topology piece by piece to find
the cause, and I ended up with only the KafkaSpout in my main:
....
final SpoutConfig spoutConfig = new SpoutConfig(
        new ZkHosts(zkHosts, "/brokers"),  // ZK path where the brokers are registered
        "files-topic",                     // topic
        "/kafka",                          // zkRoot: ZK path where the spout stores its offsets
        "consumer-group-name");            // spout id
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
topologyBuilder.setSpout(
        "kafka-spout-id",
        new KafkaSpout(spoutConfig),
        1);
....
When this (useless) topology runs, even in local mode and even on the
very first run, the network traffic always grows considerably. In my
Activity Monitor I see:
* an average of 432 KB/s of data received
* roughly the same average of data sent per second
* after the topology has been running (idle) for a couple of hours,
  1.26 GB received and 1 GB sent
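A rough back-of-the-envelope check on these numbers, as a sketch only: assume the idle spout polls the broker at most about once per millisecond (an assumption based on Storm's default sleep-based spout wait strategy, `topology.sleep.spout.wait.strategy.time.ms` = 1). Then 432 KB/s corresponds to only a few hundred bytes per round trip, which looks like small empty requests rather than message payloads. The helper `bytesPerPoll` and the 1000 polls/s figure are hypothetical, not measured.

```java
// Back-of-the-envelope estimate; the poll rate is an assumption, not a measurement.
public class TrafficEstimate {

    // Hypothetical helper: average bytes per poll, given a rate in KB/s
    // (1 KB = 1024 bytes here) and an assumed number of polls per second.
    public static long bytesPerPoll(long kbPerSec, long pollsPerSec) {
        if (pollsPerSec <= 0) {
            throw new IllegalArgumentException("pollsPerSec must be positive");
        }
        return kbPerSec * 1024 / pollsPerSec;
    }

    public static void main(String[] args) {
        // 432 KB/s observed; 1000 polls/s assumed (one poll per 1 ms sleep).
        System.out.println(bytesPerPoll(432, 1000) + " bytes per poll"); // prints "442 bytes per poll"
    }
}
```

If the real poll rate is lower than assumed, the bytes per poll scale up proportionally.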
(Important: Kafka 0.10 is not running as a cluster; it is a single
instance running on the same machine, with a single topic and a single
partition. I just downloaded Kafka, started it and created a simple
topic. When I put a message in the topic, everything in the topology
works without any problem at all.)
Obviously the cause lies in the KafkaSpout.nextTuple() method (below),
but I don't understand why I should see such traffic when there are no
messages in Kafka. Is there something I didn't consider? Is this the
expected behaviour? I checked the Kafka, ZK and Storm logs and found
nothing; I also cleaned up the Kafka and ZK data, and the behaviour is
still the same.
@Override
public void nextTuple() {
    List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
    for (int i = 0; i < managers.size(); i++) {
        try {
            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size();
            EmitState state = managers.get(_currPartitionIndex).next(_collector);
            if (state != EmitState.EMITTED_MORE_LEFT) {
                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) {
                break;
            }
        } catch (FailedFetchException e) {
            LOG.warn("Fetch failed", e);
            _coordinator.refresh();
        }
    }
    long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;
    /*
     As far as the System.currentTimeMillis() is dependent on System clock,
     additional check on negative value of diffWithNow in case of external changes.
    */
    if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
        commit();
    }
}
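To make the idle behaviour of the method above concrete, here is a small, self-contained simulation of its control flow. It is a sketch, not storm-kafka code, and it rests on three assumptions: each `PartitionManager.next()` call on an idle partition issues one fetch request to the broker; Storm's executor sleeps 1 ms after a `nextTuple()` call that emits nothing (the default sleep spout wait strategy); and `stateUpdateIntervalMs` is 2000 ms. `IdleSpoutSim`, `FakePartition` and `simulate` are hypothetical names. Time is simulated, so it runs instantly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the nextTuple() control flow for an idle spout.
// Not storm-kafka code: PartitionManager is replaced by FakePartition and time is simulated.
public class IdleSpoutSim {

    enum EmitState { EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED }

    // Stand-in for PartitionManager; each next() call is assumed to cost one fetch request.
    static final class FakePartition {
        long fetches = 0;

        EmitState next() {
            fetches++;                    // assumed round trip to the broker, even if nothing comes back
            return EmitState.NO_EMITTED;  // idle topic: never anything to emit
        }
    }

    public static final class Result {
        public final long nextTupleCalls;
        public final long fetches;
        public final long commits;

        Result(long nextTupleCalls, long fetches, long commits) {
            this.nextTupleCalls = nextTupleCalls;
            this.fetches = fetches;
            this.commits = commits;
        }
    }

    public static Result simulate(long durationMs, long sleepMs, long stateUpdateIntervalMs, int partitions) {
        if (sleepMs <= 0) {
            throw new IllegalArgumentException("sleepMs must be positive");
        }
        List<FakePartition> managers = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            managers.add(new FakePartition());
        }
        long now = 0;
        long lastUpdateMs = 0;
        long calls = 0;
        long commits = 0;
        int currPartitionIndex = 0;

        while (now < durationMs) {
            calls++;
            // Same round-robin as nextTuple(): stop at the first partition that emits.
            for (int i = 0; i < managers.size(); i++) {
                currPartitionIndex = currPartitionIndex % managers.size();
                EmitState state = managers.get(currPartitionIndex).next();
                if (state != EmitState.EMITTED_MORE_LEFT) {
                    currPartitionIndex = (currPartitionIndex + 1) % managers.size();
                }
                if (state != EmitState.NO_EMITTED) {
                    break;
                }
            }
            // Same time-based check as nextTuple(); here commit() is modelled as resetting the timer.
            long diffWithNow = now - lastUpdateMs;
            if (diffWithNow > stateUpdateIntervalMs || diffWithNow < 0) {
                commits++;
                lastUpdateMs = now;
            }
            // Nothing was emitted, so assume the executor sleeps before calling nextTuple() again.
            now += sleepMs;
        }

        long fetches = 0;
        for (FakePartition m : managers) {
            fetches += m.fetches;
        }
        return new Result(calls, fetches, commits);
    }

    public static void main(String[] args) {
        // One simulated minute, 1 ms sleep, 2000 ms commit interval, one partition.
        Result r = simulate(60_000, 1, 2_000, 1);
        // prints "nextTuple calls=60000, fetches=60000, commits=29"
        System.out.printf("nextTuple calls=%d, fetches=%d, commits=%d%n",
                r.nextTupleCalls, r.fetches, r.commits);
    }
}
```

Under these assumptions, a single idle partition yields 60,000 fetch calls and 29 `commit()` calls per simulated minute. More partitions multiply the fetch count, because an idle call visits every partition before giving up.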
Many thanks in advance
Andrea