[ 
https://issues.apache.org/jira/browse/STORM-2494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hugo Louro resolved STORM-2494.
-------------------------------
       Resolution: Fixed
    Fix Version/s: 1.2.0

STORM-2640 also fixes STORM-2494.

> KafkaSpout does not handle CommitFailedException
> ------------------------------------------------
>
>                 Key: STORM-2494
>                 URL: https://issues.apache.org/jira/browse/STORM-2494
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.0
>            Reporter: Yuri Barseghyan
>            Assignee: Hugo Louro
>             Fix For: 1.2.0
>
>
> In situations where tuple processing takes longer than the session timeout, we get a 
> CommitFailedException, and instead of recovering from it the Storm worker dies.
> {code}
> 2017-04-26 11:07:04.902 o.a.s.util [ERROR] Async loop died!
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:219) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:644) ~[storm-core-1.1.0.jar:1.1.0]
> 	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.0.jar:1.1.0]
> 	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> 	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
> 2017-04-26 11:07:04.909 o.a.s.d.executor [ERROR]
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:219) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:644) ~[storm-core-1.1.0.jar:1.1.0]
> 	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.0.jar:1.1.0]
> 	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> 	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
> 2017-04-26 11:07:04.953 o.a.s.util [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
> 	at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.1.0.jar:1.1.0]
> 	at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
> 	at org.apache.storm.daemon.worker$fn__5646$fn__5647.invoke(worker.clj:763) [storm-core-1.1.0.jar:1.1.0]
> 	at org.apache.storm.daemon.executor$mk_executor_data$fn__4863$fn__4864.invoke(executor.clj:274) [storm-core-1.1.0.jar:1.1.0]
> 	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:494) [storm-core-1.1.0.jar:1.1.0]
> 	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> 	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
> 2017-04-26 11:07:44.507 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] Instantiated KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0, timeUnit=SECONDS}, ratio=TimeInterval{length=2, timeUnit=MILLISECONDS}, maxRetries=2147483647, maxRetryDelay=TimeInterval{length=10, timeUnit=SECONDS}}
> 2017-04-26 11:07:44.516 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] Instantiated KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0, timeUnit=SECONDS}, ratio=TimeInterval{length=0, timeUnit=MILLISECONDS}, maxRetries=2147483647, maxRetryDelay=TimeInterval{length=0, timeUnit=MILLISECONDS}}
> 2017-04-26 11:07:45.048 o.a.s.k.s.KafkaSpout [INFO] Kafka Spout opened with the following configuration: KafkaSpoutConfig{kafkaProps={enable.auto.commit=false, request.timeout.ms=30000, group.id=Group1, bootstrap.servers=192.168.1.143:9092, session.timeout.ms=20000}, key=org.apache.kafka.common.serialization.StringDeserializer@1b5080fd, value=org.apache.kafka.common.serialization.StringDeserializer@2720873b, pollTimeoutMs=200, offsetCommitPeriodMs=5000, maxUncommittedOffsets=1000, firstPollOffsetStrategy=UNCOMMITTED_EARLIEST, subscription=org.apache.storm.kafka.spout.NamedSubscription@7f068c1f, translator=org.apache.storm.kafka.spout.SimpleRecordTranslator@1f1ca6a2, retryService=KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0, timeUnit=SECONDS}, ratio=TimeInterval{length=2, timeUnit=MILLISECONDS}, maxRetries=2147483647, maxRetryDelay=TimeInterval{length=10, timeUnit=SECONDS}}}
> 2017-04-26 11:07:45.111 o.a.s.k.s.KafkaSpout [INFO] Kafka Spout opened with the following configuration: KafkaSpoutConfig{kafkaProps={enable.auto.commit=false, request.timeout.ms=30000, group.id=Group2, bootstrap.servers=192.168.1.143:9092, session.timeout.ms=20000}, key=org.apache.kafka.common.serialization.StringDeserializer@45ffa954, value=org.apache.kafka.common.serialization.StringDeserializer@4b384f9b, pollTimeoutMs=200, offsetCommitPeriodMs=5000, maxUncommittedOffsets=1000, firstPollOffsetStrategy=UNCOMMITTED_EARLIEST, subscription=org.apache.storm.kafka.spout.NamedSubscription@4f07c224, translator=org.apache.storm.kafka.spout.SimpleRecordTranslator@a0545a0, retryService=KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0, timeUnit=SECONDS}, ratio=TimeInterval{length=2, timeUnit=MILLISECONDS}, maxRetries=2147483647, maxRetryDelay=TimeInterval{length=10, timeUnit=SECONDS}}}
> 2017-04-26 11:07:45.297 o.a.s.k.s.NamedSubscription [INFO] Kafka consumer subscribed topics [topic-1]
> 2017-04-26 11:07:45.302 o.a.s.k.s.NamedSubscription [INFO] Kafka consumer subscribed topics [topic-2]
> 2017-04-26 11:07:45.456 o.a.s.k.s.KafkaSpout [INFO] Partitions revoked. [consumer-group=Group1, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32cbdbb0, topic-partitions=[]]
> 2017-04-26 11:07:45.463 o.a.s.k.s.KafkaSpout [INFO] Partitions revoked. [consumer-group=Group1, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@275d5222, topic-partitions=[]]
> 2017-04-26 11:07:45.545 o.a.s.k.s.KafkaSpout [INFO] Partitions reassignment. [consumer-group=Group1, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@275d5222, topic-partitions=[topic-1]]
> 2017-04-26 11:07:45.546 o.a.s.k.s.KafkaSpout [INFO] Partitions reassignment. [consumer-group=Group1, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32cbdbb0, topic-partitions=[topic-2]]
> 2017-04-26 11:07:45.551 o.a.s.k.s.i.OffsetManager [DEBUG] Instantiated OffsetManager{topic-partition=topic-1, fetchOffset=11803, committedOffset=11802, ackedMsgs=[]}
> 2017-04-26 11:07:45.551 o.a.s.k.s.i.OffsetManager [DEBUG] Instantiated OffsetManager{topic-partition=topic-2, fetchOffset=11801, committedOffset=11800, ackedMsgs=[]}
> 2017-04-26 11:07:45.552 o.a.s.k.s.KafkaSpout [INFO] Initialization complete
> 2017-04-26 11:07:45.552 o.a.s.k.s.KafkaSpout [INFO] Initialization complete
> {code}
> I think the expected behaviour would be for the KafkaSpout to recover from the 
> exception (the client reconnects and gets its partitions reassigned) without 
> the worker being killed.
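The recovery the report asks for amounts to treating CommitFailedException as retriable inside the spout's commit path instead of letting it propagate out of nextTuple() and take down the worker. Below is a minimal, self-contained sketch of that idea; the class, interface, and method names are stand-ins chosen for illustration, not the actual storm-kafka-client code or the patch that landed via STORM-2640.

```java
// Sketch only: local stand-ins for the real Kafka/Storm classes, to show
// the commit failure being caught rather than crashing the executor loop.
public class CommitRecoverySketch {

    // Stand-in for org.apache.kafka.clients.consumer.CommitFailedException.
    static class CommitFailedException extends RuntimeException {}

    // Stand-in for the consumer's synchronous offset commit.
    interface OffsetCommitter {
        void commitSync();
    }

    /**
     * Returns true if the acked offsets were committed, false if the commit
     * was abandoned because the group rebalanced mid-processing. On failure
     * the spout would discard its now-stale offset state and rely on the
     * rebalance listener to re-seek once partitions are reassigned.
     */
    static boolean commitOffsetsForAckedTuples(OffsetCommitter consumer) {
        try {
            consumer.commitSync();
            return true;
        } catch (CommitFailedException e) {
            // The partitions now belong to another group member, so this
            // commit is pointless. Log and carry on instead of rethrowing.
            return false;
        }
    }

    public static void main(String[] args) {
        OffsetCommitter failing = () -> { throw new CommitFailedException(); };
        System.out.println(
            commitOffsetsForAckedTuples(failing) ? "committed" : "recovered");
        // prints "recovered"
    }
}
```

Independently of the code change, the advice embedded in the log message above still applies to deployments like the reporter's: raise session.timeout.ms (or, on newer clients, max.poll.interval.ms) or lower max.poll.records so that successive poll() calls stay within the timeout.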



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
