Hi Mitchell. Could you verify that you're using kafka-clients 1.1.0 and not 0.10.0.0? The stack trace you posted points to KafkaConsumer.java:1360 as the location of the close method, which is where it's declared in 0.10.0.0 and not 1.1.0.
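Since the frames in your trace are all attributed to Engine-0.0.1-SNAPSHOT.jar, it may be worth checking which kafka-clients classes actually end up inside that jar at runtime. Below is a minimal sketch of one way to do that, not a definitive check. The class name ClasspathCheck and its two helper methods are hypothetical names made up for this example. The resource path /kafka/kafka-version.properties is an assumption based on my understanding of how kafka-clients packages its version information; if the file is missing or relocated in your jar, the helper simply reports "unknown".

```java
import java.io.IOException;
import java.io.InputStream;
import java.security.CodeSource;
import java.util.Properties;

// Hypothetical helper for this thread; not part of Storm or Kafka.
public class ClasspathCheck {

    /** Returns the jar or directory the named class was loaded from. */
    public static String locationOf(String className) {
        try {
            // Load without initializing, so no static initializers run.
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // Bootstrap classes such as java.lang.String have no code source.
            return source == null ? "(bootstrap classpath)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException e) {
            return "(not on classpath)";
        }
    }

    /**
     * Reads the "version" key from /kafka/kafka-version.properties.
     * Assumption: kafka-clients ships this resource; returns "unknown" otherwise.
     */
    public static String bundledKafkaVersion() {
        try (InputStream in = ClasspathCheck.class.getResourceAsStream("/kafka/kafka-version.properties")) {
            if (in == null) {
                return "unknown";
            }
            Properties props = new Properties();
            props.load(in);
            return props.getProperty("version", "unknown");
        } catch (IOException e) {
            return "unknown";
        }
    }

    public static void main(String[] args) {
        String consumer = "org.apache.kafka.clients.consumer.KafkaConsumer";
        System.out.println("KafkaConsumer loaded from: " + locationOf(consumer));
        System.out.println("bundled kafka-clients version: " + bundledKafkaVersion());
    }
}
```

Running this with the same classpath as the topology (or calling the two helpers from the spout's open method and logging the result) should show whether the consumer class comes from the jar you expect. If the reported version is still 0.10.0.0, something in the build is probably pulling the older kafka-clients back in when storm-kafka-client is upgraded.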
2018-04-30 23:51 GMT+02:00 Mitchell Rathbun (BLOOMBERG/ 731 LEX) < [email protected]>:

> Upon shutdown in local mode and sometimes cluster mode (when close is
> called on a KafkaSpout), we get the following:
>
> ERROR Slot [SLOT_1024] - Error when processing event
> java.lang.IllegalStateException: This consumer has already been closed.
> at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1416) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1427) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:485) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:472) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_162]
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_162]
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$fn__5104.invoke(executor.clj:855) ~[storm-core-1.1.1.jar:1.1.1]
> at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$mk_executor$reify__4901.shutdown(executor.clj:425) ~[storm-core-1.1.1.jar:1.1.1]
> at sun.reflect.GeneratedMethodAccessor128.invoke(Unknown Source) ~[?:?]
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.wor ... truncated
>
> I had a similar issue to this here:
> https://mail-archives.apache.org/mod_mbox/storm-user/201803.mbox/browser.
> The issue was fixed initially by updating the maven kafka-clients version
> from 0.10.0.0 to 1.1.0. However, updating storm-core and
> storm-kafka-client from version 1.1.1 to 1.1.2, 1.2.0, 1.2.1, etc. causes
> the error to start occurring again. I am confused why this would happen,
> since the initial fix involved updating kafka-clients, not
> storm-kafka-client. Looking at the release page for 1.1.2
> (http://storm.apache.org/2018/02/15/storm112-released.html), it seems like
> there were a lot of Kafka integration changes involved in the newer
> version. Has anyone seen something similar happening with one of these
> newer versions?
>
> We override the deactivate method for KafkaSpout and call deactivate using
> super.deactivate(). If we remove this call to deactivate the error goes
> away, but I feel that this is not a good solution for a couple of reasons.
> It is coupled to the implementation of KafkaSpout (deactivate and close do
> the same thing), and close isn't even guaranteed to be called in cluster
> mode, which is what we really care about. So please advise on how to fix
> this issue, or whether there is a workaround for the time being.
