I noticed that the Storm Kafka spout re-consumes all Kafka messages after a
rolling restart of the Kafka cluster.

This issue occurs only with the KafkaSpout consumer, not with my other
consumers (Ruby-based, using the same Kafka consumer API as KafkaSpout).

Spout logs from the restart are attached below.

Do you know what could cause this kind of behavior?
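
For context, the spout is wired up roughly like this. This is a simplified sketch, not our exact code: the broker list, topic, and class names are placeholders, and the builder calls are written from the storm-kafka-client docs as I remember them, so they may differ slightly between versions:

    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.topology.TopologyBuilder;

    public class TopologySketch {
        public static void main(String[] args) {
            // Simplified wiring; broker list and topic name are placeholders.
            KafkaSpoutConfig<String, String> spoutConfig =
                    KafkaSpoutConfig.builder("kafka1:9092,kafka2:9092", "my_topic")
                            .setGroupId("storm_kafka_topology") // the consumer group seen in the logs below
                            .build();

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafkaSpout", new KafkaSpout<>(spoutConfig), 1);
            // ... s3UploadBolt and the rest of the topology omitted ...
        }
    }
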
2017-08-20 12:03:22.867 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka1:9092 (id: 2147483646 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:23.052 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:23.667 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka2:9092 (id: 2147483647 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:24.092 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:24.094 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka2:9092 (id: 2147483647 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:24.195 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:24.197 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka2:9092 (id: 2147483647 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:24.299 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:24.304 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka2:9092 (id: 2147483647 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:24.408 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:24.409 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka2:9092 (id: 2147483647 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:24.511 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:24.512 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka2:9092 (id: 2147483647 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:24.643 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:24.648 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka2:9092 (id: 2147483647 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:24.781 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:24.787 o.a.s.util Thread-11-kafkaSpout-executor[4 4] [ERROR] Async loop died!
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) ~[stormjar.jar:?]
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519) ~[stormjar.jar:?]
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[stormjar.jar:?]
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[stormjar.jar:?]
  at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[stormjar.jar:?]
  at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[stormjar.jar:?]
  at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[stormjar.jar:?]
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[stormjar.jar:?]
  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[stormjar.jar:?]
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[stormjar.jar:?]
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[stormjar.jar:?]
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[stormjar.jar:?]
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[stormjar.jar:?]
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404) ~[stormjar.jar:?]
  at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058) ~[stormjar.jar:?]
  at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384) ~[stormjar.jar:?]
  at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:219) ~[stormjar.jar:?]
  at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:644) ~[storm-core-1.1.0.jar:1.1.0]
  at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.0.jar:1.1.0]
  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
  at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
2017-08-20 12:03:24.791 o.a.s.d.executor Thread-11-kafkaSpout-executor[4 4] [ERROR]
org.apache.kafka.clients.consumer.CommitFailedException: [same message and stack trace as above, trimmed]
2017-08-20 12:03:24.809 o.a.s.util Thread-11-kafkaSpout-executor[4 4] [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
  at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.1.0.jar:1.1.0]
  at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
  at org.apache.storm.daemon.worker$fn__5646$fn__5647.invoke(worker.clj:763) [storm-core-1.1.0.jar:1.1.0]
  at org.apache.storm.daemon.executor$mk_executor_data$fn__4863$fn__4864.invoke(executor.clj:274) [storm-core-1.1.0.jar:1.1.0]
  at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:494) [storm-core-1.1.0.jar:1.1.0]
  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
  at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
2017-08-20 12:03:24.811 o.a.s.d.worker Thread-14 [INFO] Shutting down worker storm-kafka-topology-113-1502717137 8c6fca83-6c5a-4e89-af25-9071700b45ae 6707
2017-08-20 12:03:24.811 o.a.s.m.n.Client Thread-14 [INFO] closing Netty Client Netty-Client-storm-cluster-aefb/172.16.247.10:6708
2017-08-20 12:03:24.811 o.a.s.m.n.Client Thread-14 [INFO] waiting up to 600000 ms to send 0 pending messages to Netty-Client-storm-cluster-aefb/172.16.247.10:6708
2017-08-20 12:03:24.812 o.a.s.m.n.Client Thread-14 [INFO] closing Netty Client Netty-Client-storm-cluster-12e9/172.16.249.242:6709
2017-08-20 12:03:24.812 o.a.s.m.n.Client Thread-14 [INFO] waiting up to 600000 ms to send 0 pending messages to Netty-Client-storm-cluster-12e9/172.16.249.242:6709
2017-08-20 12:03:24.812 o.a.s.d.worker Thread-14 [INFO] Terminating messaging context
2017-08-20 12:03:24.812 o.a.s.d.worker Thread-14 [INFO] Shutting down executors
2017-08-20 12:03:24.812 o.a.s.d.executor Thread-14 [INFO] Shutting down executor s3UploadBolt:[7 7]
2017-08-20 12:03:24.813 o.a.s.util Thread-5-s3UploadBolt-executor[7 7] [INFO] Async loop interrupted!
2017-08-20 12:03:24.813 o.a.s.util Thread-4-disruptor-executor[7 7]-send-queue [INFO] Async loop interrupted!
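
For what it's worth, the exception's own advice (increase session.timeout.ms, or reduce max.poll.records) would translate into consumer settings roughly like the ones below. This is only a sketch against the plain kafka-clients consumer, with illustrative values and a placeholder topic, since I'm not sure yet how best to pass these through the spout configuration:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerTuningSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "storm_kafka_topology");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // The two knobs the exception message points at (values illustrative):
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); // tolerate slower poll loops
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50");      // smaller batches per poll()

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my_topic")); // placeholder topic
                // Poll loop omitted; only the two settings above matter here.
            }
        }
    }

Even if that makes the CommitFailedException go away, I still don't understand why a failed commit during the rebalance would send the spout back to the beginning of the topic instead of resuming from the last successfully committed offsets.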
