Hi Mitchell.

Could you verify that you're using kafka-clients 1.1.0 and not 0.10.0.0?
The stack trace you posted points to KafkaConsumer.java:1360 as the
location of the close method, which is where it's declared in 0.10.0.0 and
not 1.1.0.
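
In case it helps, here is a minimal sketch for checking at runtime which jar a class is actually loaded from. It uses only the JDK. The class name ClassOrigin and the helper describe are made up for illustration and are not part of Storm or Kafka.

```java
import java.net.URL;
import java.security.CodeSource;

public class ClassOrigin {

    /** Describes where a class was loaded from, or notes that it is absent. */
    static String describe(String className) {
        try {
            // initialize=false: only locate the class, do not run static initializers.
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            Package pkg = cls.getPackage();
            // Read from the jar manifest; often missing, especially in repackaged jars.
            String version = (pkg == null) ? null : pkg.getImplementationVersion();
            return className
                    + " loaded from " + (location == null ? "<bootstrap or unknown>" : location)
                    + ", manifest version " + (version == null ? "<not declared>" : version);
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " not found on classpath";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("org.apache.kafka.clients.consumer.KafkaConsumer"));
        System.out.println(describe("org.apache.storm.kafka.spout.KafkaSpout"));
    }
}
```

Run it with the same classpath as the topology. Every Kafka frame in your trace is tagged Engine-0.0.1-SNAPSHOT.jar, so the classes may be bundled into that jar. If so, the reported location will be the bundle and the manifest version may not be declared. In that case, mvn dependency:tree should show which kafka-clients version Maven resolved.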

2018-04-30 23:51 GMT+02:00 Mitchell Rathbun (BLOOMBERG/ 731 LEX) <
[email protected]>:

> Upon shutdown in local mode and sometimes cluster mode (when close is
> called on a KafkaSpout), we get the following:
>
> ERROR Slot [SLOT_1024] - Error when processing event
> java.lang.IllegalStateException: This consumer has already been closed.
> at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1416) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1427) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:485) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:472) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_162]
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_162]
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$fn__5104.invoke(executor.clj:855) ~[storm-core-1.1.1.jar:1.1.1]
> at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$mk_executor$reify__4901.shutdown(executor.clj:425) ~[storm-core-1.1.1.jar:1.1.1]
> at sun.reflect.GeneratedMethodAccessor128.invoke(Unknown Source) ~[?:?]
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.wor ... truncated
>
>
> I had a similar issue to this here:
> https://mail-archives.apache.org/mod_mbox/storm-user/201803.mbox/browser
> The issue was initially fixed by updating the Maven kafka-clients version
> from 0.10.0.0 to 1.1.0. However, updating storm-core and
> storm-kafka-client from version
> 1.1.1 to 1.1.2, 1.2.0, 1.2.1, etc. causes the error to start occurring
> again. I am confused why this would happen since the initial fix involved
> updating kafka-clients, not storm-kafka-client. Looking at the release page
> for 1.1.2 (http://storm.apache.org/2018/02/15/storm112-released.html), it
> seems like there were a lot of Kafka integration changes involved in the
> newer version. Has anyone seen something similar happening with one of
> these newer versions?
>
> We override the deactivate method for KafkaSpout and call deactivate using
> super.deactivate(). If we remove this call to deactivate, the error goes
> away, but I feel that this is not a good solution for a couple of reasons.
> It is coupled to the implementation of KafkaSpout (deactivate and close do
> the same thing), and close isn't even guaranteed to be called in cluster
> mode, which is what we really care about. So please advise on how to fix
> this issue/if there is a workaround for the time being.
>

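On the deactivate/close point above: if both paths end up running the same shutdown logic, one general way to make a second call harmless is a flag that lets the shutdown run at most once per activation. The sketch below is only an illustration of that pattern under that assumption. GuardedLifecycle and its methods are hypothetical names, it does not extend or call KafkaSpout, and I have not checked how it would fit with KafkaSpout's internals.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class GuardedLifecycle {
    // True while the underlying resource is open and still needs a shutdown.
    private final AtomicBoolean open = new AtomicBoolean(false);
    // Stand-in for the real shutdown work (hypothetical; e.g. closing a consumer).
    private final Runnable onShutdown;

    public GuardedLifecycle(Runnable onShutdown) {
        this.onShutdown = onShutdown;
    }

    public void activate() {
        open.set(true);
    }

    public void deactivate() {
        shutdownOnce();
    }

    public void close() {
        shutdownOnce();
    }

    private void shutdownOnce() {
        // Only the call that flips open from true to false runs the shutdown.
        if (open.compareAndSet(true, false)) {
            onShutdown.run();
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        GuardedLifecycle lifecycle = new GuardedLifecycle(calls::incrementAndGet);
        lifecycle.activate();
        lifecycle.deactivate();
        lifecycle.close(); // no-op: the shutdown already ran in deactivate()
        System.out.println("shutdown ran " + calls.get() + " time(s)");
    }
}
```

The guard resets on activate(), so a later activate/deactivate cycle would run the shutdown again exactly once.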