Hi Magdy,

The errors are not the same. The error you posted is due to
https://issues.apache.org/jira/browse/STORM-3013. There's a PR for master
up, and a 1.x version at https://github.com/srdo/storm/tree/STORM-3013-1.x
if you'd like to try it out.
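
For anyone reading along, here is a minimal sketch of the failure mode visible in the trace below: a metrics tick reads offsets from a consumer that deactivate() has already closed. It is only an illustration and not the actual STORM-3013 patch. FakeConsumer, GuardedOffsetMetric and SpoutSketch are hypothetical names, and the null-returning guard is an assumption about one way such a fix could look.

```java
import java.util.function.Supplier;

public class ClosedConsumerSketch {

    // Hypothetical stand-in for a consumer; NOT the real KafkaConsumer API.
    static class FakeConsumer {
        private boolean closed = false;

        long beginningOffset() {
            if (closed) {
                throw new IllegalStateException("This consumer has already been closed.");
            }
            return 0L;
        }

        void close() {
            closed = true;
        }
    }

    // Hypothetical metric that looks the consumer up on every tick instead of
    // holding a direct reference, and reports nothing when there is none.
    static class GuardedOffsetMetric {
        private final Supplier<FakeConsumer> consumerSupplier;

        GuardedOffsetMetric(Supplier<FakeConsumer> consumerSupplier) {
            this.consumerSupplier = consumerSupplier;
        }

        Long getValueAndReset() {
            FakeConsumer consumer = consumerSupplier.get();
            if (consumer == null) {
                return null; // spout is deactivated, skip this tick
            }
            return consumer.beginningOffset();
        }
    }

    // Hypothetical spout: deactivate() drops the consumer, close() reuses it.
    static class SpoutSketch {
        private FakeConsumer consumer = new FakeConsumer();
        final GuardedOffsetMetric metric = new GuardedOffsetMetric(() -> consumer);

        void deactivate() {
            if (consumer != null) {
                consumer.close();
                consumer = null; // makes a second deactivate()/close() a no-op
            }
        }

        void close() {
            deactivate();
        }
    }

    // Reproduces the unguarded case: reading from a closed consumer throws.
    static boolean unguardedReadThrowsAfterClose() {
        FakeConsumer consumer = new FakeConsumer();
        consumer.close();
        try {
            consumer.beginningOffset();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        SpoutSketch spout = new SpoutSketch();
        System.out.println("metric while active: " + spout.metric.getValueAndReset());
        spout.deactivate();
        spout.close(); // safe to call after deactivate() in this sketch
        System.out.println("metric after deactivate: " + spout.metric.getValueAndReset());
        System.out.println("unguarded read throws: " + unguardedReadThrowsAfterClose());
    }
}
```

In this sketch the guard lives in the metric, so a tick that arrives after deactivation yields no value instead of killing the executor's async loop.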

2018-05-01 14:17 GMT+02:00 Magdy Halim (BLOOMBERG/ 731 LEX) <
[email protected]>:

> Thank you for looking into this. Yes, we believe we are using 1.1.0, not
> 0.10.0.0. Here is the same error happening in cluster mode:
>
> NOTE: the log snippet below shows the version and also the
> acquireAndEnsureOpen() call, which I believe was introduced recently with
> the fix.
>
> ...
> 2018-04-30 16:21:29.406 INFO AppInfoParser [Thread-128-customKafkaSpout-executor[30 30]] Kafka version : 1.1.0
> 2018-04-30 16:21:29.406 INFO AppInfoParser [Thread-128-customKafkaSpout-executor[30 30]] Kafka commitId : fdcf75ea326b8e07
> 2018-04-30 16:21:29.407 INFO AppInfoParser [Thread-186-customKafkaSpout-executor[29 29]] Kafka version : 1.1.0
> 2018-04-30 16:21:29.407 INFO AppInfoParser [Thread-186-customKafkaSpout-executor[29 29]] Kafka commitId : fdcf75ea326b8e07
> ...
> 2018-04-30 16:39:25.885 INFO CustomKafkaSpout [Thread-140-customKafkaSpout-executor[31 31]] CustomKafkaSpout is deactivated, there are 0 remaining instances.
> 2018-04-30 16:39:29.193 ERROR util [Thread-140-customKafkaSpout-executor[31 31]] Async loop died!
> java.lang.RuntimeException: java.lang.IllegalStateException: This consumer has already been closed.
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
> ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477)
> ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70)
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634)
> ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
> [storm-core-1.2.1.jar:1.2.1]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162]
> Caused by: java.lang.IllegalStateException: This consumer has already been closed.
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1811)
> ~[stormjar.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:1641)
> ~[stormjar.jar:?]
> at org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:79)
> ~[stormjar.jar:?]
> at 
> org.apache.storm.daemon.executor$metrics_tick$fn__4899.invoke(executor.clj:345)
> ~[storm-core-1.2.1.jar:1.2.1]
> at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
> at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
> at clojure.core$filter$fn__4580.invoke(core.clj:2679)
> ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
> at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6523.invoke(protocols.clj:170)
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19)
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31)
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6506.invoke(protocols.clj:101)
> ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13)
> ~[clojure-1.7.0.jar:?]
> at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
> at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349)
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__4975$tuple_action_fn__4981.invoke(executor.clj:522)
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:471)
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
> ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
> ~[storm-core-1.2.1.jar:1.2.1]
> ... 7 more
>
> From: [email protected] At: 05/01/18 02:32:00
> To: [email protected]
> Subject: Re: Issues killing KafkaSpout with storm-kafka-client 1.1.2 and
> 1.2.1
>
> Hi Mitchell.
>
> Could you verify that you're using kafka-clients 1.1.0 and not 0.10.0.0?
> The stack trace you posted points to KafkaConsumer.java:1360 as the
> location of the close method, which is where it's declared in 0.10.0.0 and
> not 1.1.0.
>
> 2018-04-30 23:51 GMT+02:00 Mitchell Rathbun (BLOOMBERG/ 731 LEX) <
> [email protected]>:
>
>> Upon shutdown in local mode and sometimes cluster mode (when close is
>> called on a KafkaSpout), we get the following:
>>
>> ERROR Slot [SLOT_1024] - Error when processing event
>> java.lang.IllegalStateException: This consumer has already been closed.
>> at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1416)
>> ~[Engine-0.0.1-SNAPSHOT.jar:?]
>> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1427)
>> ~[Engine-0.0.1-SNAPSHOT.jar:?]
>> at 
>> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
>> ~[Engine-0.0.1-SNAPSHOT.jar:?]
>> at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:485)
>> ~[Engine-0.0.1-SNAPSHOT.jar:?]
>> at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:472)
>> ~[Engine-0.0.1-SNAPSHOT.jar:?]
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> ~[?:1.8.0_162]
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> ~[?:1.8.0_162]
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_162]
>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
>> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
>> ~[clojure-1.7.0.jar:?]
>> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
>> ~[clojure-1.7.0.jar:?]
>> at org.apache.storm.daemon.executor$fn__5104.invoke(executor.clj:855)
>> ~[storm-core-1.1.1.jar:1.1.1]
>> at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
>> at org.apache.storm.daemon.executor$mk_executor$reify__4901.shutdown(executor.clj:425)
>> ~[storm-core-1.1.1.jar:1.1.1]
>> at sun.reflect.GeneratedMethodAccessor128.invoke(Unknown Source) ~[?:?]
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_162]
>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
>> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
>> ~[clojure-1.7.0.jar:?]
>> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
>> ~[clojure-1.7.0.jar:?]
>> at org.apache.storm.daemon.wor ... truncated
>>
>>
>> I had a similar issue to this here:
>> https://mail-archives.apache.org/mod_mbox/storm-user/201803.mbox/browser.
>> The issue was fixed initially by updating the Maven kafka-clients version
>> from 0.10.0.0 to
>> 1.1.0. However, updating storm-core and storm-kafka-client from version
>> 1.1.1 to 1.1.2, 1.2.0, 1.2.1, etc. causes the error to start occurring
>> again. I am confused why this would happen since the initial fix involved
>> updating kafka-clients, not storm-kafka-client. Looking at the release page
>> for 1.1.2 (http://storm.apache.org/2018/02/15/storm112-released.html),
>> it seems like there were a lot of Kafka integration changes involved in the
>> newer version. Has anyone seen something similar happening with one of
>> these newer versions?
>>
>> We override the deactivate method for KafkaSpout and call deactivate using
>> super.deactivate(). If we remove this call to deactivate, the error goes
>> away, but I feel that this is not a good solution for a couple of reasons.
>> It is coupled to the implementation of KafkaSpout (deactivate and close do
>> the same thing), and close isn't even guaranteed to be called in cluster
>> mode, which is what we really care about. So please advise on how to fix
>> this issue/if there is a workaround for the time being.
>>
>
>
>
