Hi guys,
I'm retrying here after having no luck on Stack Overflow (I'm not sure whether this is a question about Storm, Kafka, or both).

After developing and running my Storm (1.0.1) topology with a KafkaSpout and a couple of bolts, I noticed heavy network traffic even when the topology is idle (no messages in Kafka, no processing done in the bolts). So I commented out my topology piece by piece to find the cause, and I ended up with only the KafkaSpout in my main:
....
final SpoutConfig spoutConfig = new SpoutConfig(
        new ZkHosts(zkHosts, "/brokers"),
        "files-topic",          // topic
        "/kafka",               // ZK chroot
        "consumer-group-name");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
topologyBuilder.setSpout(
        "kafka-spout-id",
        new KafkaSpout(spoutConfig),
        1);
....
When this (useless) topology runs, even in local mode and even the very first time, network traffic always grows a lot. In Activity Monitor I see:

 * an average of 432 KB of data received/sec
 * more or less the same average of data sent/sec
 * after the topology has been running (idle) for a couple of hours,
   data received is 1.26 GB and data sent is 1 GB

(Important: Kafka 0.10 is not running as a cluster; it is a single instance running on the same machine, with a single topic and a single partition. I just downloaded Kafka onto my machine, started it, and created a simple topic. When I put a message in the topic, everything in the topology works without any problem.)

Obviously, the cause is somewhere in the KafkaSpout.nextTuple() method (below), but I don't understand why I should see such traffic when there are no messages in Kafka. Is there something I didn't consider? Is this the expected behaviour? I looked at the Kafka, ZK, and Storm logs and found nothing. I also cleaned up the Kafka and ZK data, and the behaviour is still the same.

@Override
public void nextTuple() {
    List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
    for (int i = 0; i < managers.size(); i++) {
        try {
            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size();
            EmitState state = managers.get(_currPartitionIndex).next(_collector);
            if (state != EmitState.EMITTED_MORE_LEFT) {
                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) {
                break;
            }
        } catch (FailedFetchException e) {
            LOG.warn("Fetch failed", e);
            _coordinator.refresh();
        }
    }

    long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;

    /*
     * As far as the System.currentTimeMillis() is dependent on System clock,
     * additional check on negative value of diffWithNow in case of external changes.
     */
    if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
        commit();
    }
}
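
To make the idle case of the loop above concrete, here is a minimal stand-alone sketch of the same round-robin structure. It is only a model: IdleSpoutModel, IdlePartition, and countFetches are hypothetical names, not storm-kafka classes, and it assumes that calling next() on a partition with nothing buffered costs one fetch request. Error handling and commit() are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone model of the polling loop; none of these are the real storm-kafka classes. */
final class IdleSpoutModel {

    enum EmitState { EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED }

    /** Hypothetical partition that never has data and counts how often it is polled. */
    static final class IdlePartition {
        int fetches = 0;

        EmitState next() {
            fetches++; // assumption: polling an empty partition costs one fetch request
            return EmitState.NO_EMITTED;
        }
    }

    private final List<IdlePartition> managers = new ArrayList<>();
    private int currPartitionIndex = 0;

    IdleSpoutModel(int partitions) {
        for (int i = 0; i < partitions; i++) {
            managers.add(new IdlePartition());
        }
    }

    /** Same round-robin structure as the quoted nextTuple(), minus error handling and commit. */
    void nextTuple() {
        for (int i = 0; i < managers.size(); i++) {
            currPartitionIndex = currPartitionIndex % managers.size();
            EmitState state = managers.get(currPartitionIndex).next();
            if (state != EmitState.EMITTED_MORE_LEFT) {
                currPartitionIndex = (currPartitionIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) {
                break;
            }
        }
    }

    /** Total polls issued after `calls` invocations of nextTuple() on an idle topic. */
    static int countFetches(int partitions, int calls) {
        IdleSpoutModel spout = new IdleSpoutModel(partitions);
        for (int c = 0; c < calls; c++) {
            spout.nextTuple();
        }
        int total = 0;
        for (IdlePartition p : spout.managers) {
            total += p.fetches;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(countFetches(1, 1000)); // prints 1000
    }
}
```

In this model every nextTuple() call on an idle topic polls each managed partition once, so the number of polls grows with how often nextTuple() is invoked rather than with the number of messages. Whether the real spout behaves this way is exactly what I would like to confirm.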


Many thanks in advance
Andrea
