We can just update kafka-clients to 1.0.0. Thank you for the explanation.

From: [email protected] At: 03/28/18 16:01:09 To: [email protected]
Subject: Re: Issue killing KafkaSpout with storm-kafka-client 1.1.1

This looks like a bug in the consumer; see
https://github.com/apache/kafka/commit/031da889bc811200da67568c5779760dcb006238
The spout closes the consumer both when the topology is deactivated and when 
the spout is closed. In consumer versions before 1.0.0, the consumer's close 
method is not idempotent, so the second call throws. I believe both deactivate 
and close are called when a local cluster is shut down. When a regular worker 
is shut down, I think the JVM running it is just killed, so close is never 
called on the spout.

If you need this fixed for older consumer versions, we could null the consumer 
reference after closing it, so we don't close it more than once.
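
A minimal sketch of the null-after-close idea suggested above. It is not the 
actual KafkaSpout code: FakeConsumer and GuardedSpout are hypothetical 
stand-ins, and FakeConsumer only imitates a consumer whose close() throws on 
the second call.

```java
/** Hypothetical stand-in for a consumer whose close() is not idempotent. */
class FakeConsumer {
    private boolean closed = false;
    int closeCalls = 0;

    void close() {
        if (closed) {
            // Same message as the exception in the reported stack trace.
            throw new IllegalStateException("This consumer has already been closed.");
        }
        closed = true;
        closeCalls++;
    }
}

/** Hypothetical spout-like class; only the shutdown guard is of interest. */
class GuardedSpout {
    private FakeConsumer consumer;

    GuardedSpout(FakeConsumer consumer) {
        this.consumer = consumer;
    }

    void deactivate() {
        shutdown();
    }

    void close() {
        shutdown();
    }

    private void shutdown() {
        if (consumer != null) {
            try {
                consumer.close();
            } finally {
                // Drop the reference so a later shutdown() is a no-op.
                consumer = null;
            }
        }
    }
}

public class GuardedCloseDemo {
    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        GuardedSpout spout = new GuardedSpout(consumer);
        spout.deactivate(); // first shutdown closes the consumer
        spout.close();      // second shutdown finds a null reference and does nothing
        System.out.println("close calls: " + consumer.closeCalls); // prints "close calls: 1"
    }
}
```

With the guard in place, the deactivate-then-close sequence reaches the 
consumer only once, so the IllegalStateException is not raised.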

2018-03-28 19:45 GMT+02:00 Mitchell Rathbun (BLOOMBERG/ 731 LEX) 
<[email protected]>:

We are using the KafkaSpout class provided by version 1.1.1 of 
storm-kafka-client, along with version 1.1.1 of Storm and version 0.10.0.0 of 
kafka-clients. In local mode, we start our topology using LocalCluster's 
submitTopology method, and bring down the topology by calling the killTopology 
method followed by the shutdown method. Every time killTopology is run, the 
following occurs:

ERROR Slot [SLOT_1024] - Error when processing event
java.lang.IllegalStateException: This consumer has already been closed.
        at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1416) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1427) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:485) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:472) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_162]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_162]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
        at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.executor$fn__5104.invoke(executor.clj:855) ~[storm-core-1.1.1.jar:1.1.1]
        at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.executor$mk_executor$reify__4901.shutdown(executor.clj:425) ~[storm-core-1.1.1.jar:1.1.1]
        at sun.reflect.GeneratedMethodAccessor128.invoke(Unknown Source) ~[?:?]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
        at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.wor ... truncated

I did notice that updating just the version of kafka-clients to 1.0.1 made this 
issue disappear. Also, this issue only happens in local mode, not cluster mode. 
Is there something wrong with how we are bringing down the topology using 
LocalCluster? Or is this a known issue with version 1.1.1 of storm-kafka-client?

