We are using the KafkaSpout class provided by version 1.1.1 of
storm-kafka-client, along with version 1.1.1 of Storm and version 0.10.0.0 of
kafka-clients. In local mode, we start our topology using LocalCluster's
submitTopology method, and bring down the topology by calling the killTopology
method followed by the shutdown method. Every time killTopology is run, the
following occurs:
ERROR Slot [SLOT_1024] - Error when processing event
java.lang.IllegalStateException: This consumer has already been closed.
at
org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1416)
~[Engine-0.0.1-SNAPSHOT.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1427)
~[Engine-0.0.1-SNAPSHOT.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
~[Engine-0.0.1-SNAPSHOT.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:485)
~[Engine-0.0.1-SNAPSHOT.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:472)
~[Engine-0.0.1-SNAPSHOT.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_162]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_162]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_162]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
~[clojure-1.7.0.jar:?]
at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
~[clojure-1.7.0.jar:?]
at org.apache.storm.daemon.executor$fn__5104.invoke(executor.clj:855)
~[storm-core-1.1.1.jar:1.1.1]
at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
at
org.apache.storm.daemon.executor$mk_executor$reify__4901.shutdown(executor.clj:425)
~[storm-core-1.1.1.jar:1.1.1]
at sun.reflect.GeneratedMethodAccessor128.invoke(Unknown Source) ~[?:?]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_162]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
~[clojure-1.7.0.jar:?]
at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
~[clojure-1.7.0.jar:?]
at org.apache.storm.daemon.wor ... truncated
I did notice that updating just the version of kafka-clients to 1.0.1 made this
issue disappear. Also, this issue only happens in local mode, not cluster mode.
Is there something wrong with how we are bringing down the topology using
LocalCluster? Or is this a known issue with version 1.1.1 of storm-kafka-client?