[ https://issues.apache.org/jira/browse/STORM-2494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hugo Louro resolved STORM-2494.
-------------------------------
    Resolution: Fixed
    Fix Version/s: 1.2.0

STORM-2640 also fixes STORM-2494

> KafkaSpout does not handle CommitFailedException
> ------------------------------------------------
>
>                 Key: STORM-2494
>                 URL: https://issues.apache.org/jira/browse/STORM-2494
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.0
>            Reporter: Yuri Barseghyan
>            Assignee: Hugo Louro
>             Fix For: 1.2.0
>
> In situations where tuple processing takes longer than the session timeout, we get a
> CommitFailedException, and instead of recovering from it the Storm worker dies.
> {code}
> 2017-04-26 11:07:04.902 o.a.s.util [ERROR] Async loop died!
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:219) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:644) ~[storm-core-1.1.0.jar:1.1.0]
> 	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.0.jar:1.1.0]
> 	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> 	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
> 2017-04-26 11:07:04.909 o.a.s.d.executor [ERROR]
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:219) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:644) ~[storm-core-1.1.0.jar:1.1.0]
> 	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.0.jar:1.1.0]
> 	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> 	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
> 2017-04-26 11:07:04.953 o.a.s.util [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
> 	at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.1.0.jar:1.1.0]
> 	at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
> 	at org.apache.storm.daemon.worker$fn__5646$fn__5647.invoke(worker.clj:763) [storm-core-1.1.0.jar:1.1.0]
> 	at org.apache.storm.daemon.executor$mk_executor_data$fn__4863$fn__4864.invoke(executor.clj:274) [storm-core-1.1.0.jar:1.1.0]
> 	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:494) [storm-core-1.1.0.jar:1.1.0]
> 	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> 	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
> 2017-04-26 11:07:44.507 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] Instantiated KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0, timeUnit=SECONDS}, ratio=TimeInterval{length=2, timeUnit=MILLISECONDS}, maxRetries=2147483647, maxRetryDelay=TimeInterval{length=10, timeUnit=SECONDS}}
> 2017-04-26 11:07:44.516 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] Instantiated KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0, timeUnit=SECONDS}, ratio=TimeInterval{length=0, timeUnit=MILLISECONDS}, maxRetries=2147483647, maxRetryDelay=TimeInterval{length=0, timeUnit=MILLISECONDS}}
> 2017-04-26 11:07:45.048 o.a.s.k.s.KafkaSpout [INFO] Kafka Spout opened with the following configuration: KafkaSpoutConfig{kafkaProps={enable.auto.commit=false, request.timeout.ms=30000, group.id=Group1, bootstrap.servers=192.168.1.143:9092, session.timeout.ms=20000}, key=org.apache.kafka.common.serialization.StringDeserializer@1b5080fd, value=org.apache.kafka.common.serialization.StringDeserializer@2720873b, pollTimeoutMs=200, offsetCommitPeriodMs=5000, maxUncommittedOffsets=1000, firstPollOffsetStrategy=UNCOMMITTED_EARLIEST, subscription=org.apache.storm.kafka.spout.NamedSubscription@7f068c1f, translator=org.apache.storm.kafka.spout.SimpleRecordTranslator@1f1ca6a2, retryService=KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0, timeUnit=SECONDS}, ratio=TimeInterval{length=2, timeUnit=MILLISECONDS}, maxRetries=2147483647, maxRetryDelay=TimeInterval{length=10, timeUnit=SECONDS}}}
> 2017-04-26 11:07:45.111 o.a.s.k.s.KafkaSpout [INFO] Kafka Spout opened with the following configuration: KafkaSpoutConfig{kafkaProps={enable.auto.commit=false, request.timeout.ms=30000, group.id=Group2, bootstrap.servers=192.168.1.143:9092, session.timeout.ms=20000}, key=org.apache.kafka.common.serialization.StringDeserializer@45ffa954, value=org.apache.kafka.common.serialization.StringDeserializer@4b384f9b, pollTimeoutMs=200, offsetCommitPeriodMs=5000, maxUncommittedOffsets=1000, firstPollOffsetStrategy=UNCOMMITTED_EARLIEST, subscription=org.apache.storm.kafka.spout.NamedSubscription@4f07c224, translator=org.apache.storm.kafka.spout.SimpleRecordTranslator@a0545a0, retryService=KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0, timeUnit=SECONDS}, ratio=TimeInterval{length=2, timeUnit=MILLISECONDS}, maxRetries=2147483647, maxRetryDelay=TimeInterval{length=10, timeUnit=SECONDS}}}
> 2017-04-26 11:07:45.297 o.a.s.k.s.NamedSubscription [INFO] Kafka consumer subscribed topics [topic-1]
> 2017-04-26 11:07:45.302 o.a.s.k.s.NamedSubscription [INFO] Kafka consumer subscribed topics [topic-2]
> 2017-04-26 11:07:45.456 o.a.s.k.s.KafkaSpout [INFO] Partitions revoked. [consumer-group=Group1, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32cbdbb0, topic-partitions=[]]
> 2017-04-26 11:07:45.463 o.a.s.k.s.KafkaSpout [INFO] Partitions revoked. [consumer-group=Group1, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@275d5222, topic-partitions=[]]
> 2017-04-26 11:07:45.545 o.a.s.k.s.KafkaSpout [INFO] Partitions reassignment. [consumer-group=Group1, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@275d5222, topic-partitions=[topic-1]]
> 2017-04-26 11:07:45.546 o.a.s.k.s.KafkaSpout [INFO] Partitions reassignment. [consumer-group=Group1, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32cbdbb0, topic-partitions=[topic-2]]
> 2017-04-26 11:07:45.551 o.a.s.k.s.i.OffsetManager [DEBUG] Instantiated OffsetManager{topic-partition=topic-1, fetchOffset=11803, committedOffset=11802, ackedMsgs=[]}
> 2017-04-26 11:07:45.551 o.a.s.k.s.i.OffsetManager [DEBUG] Instantiated OffsetManager{topic-partition=topic-2, fetchOffset=11801, committedOffset=11800, ackedMsgs=[]}
> 2017-04-26 11:07:45.552 o.a.s.k.s.KafkaSpout [INFO] Initialization complete
> 2017-04-26 11:07:45.552 o.a.s.k.s.KafkaSpout [INFO] Initialization complete
> {code}
> I think the expected behaviour would be that the KafkaSpout recovers from the exception (the client reconnects and gets the partitions reassigned) without the worker being killed.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
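[Editor's note] The recovery the reporter asks for amounts to catching CommitFailedException at the commit call and deferring the commit, rather than letting the exception escape nextTuple() and halt the worker. The following is a minimal, self-contained sketch of that pattern only; it is not the actual Storm fix (see STORM-2640), and CommitFailedException and Committer here are local stand-ins for org.apache.kafka.clients.consumer.CommitFailedException and the consumer's commitSync() so the example compiles without Kafka on the classpath.

```java
public class CommitRetrySketch {

    /** Local stand-in for org.apache.kafka.clients.consumer.CommitFailedException. */
    public static class CommitFailedException extends RuntimeException {
        public CommitFailedException(String message) { super(message); }
    }

    /** Minimal stand-in for the consumer's synchronous offset commit. */
    public interface Committer {
        void commitSync();
    }

    /**
     * Returns true if the commit succeeded, and false when the group has already
     * rebalanced. Instead of letting the exception propagate (which is what killed
     * the worker in this report), the caller simply retries the commit after the
     * next poll() has rejoined the group and partitions have been reassigned.
     */
    public static boolean commitOffsetsForAckedTuples(Committer consumer) {
        try {
            consumer.commitSync();
            return true;
        } catch (CommitFailedException e) {
            // Partitions were reassigned during a rebalance; swallow the exception
            // so the spout stays alive and commits on a later cycle.
            return false;
        }
    }

    public static void main(String[] args) {
        boolean recovered = !commitOffsetsForAckedTuples(() -> {
            throw new CommitFailedException("group has already rebalanced");
        });
        System.out.println(recovered ? "recovered" : "committed"); // prints "recovered"
    }
}
```

Independently of the code-level fix, the exception message's own advice still applies as a workaround on 1.1.0: raise session.timeout.ms (and max.poll.interval.ms on newer clients) or lower max.poll.records so commits happen before the group coordinator evicts the member.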