You may want to check out
https://issues.apache.org/jira/browse/STORM-1674

It is fixed in Storm 1.0.2.
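For context, here is a hypothetical Java sketch of one generic way an idle polling consumer can avoid fetching in a tight loop: after a fetch comes back empty, skip further fetches until a back-off window has passed. This is not the storm-kafka code and not the actual STORM-1674 change. `EmptyFetchBackoff`, `backoffMs` and the one-call-per-millisecond simulation are assumptions made up for illustration.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch, not storm-kafka code: after an empty fetch, hold off
// further fetches for backoffMs so an idle consumer stops polling in a tight loop.
public class EmptyFetchBackoff {
    private final long backoffMs;     // hypothetical tuning knob
    private final LongSupplier clock; // injected clock (ms) so the logic is testable
    private long nextAllowedFetchMs = Long.MIN_VALUE;

    public EmptyFetchBackoff(long backoffMs, LongSupplier clock) {
        this.backoffMs = backoffMs;
        this.clock = clock;
    }

    // True if a fetch may be sent to the broker right now.
    public boolean mayFetch() {
        return clock.getAsLong() >= nextAllowedFetchMs;
    }

    // An empty result opens a back-off window; a non-empty one clears it.
    public void onFetchResult(int messageCount) {
        nextAllowedFetchMs = (messageCount == 0)
                ? clock.getAsLong() + backoffMs
                : Long.MIN_VALUE;
    }

    // Simulates one nextTuple() call per millisecond against an always-empty
    // topic and returns how many fetches would actually be sent.
    public static int simulateIdleFetches(long backoffMs, int durationMs) {
        long[] now = {0};
        EmptyFetchBackoff backoff = new EmptyFetchBackoff(backoffMs, () -> now[0]);
        int fetches = 0;
        for (now[0] = 0; now[0] < durationMs; now[0]++) {
            if (backoff.mayFetch()) {
                fetches++;
                backoff.onFetchResult(0); // the broker has nothing for us
            }
        }
        return fetches;
    }

    public static void main(String[] args) {
        System.out.println("no back-off:     " + simulateIdleFetches(0, 1000));
        System.out.println("100 ms back-off: " + simulateIdleFetches(100, 1000));
    }
}
```

With a 100 ms back-off the simulated idle second issues 10 fetches instead of 1000. The trade-off is up to `backoffMs` of extra latency before the first message after an idle period is picked up.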

On Fri, Aug 19, 2016 at 11:35 AM, Andrea Gazzarini <[email protected]> wrote:

> Hi guys,
> retrying here after no luck on Stack Overflow (I'm not sure whether this is
> a question about Storm, Kafka, or both).
> Anyway:
>
> After developing and running my Storm (1.0.1) topology with a KafkaSpout
> and a couple of bolts, I noticed heavy network traffic even when the
> topology is idle (no messages on Kafka, no processing in the bolts). So
> I started commenting out my topology piece by piece to find the cause,
> and I ended up with only the KafkaSpout in my main:
> ....
> final SpoutConfig spoutConfig = new SpoutConfig(
>         new ZkHosts(zkHosts, "/brokers"),
>         "files-topic",          // topic
>         "/kafka",               // ZK root where the spout stores its offsets
>         "consumer-group-name"); // consumer id
> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
> topologyBuilder.setSpout(
>         "kafka-spout-id",
>         new KafkaSpout(spoutConfig),
>         1);                     // parallelism hint
> ....
> When this (useless) topology runs, even in local mode and even on the very
> first run, network traffic grows a lot. In Activity Monitor I see:
>
>    - an average of 432 KB/s of data received
>    - more or less the same average of data sent
>    - after the topology has been running (idle) for a couple of hours,
>      1.26 GB received and 1 GB sent
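A back-of-the-envelope check, under assumptions that are not in the original mail: if every idle `nextTuple()` call results in one small fetch round trip, and the spout executor sleeps about 1 ms between calls that emit nothing (I believe 1 ms is Storm's default for `topology.sleep.spout.wait.strategy.time.ms`, but treat it as an assumption), then 432 KB/s works out to only a few hundred bytes per call. That would be consistent with a steady stream of tiny, empty fetches rather than real payload.

```java
// Hypothetical estimate: relates the observed idle throughput to an assumed
// nextTuple() call rate. Fetch latency is ignored, so the call rate is an upper bound.
public class IdleTrafficEstimate {
    // Calls per second if the executor sleeps waitMs after every empty call.
    public static double maxCallsPerSecond(double waitMs) {
        return 1000.0 / waitMs;
    }

    // Average bytes per call implied by an observed throughput.
    public static double bytesPerCall(double observedBytesPerSec, double callsPerSec) {
        return observedBytesPerSec / callsPerSec;
    }

    public static void main(String[] args) {
        double observed = 432 * 1024;          // ~432 KB/s received (Activity Monitor)
        double calls = maxCallsPerSecond(1.0); // assumed 1 ms sleep between idle calls
        System.out.printf("%.0f calls/s -> about %.0f bytes per call%n",
                calls, bytesPerCall(observed, calls));
    }
}
```

It prints `1000 calls/s -> about 442 bytes per call`.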
>
> (Important: Kafka 0.10 is not running as a cluster; it is a single
> instance running on the same machine, with a single topic and a single
> partition. I just downloaded Kafka, started it and created a simple topic.
> When I put a message on the topic, everything in the topology works
> without any problem.)
>
> Obviously, the cause lies in the KafkaSpout.nextTuple() method (below), but
> I don't understand why I should see so much traffic when there are no
> messages in Kafka. Is there something I didn't consider? Is this the
> expected behaviour? I checked the Kafka, ZK and Storm logs and found
> nothing; I also cleaned up the Kafka and ZK data, but the behaviour is
> still the same.
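One plausible reading of why an empty topic still produces traffic: as far as I can tell from the storm-kafka 1.0.x sources, `PartitionManager.next()` calls its `fill()` method (which issues a fetch to the broker) whenever its internal buffer of messages is empty, so on an idle topic every `nextTuple()` call becomes a network round trip. The simplified model below is hypothetical (class and method names are made up, and the broker is a lambda); it only counts fetches for an idle topic versus one that returns batches of 100 messages.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical model, not storm-kafka code: a partition reader that refills
// its buffer from the broker whenever the buffer is empty.
public class RefillOnEmptyModel {
    enum EmitState { EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED }

    private final Deque<String> waitingToEmit = new ArrayDeque<>();
    private final IntFunction<List<String>> broker; // returns a batch per fetch number
    private int fetchCount = 0;

    RefillOnEmptyModel(IntFunction<List<String>> broker) {
        this.broker = broker;
    }

    EmitState next() {
        if (waitingToEmit.isEmpty()) {
            // One network round trip, issued even when the topic is empty.
            waitingToEmit.addAll(broker.apply(fetchCount++));
        }
        String msg = waitingToEmit.pollFirst();
        if (msg == null) {
            return EmitState.NO_EMITTED;
        }
        return waitingToEmit.isEmpty() ? EmitState.EMITTED_END : EmitState.EMITTED_MORE_LEFT;
    }

    // Calls next() the given number of times and returns how many fetches happened.
    public static int fetchesFor(int calls, IntFunction<List<String>> broker) {
        RefillOnEmptyModel pm = new RefillOnEmptyModel(broker);
        for (int i = 0; i < calls; i++) {
            pm.next();
        }
        return pm.fetchCount;
    }

    public static void main(String[] args) {
        // Idle topic: every fetch comes back empty.
        System.out.println("idle: " + fetchesFor(1000, n -> Collections.<String>emptyList()));
        // Busy topic: every fetch returns a batch of 100 messages.
        System.out.println("busy: " + fetchesFor(1000, n -> Collections.nCopies(100, "msg")));
    }
}
```

The idle case issues 1000 fetches for 1000 calls, while the busy case issues 10, because each batch is drained from the buffer before the next fetch is sent.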
>
> @Override
> public void nextTuple() {
>     List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
>     for (int i = 0; i < managers.size(); i++) {
>
>         try {
>             // in case the number of managers decreased
>             _currPartitionIndex = _currPartitionIndex % managers.size();
>             EmitState state = managers.get(_currPartitionIndex).next(_collector);
>             if (state != EmitState.EMITTED_MORE_LEFT) {
>                 _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
>             }
>             if (state != EmitState.NO_EMITTED) {
>                 break;
>             }
>         } catch (FailedFetchException e) {
>             LOG.warn("Fetch failed", e);
>             _coordinator.refresh();
>         }
>     }
>
>     long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;
>
>     /*
>          As far as the System.currentTimeMillis() is dependent on System clock,
>          additional check on negative value of diffWithNow in case of external changes.
>      */
>     if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
>         commit();
>     }
> }
>
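The last part of `nextTuple()` decides when to commit offsets. Pulled out into a standalone class with the clock passed in (an assumption made so it can be tested; the real code reads `System.currentTimeMillis()` and the field `_lastUpdateMs`), the check looks roughly like this. Assuming `commit()` resets `_lastUpdateMs`, as the hypothetical `markCommitted` does here, it fires at most about once per `stateUpdateIntervalMs` under a steady clock, so on its own it would not seem to explain a continuous few hundred KB/s.

```java
// Standalone sketch of the commit check at the end of nextTuple(): commit when
// more than intervalMs has passed since the last commit, or when the elapsed
// time is negative because the wall clock was moved backwards.
public class CommitIntervalGuard {
    private final long intervalMs;
    private long lastUpdateMs;

    public CommitIntervalGuard(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastUpdateMs = startMs;
    }

    public boolean shouldCommit(long nowMs) {
        long diffWithNow = nowMs - lastUpdateMs;
        return diffWithNow > intervalMs || diffWithNow < 0;
    }

    // Hypothetical: assumes a successful commit resets the reference time.
    public void markCommitted(long nowMs) {
        lastUpdateMs = nowMs;
    }

    public static void main(String[] args) {
        CommitIntervalGuard guard = new CommitIntervalGuard(2000, 10_000);
        System.out.println(guard.shouldCommit(11_000)); // 1 s elapsed
        System.out.println(guard.shouldCommit(12_001)); // just over the interval
        System.out.println(guard.shouldCommit(9_000));  // clock jumped backwards
    }
}
```

Note that the interval comparison is strict, so exactly `intervalMs` elapsed does not yet trigger a commit.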
>
> Many thanks in advance
> Andrea
>



-- 
Regards,
Abhishek Agarwal
