Sorry, I think I posted an older stack trace. Here is the more recent error, 
using storm-core and storm-kafka-client 1.2.1 with kafka-clients 1.1.1:

2018-05-01 13:45:47.072 ERROR Slot [SLOT_1027] Error when processing event
java.lang.IllegalStateException: This consumer has already been closed.
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1811) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.assignment(KafkaConsumer.java:839) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:681) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:672) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_162]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_162]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
        at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.executor$fn__5117.invoke(executor.clj:868) ~[storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.executor$mk_executor$reify__4914.shutdown(executor.clj:432) ~[storm-core-1.2.1.jar:1.2.1]
        at sun.reflect.GeneratedMethodAccessor170.invoke(Unknown Source) ~[?:?]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
        at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.worker$fn__5545$exec_fn__1369__auto__$reify__5547$shutdown_STAR___5567.invoke(worker.clj:684) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.daemon.worker$fn__5545$exec_fn__1369__auto__$reify$reify__5593.shutdown(worker.clj:724) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.ProcessSimulator.killProcess(ProcessSimulator.java:67) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.daemon.supervisor.LocalContainer.kill(LocalContainer.java:69) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.daemon.supervisor.Slot.killContainerForChangedAssignment(Slot.java:311) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.daemon.supervisor.Slot.handleRunning(Slot.java:527) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.daemon.supervisor.Slot.stateMachineStep(Slot.java:265) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:752) [storm-core-1.2.1.jar:1.2.1]
2018-05-01 13:45:47.084 ERROR Utils [SLOT_1027] Halting process: Error when processing an event
java.lang.RuntimeException: Halting process: Error when processing an event
        at org.apache.storm.utils.Utils.exitProcess(Utils.java:1777) [storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:796) [storm-core-1.2.1.jar:1.2.1]


From: [email protected] At: 05/01/18 02:32:00 To: [email protected]
Subject: Re: Issues killing KafkaSpout with storm-kafka-client 1.1.2 and 1.2.1

Hi Mitchell.

Could you verify that you're using kafka-clients 1.1.0 and not 0.10.0.0? The 
stack trace you posted points to KafkaConsumer.java:1360 as the location of the 
close method, which is where it's declared in 0.10.0.0 and not 1.1.0. 

2018-04-30 23:51 GMT+02:00 Mitchell Rathbun (BLOOMBERG/ 731 LEX) 
<[email protected]>:

Upon shutdown in local mode and sometimes cluster mode (when close is called on 
a KafkaSpout), we get the following: 

ERROR Slot [SLOT_1024] - Error when processing event
java.lang.IllegalStateException: This consumer has already been closed.
        at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1416) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1427) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:485) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:472) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_162]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_162]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
        at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.executor$fn__5104.invoke(executor.clj:855) ~[storm-core-1.1.1.jar:1.1.1]
        at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.executor$mk_executor$reify__4901.shutdown(executor.clj:425) ~[storm-core-1.1.1.jar:1.1.1]
        at sun.reflect.GeneratedMethodAccessor128.invoke(Unknown Source) ~[?:?]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
        at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.wor ... truncated


I had a similar issue to this here: 
https://mail-archives.apache.org/mod_mbox/storm-user/201803.mbox/browser. The 
issue was fixed initially by updating the Maven kafka-clients version from 
0.10.0.0 to 1.1.0. However, updating storm-core and storm-kafka-client from 
version 1.1.1 to 1.1.2, 1.2.0, 1.2.1, etc. causes the error to start occurring 
again. I am confused why this would happen, since the initial fix involved 
updating kafka-clients, not storm-kafka-client. Looking at the release page for 
1.1.2 (http://storm.apache.org/2018/02/15/storm112-released.html), it seems 
like there were a lot of Kafka integration changes involved in the newer 
version. Has anyone seen something similar happening with one of these newer 
versions?

We override the deactivate method of KafkaSpout and call the parent 
implementation using super.deactivate(). If we remove this call, the error goes 
away, but I feel that this is not a good solution for a couple of reasons. It is 
coupled to the implementation of KafkaSpout (deactivate and close do the same 
thing), and close isn't even guaranteed to be called in cluster mode, which is 
what we really care about. So please advise on how to fix this issue, or 
whether there is a workaround for the time being.
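
To make the suspected sequence concrete, here is a minimal sketch in Java. It does not use Storm or Kafka. FakeConsumer, GuardedSpout and shutdownOnce are hypothetical names made up for illustration, and the sketch assumes the error comes from the consumer being touched again after deactivate has already closed it. It is not how KafkaSpout is actually implemented.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a consumer that rejects a second close(),
// mirroring the IllegalStateException message in the stack traces.
class FakeConsumer {
    private boolean closed = false;
    int closeCalls = 0; // counts successful closes only

    void close() {
        if (closed) {
            throw new IllegalStateException("This consumer has already been closed.");
        }
        closed = true;
        closeCalls++;
    }
}

// Hypothetical spout-like wrapper: deactivate() and close() share one
// shutdown path, and the flag ensures the consumer is closed at most once.
class GuardedSpout {
    private final FakeConsumer consumer;
    private final AtomicBoolean shutDown = new AtomicBoolean(false);

    GuardedSpout(FakeConsumer consumer) {
        this.consumer = consumer;
    }

    void deactivate() {
        shutdownOnce();
    }

    void close() {
        shutdownOnce();
    }

    private void shutdownOnce() {
        // compareAndSet lets only the first caller through.
        if (shutDown.compareAndSet(false, true)) {
            consumer.close();
        }
    }
}

class ShutdownGuardDemo {
    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        GuardedSpout spout = new GuardedSpout(consumer);
        spout.deactivate(); // closes the consumer
        spout.close();      // no-op because the guard is already set
        System.out.println("consumer.close() calls: " + consumer.closeCalls);
    }
}
```

Without the guard, the second call would reach FakeConsumer.close() and throw. A spout that can be reactivated would also need to reset the flag and create a new consumer in activate(), which this sketch leaves out.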

