You may want to check out https://issues.apache.org/jira/browse/STORM-1674
It is fixed in 1.0.2.

On Fri, Aug 19, 2016 at 11:35 AM, Andrea Gazzarini <[email protected]> wrote:
> Hi guys,
> retrying here after no luck on SO (I'm not sure whether this is a
> question about Storm or Kafka, or both).
>
> After developing and running my Storm (1.0.1) topology with a KafkaSpout
> and a couple of Bolts, I noticed huge network traffic even when the
> topology is idle (no messages on Kafka, no processing in the bolts). So
> I commented out my topology piece by piece to find the cause, and I
> ended up with only the KafkaSpout in my main:
>
> ....
>     final SpoutConfig spoutConfig = new SpoutConfig(
>             new ZkHosts(zkHosts, "/brokers"),
>             "files-topic",          // topic
>             "/kafka",               // ZK chroot
>             "consumer-group-name");
>     spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>     spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
>     topologyBuilder.setSpout(
>             "kafka-spout-id",
>             new KafkaSpout(spoutConfig),
>             1);
> ....
>
> When this (useless) topology runs, even in local mode, even the very
> first time, network traffic always grows a lot. In my Activity Monitor
> I see:
>
> - an average of 432 KB of data received per second
> - more or less the same average of data sent per second
> - after a couple of hours with the topology running (idle), data
>   received is 1.26 GB and data sent is 1 GB
>
> (Important: Kafka (0.10) is not running as a cluster; it is a single
> instance on the same machine, with a single topic and a single
> partition. I just downloaded Kafka, started it, and created a simple
> topic. When I put a message on the topic, everything in the topology
> works without any problem at all.)
>
> Obviously the cause is in the KafkaSpout.nextTuple() method (below),
> but I don't understand why, with no messages in Kafka, I should see
> such traffic. Is there something I haven't considered? Is that the
> expected behaviour?
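For intuition on how an *idle* consumer can reach hundreds of KB/s, here is a back-of-envelope sketch in plain Java. As I understand STORM-1674, the pre-1.0.2 spout's fetch requests could return immediately even when the partition was empty, so the `nextTuple()` loop effectively busy-polls the broker. The loop rate and per-request byte counts below are illustrative assumptions, not measured Storm/Kafka values:

```java
// Back-of-envelope: a tight poll loop against a broker that answers empty
// fetches immediately. All three numbers are assumptions for illustration.
public class IdleFetchTraffic {
    public static void main(String[] args) {
        int fetchesPerSecond = 5_000;   // assumed rate of the nextTuple() loop
        int requestBytes = 70;          // assumed wire size of one fetch request
        int emptyResponseBytes = 90;    // assumed wire size of one empty response
        double sentKBps = fetchesPerSecond * requestBytes / 1024.0;
        double recvKBps = fetchesPerSecond * emptyResponseBytes / 1024.0;
        // prints: sent ~342 KB/s, received ~439 KB/s
        System.out.printf("sent ~%.0f KB/s, received ~%.0f KB/s%n", sentKBps, recvKBps);
    }
}
```

Even with small per-request sizes, a few thousand empty round trips per second lands in the same ballpark as the ~432 KB/s Andrea reported; the 1.0.2 fix makes the broker long-poll instead of answering empty fetches right away.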
> I had a look at the Kafka logs, the ZK logs, and the Storm logs:
> nothing. I cleaned up the Kafka and ZK data: nothing, still the same
> behaviour.
>
>     @Override
>     public void nextTuple() {
>         List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
>         for (int i = 0; i < managers.size(); i++) {
>             try {
>                 // in case the number of managers decreased
>                 _currPartitionIndex = _currPartitionIndex % managers.size();
>                 EmitState state = managers.get(_currPartitionIndex).next(_collector);
>                 if (state != EmitState.EMITTED_MORE_LEFT) {
>                     _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
>                 }
>                 if (state != EmitState.NO_EMITTED) {
>                     break;
>                 }
>             } catch (FailedFetchException e) {
>                 LOG.warn("Fetch failed", e);
>                 _coordinator.refresh();
>             }
>         }
>
>         long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;
>
>         /*
>             As far as the System.currentTimeMillis() is dependent on System clock,
>             additional check on negative value of diffWithNow in case of external changes.
>         */
>         if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
>             commit();
>         }
>     }
>
> Many thanks in advance,
> Andrea

--
Regards,
Abhishek Agarwal
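As an aside, the commit check at the tail of the quoted nextTuple() can be isolated as a small pure-Java predicate. The negative-diff guard exists because System.currentTimeMillis() is wall-clock time, not monotonic: if the clock is set backwards, the diff goes negative and the spout commits rather than stalling until the clock catches up. The names here are illustrative, not the actual Storm internals:

```java
// Minimal sketch of the interval-plus-clock-skew commit check quoted above.
public class CommitCheck {
    static boolean shouldCommit(long nowMs, long lastUpdateMs, long stateUpdateIntervalMs) {
        long diffWithNow = nowMs - lastUpdateMs;
        // a negative diff means the system clock was set back; commit instead of waiting
        return diffWithNow > stateUpdateIntervalMs || diffWithNow < 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldCommit(10_000, 0, 2_000));  // interval elapsed -> true
        System.out.println(shouldCommit(1_000, 0, 2_000));   // within interval -> false
        System.out.println(shouldCommit(500, 1_000, 2_000)); // clock went backwards -> true
    }
}
```

A monotonic source such as System.nanoTime() would avoid the skew problem entirely, at the cost of not matching the millisecond bookkeeping the spout already uses.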
