Thank you Stig, we will keep an eye on the fix and try/apply it as soon as it is released.
From: [email protected] At: 05/01/18 08:41:51To: [email protected] Subject: Re: Issues killing KafkaSpout with storm-kafka-client 1.1.2 and 1.2.1 Hi Magdy, The errors are not the same. The error you posted is due to https://issues.apache.org/jira/browse/STORM-3013. There's a PR for master up, and a 1.x version at https://github.com/srdo/storm/tree/STORM-3013-1.x if you'd like to try it out. 2018-05-01 14:17 GMT+02:00 Magdy Halim (BLOOMBERG/ 731 LEX) <[email protected]>: thank you for looking into this, yes we believe we are using 1.1.0 not 0.10.0.0, here is the same error happening in cluster-mode: NOTE: please see log snippet below for version and also the acquireAndEnsureOpen(), which I believe was recently introduced with the fix, at least that is what I think. ... 2018-04-30 16:21:29.406 INFO AppInfoParser [Thread-128-customKafkaSpout-executor[30 30]] Kafka version : 1.1.0 2018-04-30 16:21:29.406 INFO AppInfoParser [Thread-128-customKafkaSpout-executor[30 30]] Kafka commitId : fdcf75ea326b8e07 2018-04-30 16:21:29.407 INFO AppInfoParser [Thread-186-customKafkaSpout-executor[29 29]] Kafka version : 1.1.0 2018-04-30 16:21:29.407 INFO AppInfoParser [Thread-186-customKafkaSpout-executor[29 29]] Kafka commitId : fdcf75ea326b8e07 ... 2018-04-30 16:39:25.885 INFO CustomKafkaSpout [Thread-140-customKafkaSpout-executor[31 31]] CustomKafkaSpout is deactivated, there are 0 remaining instances. 2018-04-30 16:39:29.193 ERROR util [Thread-140-customKafkaSpout-executor[31 31]] Async loop died! java.lang.RuntimeException: java.lang.IllegalStateException: This consumer has already been closed. 
	at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162]
Caused by: java.lang.IllegalStateException: This consumer has already been closed.
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1811) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:1641) ~[stormjar.jar:?]
	at org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:79) ~[stormjar.jar:?]
	at org.apache.storm.daemon.executor$metrics_tick$fn__4899.invoke(executor.clj:345) ~[storm-core-1.2.1.jar:1.2.1]
	at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
	at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
	at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
	at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
	at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
	at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
	at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
	at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
	at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
	at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
	at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
	at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) ~[clojure-1.7.0.jar:?]
	at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) ~[clojure-1.7.0.jar:?]
	at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) ~[clojure-1.7.0.jar:?]
	at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) ~[clojure-1.7.0.jar:?]
	at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) ~[clojure-1.7.0.jar:?]
	at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
	at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
	at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.daemon.executor$fn__4975$tuple_action_fn__4981.invoke(executor.clj:522) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:471) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
	... 7 more

From: [email protected] At: 05/01/18 02:32:00
To: [email protected]
Subject: Re: Issues killing KafkaSpout with storm-kafka-client 1.1.2 and 1.2.1

Hi Mitchell. Could you verify that you're using kafka-clients 1.1.0 and not 0.10.0.0? The stack trace you posted points to KafkaConsumer.java:1360 as the location of the close method, which is where it's declared in 0.10.0.0 and not in 1.1.0.

2018-04-30 23:51 GMT+02:00 Mitchell Rathbun (BLOOMBERG/ 731 LEX) <[email protected]>:

Upon shutdown in local mode and sometimes cluster mode (when close is called on a KafkaSpout), we get the following:

ERROR Slot [SLOT_1024] - Error when processing event
java.lang.IllegalStateException: This consumer has already been closed.
	at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1416) ~[Engine-0.0.1-SNAPSHOT.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1427) ~[Engine-0.0.1-SNAPSHOT.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360) ~[Engine-0.0.1-SNAPSHOT.jar:?]
	at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:485) ~[Engine-0.0.1-SNAPSHOT.jar:?]
	at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:472) ~[Engine-0.0.1-SNAPSHOT.jar:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_162]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_162]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
	at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
	at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
	at org.apache.storm.daemon.executor$fn__5104.invoke(executor.clj:855) ~[storm-core-1.1.1.jar:1.1.1]
	at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
	at org.apache.storm.daemon.executor$mk_executor$reify__4901.shutdown(executor.clj:425) ~[storm-core-1.1.1.jar:1.1.1]
	at sun.reflect.GeneratedMethodAccessor128.invoke(Unknown Source) ~[?:?]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
	at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
	at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
	at org.apache.storm.daemon.wor ... truncated

I had a similar issue to this here: https://mail-archives.apache.org/mod_mbox/storm-user/201803.mbox/browser.
The issue was initially fixed by updating the Maven kafka-clients version from 0.10.0.0 to 1.1.0. However, updating storm-core and storm-kafka-client from version 1.1.1 to 1.1.2, 1.2.0, 1.2.1, etc. causes the error to start occurring again. I am confused why this would happen, since the initial fix involved updating kafka-clients, not storm-kafka-client. Looking at the release page for 1.1.2 (http://storm.apache.org/2018/02/15/storm112-released.html), it seems there were a lot of Kafka integration changes in the newer versions. Has anyone seen something similar happening with one of these newer versions? We override the deactivate method of KafkaSpout and invoke the parent implementation via super.deactivate(). If we remove this call to deactivate, the error goes away, but I feel this is not a good solution for a couple of reasons: it is coupled to the implementation of KafkaSpout (deactivate and close do the same thing), and close isn't even guaranteed to be called in cluster mode, which is what we really care about. So please advise on how to fix this issue, or whether there is a workaround for the time being.
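As a stopgap until the STORM-3013 fix lands, one defensive pattern that follows from the observation above (deactivate and close both end up closing the same consumer) is to funnel both lifecycle hooks through a single idempotent shutdown guard, so a second call is a no-op instead of a double close. This is only a sketch of the guard pattern, not the actual storm-kafka-client fix; the class and method names (GuardedSpout, shutdownOnce) are illustrative and not part of any Storm API:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: make shutdown idempotent so that deactivate() and
// close() can both be called without triggering
// "This consumer has already been closed".
class GuardedSpout {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private int shutdownCalls = 0; // exposed only so the behavior can be checked

    // Both lifecycle hooks funnel into one idempotent shutdown.
    void deactivate() { shutdownOnce(); }
    void close()      { shutdownOnce(); }

    private void shutdownOnce() {
        // compareAndSet flips false -> true exactly once, so the body runs
        // at most once even if deactivate() and close() race during shutdown.
        if (closed.compareAndSet(false, true)) {
            shutdownCalls++;
            // ... in a real spout: commit offsets, consumer.close(), etc. ...
        }
    }

    int shutdownCalls() { return shutdownCalls; }
}
```

The AtomicBoolean guard is deliberately race-safe, since in cluster mode the worker may invoke the two hooks from different shutdown paths.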
