Okay, that makes sense. The error is slightly different, but it should also be fixed by https://issues.apache.org/jira/browse/STORM-3013. The bug is that both KafkaSpout.deactivate and KafkaSpout.close try to commit any committable offsets and then close the consumer. When a topology is deactivated (which also happens as part of killing it), the spout commits any pending offsets and closes the consumer. When close is called right afterwards, the consumer is already closed, and you get the stack trace you posted. The proposed fix for STORM-3013 is to only close the consumer when the spout is closed, not when it is deactivated. This also lets us keep providing metrics for the spout while the topology is deactivated, which can be useful if you're watching metrics such as how far behind the head of the log your spout is.
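
To make the sequence concrete, here is a minimal sketch of the pattern (a simplified illustration only, not the actual KafkaSpout source; the class and helper names are made up):

import org.apache.kafka.clients.consumer.KafkaConsumer;

// Simplified sketch of the double-close: both lifecycle callbacks funnel
// into the same shutdown routine, so the second call hits a consumer that
// is already closed and KafkaConsumer throws IllegalStateException.
public class DoubleCloseSketch {

    private final KafkaConsumer<String, String> consumer;

    public DoubleCloseSketch(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    // Current behavior (simplified): deactivate and close both shut down.
    public void deactivate() {
        shutdown();
    }

    public void close() {
        shutdown(); // consumer already closed by deactivate -> IllegalStateException
    }

    private void shutdown() {
        commitCommittableOffsets(); // touches consumer state, e.g. assignment()
        consumer.close();
    }

    // Proposed behavior (simplified): deactivate only commits; the consumer
    // is closed exactly once, when the spout itself is closed.
    public void deactivateFixed() {
        commitCommittableOffsets();
    }

    public void closeFixed() {
        commitCommittableOffsets();
        consumer.close();
    }

    private void commitCommittableOffsets() {
        // placeholder for committing whatever offsets are ready
    }
}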
2018-05-01 19:57 GMT+02:00 Mitchell Rathbun (BLOOMBERG/ 731 LEX) <[email protected]>:

> Sorry, I think I posted an older stack trace. Here is the more recent
> error using storm-core and storm-kafka-client 1.2.1 with kafka-clients
> 1.1.1:
>
> 2018-05-01 13:45:47.072 ERROR Slot [SLOT_1027] Error when processing event
> java.lang.IllegalStateException: This consumer has already been closed.
> at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1811) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.assignment(KafkaConsumer.java:839) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:681) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:672) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_162]
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_162]
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$fn__5117.invoke(executor.clj:868) ~[storm-core-1.2.1.jar:1.2.1]
> at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$mk_executor$reify__4914.shutdown(executor.clj:432) ~[storm-core-1.2.1.jar:1.2.1]
> at sun.reflect.GeneratedMethodAccessor170.invoke(Unknown Source) ~[?:?]
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.worker$fn__5545$exec_fn__1369__auto__$reify__5547$shutdown_STAR___5567.invoke(worker.clj:684) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.daemon.worker$fn__5545$exec_fn__1369__auto__$reify$reify__5593.shutdown(worker.clj:724) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.ProcessSimulator.killProcess(ProcessSimulator.java:67) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.daemon.supervisor.LocalContainer.kill(LocalContainer.java:69) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.daemon.supervisor.Slot.killContainerForChangedAssignment(Slot.java:311) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.daemon.supervisor.Slot.handleRunning(Slot.java:527) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.daemon.supervisor.Slot.stateMachineStep(Slot.java:265) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:752) [storm-core-1.2.1.jar:1.2.1]
> 2018-05-01 13:45:47.084 ERROR Utils [SLOT_1027] Halting process: Error when processing an event
> java.lang.RuntimeException: Halting process: Error when processing an event
> at org.apache.storm.utils.Utils.exitProcess(Utils.java:1777) [storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:796) [storm-core-1.2.1.jar:1.2.1]
>
>
> From: [email protected] At: 05/01/18 02:32:00
> To: [email protected]
> Subject: Re: Issues killing KafkaSpout with storm-kafka-client 1.1.2 and 1.2.1
>
> Hi Mitchell.
>
> Could you verify that you're using kafka-clients 1.1.0 and not 0.10.0.0?
> The stack trace you posted points to KafkaConsumer.java:1360 as the
> location of the close method, which is where it's declared in 0.10.0.0 and
> not 1.1.0.
>
> 2018-04-30 23:51 GMT+02:00 Mitchell Rathbun (BLOOMBERG/ 731 LEX) <[email protected]>:
>
>> Upon shutdown in local mode and sometimes cluster mode (when close is
>> called on a KafkaSpout), we get the following:
>>
>> ERROR Slot [SLOT_1024] - Error when processing event
>> java.lang.IllegalStateException: This consumer has already been closed.
>> at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1416) ~[Engine-0.0.1-SNAPSHOT.jar:?]
>> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1427) ~[Engine-0.0.1-SNAPSHOT.jar:?]
>> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360) ~[Engine-0.0.1-SNAPSHOT.jar:?]
>> at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:485) ~[Engine-0.0.1-SNAPSHOT.jar:?]
>> at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:472) ~[Engine-0.0.1-SNAPSHOT.jar:?]
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_162]
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_162]
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
>> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
>> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
>> at org.apache.storm.daemon.executor$fn__5104.invoke(executor.clj:855) ~[storm-core-1.1.1.jar:1.1.1]
>> at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
>> at org.apache.storm.daemon.executor$mk_executor$reify__4901.shutdown(executor.clj:425) ~[storm-core-1.1.1.jar:1.1.1]
>> at sun.reflect.GeneratedMethodAccessor128.invoke(Unknown Source) ~[?:?]
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
>> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
>> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
>> at org.apache.storm.daemon.wor ... truncated
>>
>>
>> I had a similar issue to this here: https://mail-archives.apache.org/mod_mbox/storm-user/201803.mbox/browser.
>> The issue was initially fixed by updating the Maven kafka-clients version
>> from 0.10.0.0 to 1.1.0. However, updating storm-core and storm-kafka-client
>> from version 1.1.1 to 1.1.2, 1.2.0, 1.2.1, etc. causes the error to start
>> occurring again. I am confused why this would happen, since the initial fix
>> involved updating kafka-clients, not storm-kafka-client. Looking at the
>> release page for 1.1.2 (http://storm.apache.org/2018/02/15/storm112-released.html),
>> it seems like there were a lot of Kafka integration changes in the newer
>> version. Has anyone seen something similar happening with one of these
>> newer versions?
>>
>> We override the deactivate method of KafkaSpout and call deactivate via
>> super.deactivate(). If we remove this call to deactivate, the error goes
>> away, but I feel that this is not a good solution for a couple of reasons:
>> it is coupled to the implementation of KafkaSpout (deactivate and close do
>> the same thing), and close isn't even guaranteed to be called in cluster
>> mode, which is what we really care about. So please advise on how to fix
>> this issue, or whether there is a workaround for the time being.
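
Regarding the question above about a workaround for the time being: until the STORM-3013 change is in a release, one stop-gap (just a sketch on my end, untested, and not something storm-kafka-client ships) is to subclass KafkaSpout so the redundant close() doesn't escalate into the slot-killing RuntimeException:

import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

// Hypothetical stop-gap: swallow the "already closed" error from the second
// shutdown so it doesn't bubble up and halt the worker on kill.
public class ForgivingKafkaSpout<K, V> extends KafkaSpout<K, V> {

    public ForgivingKafkaSpout(KafkaSpoutConfig<K, V> config) {
        super(config);
    }

    @Override
    public void close() {
        try {
            super.close();
        } catch (IllegalStateException e) {
            // deactivate() already committed offsets and closed the consumer,
            // so there is nothing left to release here.
        }
    }
}

This only hides the symptom; the real fix is still STORM-3013.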
