We are using the KafkaSpout class provided by version 1.1.1 of 
storm-kafka-client, along with version 1.1.1 of Storm and version 0.10.0.0 of 
kafka-clients. In local mode, we start our topology using LocalCluster's 
submitTopology method, and bring down the topology by calling the killTopology 
method followed by the shutdown method. Every time killTopology runs, the 
following error is logged:

ERROR Slot [SLOT_1024] - Error when processing event
java.lang.IllegalStateException: This consumer has already been closed.
    at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1416) ~[Engine-0.0.1-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1427) ~[Engine-0.0.1-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360) ~[Engine-0.0.1-SNAPSHOT.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:485) ~[Engine-0.0.1-SNAPSHOT.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:472) ~[Engine-0.0.1-SNAPSHOT.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_162]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_162]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
    at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.executor$fn__5104.invoke(executor.clj:855) ~[storm-core-1.1.1.jar:1.1.1]
    at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.executor$mk_executor$reify__4901.shutdown(executor.clj:425) ~[storm-core-1.1.1.jar:1.1.1]
    at sun.reflect.GeneratedMethodAccessor128.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
    at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.wor ... truncated

Updating only kafka-clients to 1.0.1, with everything else unchanged, makes 
the error disappear. The error also occurs only in local mode, never in 
cluster mode. Is there something wrong with how we are bringing down the 
topology using LocalCluster, or is this a known issue with version 1.1.1 of 
storm-kafka-client?
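
Reading the trace, the exception is raised in ensureNotClosed, reached through 
acquire from KafkaConsumer.close, so it looks as though close is being called 
on a consumer that had already been closed. The sketch below only illustrates 
that failure mode and is not Storm or Kafka code: StrictConsumer and 
TolerantConsumer are hypothetical stand-ins, and the idea that a newer 
kafka-clients tolerates a repeated close is an assumption that would fit the 
1.0.1 observation, not something we have confirmed.

```java
public class DoubleCloseSketch {

    /** Hypothetical stand-in: close() rejects a second call, as the trace suggests. */
    static class StrictConsumer implements AutoCloseable {
        private boolean closed = false;

        private void ensureNotClosed() {
            if (closed) {
                throw new IllegalStateException("This consumer has already been closed.");
            }
        }

        @Override
        public void close() {
            ensureNotClosed(); // throws if close() was already called
            closed = true;
        }
    }

    /** Hypothetical stand-in: close() is idempotent (assumed behavior, see lead-in). */
    static class TolerantConsumer implements AutoCloseable {
        private boolean closed = false;

        @Override
        public void close() {
            if (closed) {
                return; // a repeated close is silently ignored
            }
            closed = true;
        }
    }

    /** Closes the consumer twice and reports whether the second call threw. */
    static boolean secondCloseThrows(AutoCloseable consumer) throws Exception {
        consumer.close();
        try {
            consumer.close();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("strict: " + secondCloseThrows(new StrictConsumer()));
        System.out.println("tolerant: " + secondCloseThrows(new TolerantConsumer()));
    }
}
```

If that reading is right, the open question is what closes the consumer the 
first time during killTopology in local mode.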
