Hi Magdy,

The errors are not the same. The error you posted is due to
https://issues.apache.org/jira/browse/STORM-3013. There's a PR up for master,
and a 1.x version at https://github.com/srdo/storm/tree/STORM-3013-1.x if
you'd like to try it out.
2018-05-01 14:17 GMT+02:00 Magdy Halim (BLOOMBERG/ 731 LEX) <[email protected]>:
> thank you for looking into this, yes we believe we are using 1.1.0 not
> 0.10.0.0, here is the same error happening in cluster-mode:
>
> NOTE: please see log snippet below for version and also the
> acquireAndEnsureOpen(), which I believe was recently introduced with the
> fix, at least that is what I think.
>
> ...
> 2018-04-30 16:21:29.406 INFO AppInfoParser [Thread-128-customKafkaSpout-executor[30 30]] Kafka version : 1.1.0
> 2018-04-30 16:21:29.406 INFO AppInfoParser [Thread-128-customKafkaSpout-executor[30 30]] Kafka commitId : fdcf75ea326b8e07
> 2018-04-30 16:21:29.407 INFO AppInfoParser [Thread-186-customKafkaSpout-executor[29 29]] Kafka version : 1.1.0
> 2018-04-30 16:21:29.407 INFO AppInfoParser [Thread-186-customKafkaSpout-executor[29 29]] Kafka commitId : fdcf75ea326b8e07
> ...
> 2018-04-30 16:39:25.885 INFO CustomKafkaSpout [Thread-140-customKafkaSpout-executor[31 31]] CustomKafkaSpout is deactivated, there are 0 remaining instances.
> 2018-04-30 16:39:29.193 ERROR util [Thread-140-customKafkaSpout-executor[31 31]] Async loop died!
> java.lang.RuntimeException: java.lang.IllegalStateException: This consumer has already been closed.
> 	at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
> 	at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
> 	at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) ~[storm-core-1.2.1.jar:1.2.1]
> 	at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) ~[storm-core-1.2.1.jar:1.2.1]
> 	at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634) ~[storm-core-1.2.1.jar:1.2.1]
> 	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
> 	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> 	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162]
> Caused by: java.lang.IllegalStateException: This consumer has already been closed.
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1811) ~[stormjar.jar:?]
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:1641) ~[stormjar.jar:?]
> 	at org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:79) ~[stormjar.jar:?]
> 	at org.apache.storm.daemon.executor$metrics_tick$fn__4899.invoke(executor.clj:345) ~[storm-core-1.2.1.jar:1.2.1]
> 	at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
> 	at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> 	at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> 	at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
> 	at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
> 	at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
> 	at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> 	at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> 	at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
> 	at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
> 	at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
> 	at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) ~[clojure-1.7.0.jar:?]
> 	at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) ~[clojure-1.7.0.jar:?]
> 	at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) ~[clojure-1.7.0.jar:?]
> 	at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) ~[clojure-1.7.0.jar:?]
> 	at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) ~[clojure-1.7.0.jar:?]
> 	at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
> 	at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
> 	at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) ~[storm-core-1.2.1.jar:1.2.1]
> 	at org.apache.storm.daemon.executor$fn__4975$tuple_action_fn__4981.invoke(executor.clj:522) ~[storm-core-1.2.1.jar:1.2.1]
> 	at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:471) ~[storm-core-1.2.1.jar:1.2.1]
> 	at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
> 	at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
> ... 7 more
>
> From: [email protected] At: 05/01/18 02:32:00
> To: [email protected]
> Subject: Re: Issues killing KafkaSpout with storm-kafka-client 1.1.2 and 1.2.1
>
> Hi Mitchell.
>
> Could you verify that you're using kafka-clients 1.1.0 and not 0.10.0.0?
> The stack trace you posted points to KafkaConsumer.java:1360 as the
> location of the close method, which is where it's declared in 0.10.0.0 and
> not 1.1.0.
>
> 2018-04-30 23:51 GMT+02:00 Mitchell Rathbun (BLOOMBERG/ 731 LEX) <[email protected]>:
>
>> Upon shutdown in local mode and sometimes cluster mode (when close is
>> called on a KafkaSpout), we get the following:
>>
>> ERROR Slot [SLOT_1024] - Error when processing event
>> java.lang.IllegalStateException: This consumer has already been closed.
>> 	at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1416) ~[Engine-0.0.1-SNAPSHOT.jar:?]
>> 	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1427) ~[Engine-0.0.1-SNAPSHOT.jar:?]
>> 	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360) ~[Engine-0.0.1-SNAPSHOT.jar:?]
>> 	at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:485) ~[Engine-0.0.1-SNAPSHOT.jar:?]
>> 	at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:472) ~[Engine-0.0.1-SNAPSHOT.jar:?]
>> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_162]
>> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_162]
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
>> 	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
>> 	at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
>> 	at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
>> 	at org.apache.storm.daemon.executor$fn__5104.invoke(executor.clj:855) ~[storm-core-1.1.1.jar:1.1.1]
>> 	at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
>> 	at org.apache.storm.daemon.executor$mk_executor$reify__4901.shutdown(executor.clj:425) ~[storm-core-1.1.1.jar:1.1.1]
>> 	at sun.reflect.GeneratedMethodAccessor128.invoke(Unknown Source) ~[?:?]
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
>> 	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
>> 	at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
>> 	at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
>> 	at org.apache.storm.daemon.wor ... truncated
>>
>> I had a similar issue to this here:
>> https://mail-archives.apache.org/mod_mbox/storm-user/201803.mbox/browser.
>> The issue was fixed initially by updating the maven kafka-clients version
>> from 0.10.0.0 to 1.1.0. However, updating storm-core and storm-kafka-client
>> from version 1.1.1 to 1.1.2, 1.2.0, 1.2.1, etc. causes the error to start
>> occurring again. I am confused about why this would happen, since the
>> initial fix involved updating kafka-clients, not storm-kafka-client.
>> Looking at the release page for 1.1.2
>> (http://storm.apache.org/2018/02/15/storm112-released.html), it seems that
>> a lot of Kafka integration changes went into the newer versions. Has anyone
>> seen something similar happening with one of these newer versions?
>>
>> We override the deactivate method of KafkaSpout and call the base
>> implementation via super.deactivate(). If we remove this call to deactivate,
>> the error goes away, but I feel that this is not a good solution for a
>> couple of reasons: it is coupled to the implementation of KafkaSpout
>> (deactivate and close do the same thing), and close isn't even guaranteed
>> to be called in cluster mode, which is what we really care about. So please
>> advise on how to fix this issue, or whether there is a workaround for the
>> time being.
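[Editor's note] The failure mode Mitchell describes is a double close: deactivate and close both end up closing the same KafkaConsumer, and the second close throws IllegalStateException. The defensive pattern that avoids this is an idempotent shutdown guard. The sketch below is hypothetical and self-contained (FakeConsumer and GuardedSpout are stand-ins, not Storm or Kafka classes); it is not the actual KafkaSpout code or the STORM-3013 fix, only an illustration of the guard pattern under those assumptions.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for KafkaConsumer: like the real client, it throws
// if close() is called a second time, mirroring the exception in the traces.
class FakeConsumer {
    private boolean closed = false;

    void close() {
        if (closed) {
            throw new IllegalStateException("This consumer has already been closed.");
        }
        closed = true;
    }
}

// Hypothetical spout whose deactivate() and close() both route through one
// guarded shutdown, so whichever runs second becomes a no-op instead of
// touching the already-closed consumer.
class GuardedSpout {
    private final FakeConsumer consumer = new FakeConsumer();
    private final AtomicBoolean shutDown = new AtomicBoolean(false);

    private void shutdown() {
        // compareAndSet lets only the first caller through; later calls skip.
        if (shutDown.compareAndSet(false, true)) {
            consumer.close();
        }
    }

    public void deactivate() { shutdown(); }

    public void close() { shutdown(); }
}

public class Main {
    public static void main(String[] args) {
        GuardedSpout spout = new GuardedSpout();
        spout.deactivate(); // first call closes the consumer
        spout.close();      // second call is a no-op thanks to the guard
        System.out.println("shutdown completed without IllegalStateException");
    }
}
```

Without the AtomicBoolean guard, the deactivate-then-close sequence above would throw exactly the "This consumer has already been closed" error from the logs.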
