[ https://issues.apache.org/jira/browse/KAFKA-7112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16526581#comment-16526581 ]
ASF GitHub Bot commented on KAFKA-7112:
---------------------------------------

guozhangwang closed pull request #5306: KAFKA-7112: Only resume restoration if state is still PARTITIONS_ASSIGNED after poll
URL: https://github.com/apache/kafka/pull/5306

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a159e7b6c7a..77538ae9c78 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -804,21 +804,26 @@ long runOnce(final long recordsProcessedBeforeCommit) {
             // try to fetch some records with zero poll millis
             // to unblock the restoration as soon as possible
             records = pollRequests(Duration.ZERO);
+        } else if (state == State.PARTITIONS_REVOKED) {
+            // try to fetch some records with normal poll time
+            // in order to wait long enough to get the join response
+            records = pollRequests(pollTime);
+        } else if (state == State.RUNNING) {
+            // try to fetch some records with normal poll time
+            // in order to get long polling
+            records = pollRequests(pollTime);
+        } else {
+            // any other state should not happen
+            log.error("Unexpected state {} during normal iteration", state);
+            throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
+        }
 
+        // only try to initialize the assigned tasks
+        // if the state is still in PARTITION_ASSIGNED after the poll call
+        if (state == State.PARTITIONS_ASSIGNED) {
             if (taskManager.updateNewAndRestoringTasks()) {
                 setState(State.RUNNING);
             }
-        } else {
-            // try to fetch some records if necessary
-            records = pollRequests(pollTime);
-
-            // if state changed after the poll call,
-            // try to initialize the assigned tasks again
-            if (state == State.PARTITIONS_ASSIGNED) {
-                if (taskManager.updateNewAndRestoringTasks()) {
-                    setState(State.RUNNING);
-                }
-            }
         }
 
         if (records != null && !records.isEmpty() && taskManager.hasActiveRunningTasks()) {


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StreamThread does not check for state again after pollRequests()
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7112
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7112
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>
> In StreamThread's main loop, we have:
> {code}
> if (state == State.PARTITIONS_ASSIGNED) {
>     // try to fetch some records with zero poll millis
>     // to unblock the restoration as soon as possible
>     records = pollRequests(Duration.ZERO);
>     if (taskManager.updateNewAndRestoringTasks()) {
>         setState(State.RUNNING);
>     }
> }
> {code}
> in which we first check for state, and if it is {{PARTITIONS_ASSIGNED}} then
> call `consumer.poll()` and then call
> `taskManager.updateNewAndRestoringTasks()`.
> There is a race condition, though: if another rebalance gets triggered
> during the poll call, then `onPartitionsRevoked` will be called, in which we
> call {{restoreConsumer.unsubscribe();}}, and if we then call
> {{taskManager.updateNewAndRestoringTasks()}} right away we will see this:
> {code}
> Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread)
> java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1150)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:317)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
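The following is a self-contained Java sketch that makes the race and the "re-check after poll" fix concrete. It is a hypothetical, heavily simplified model and not the real `StreamThread`. The `RecheckAfterPollSketch` and `Loop` classes, the `simulate` helper, the reduced `State` enum, the `pollTime` value and the boolean flag that stands in for `restoreConsumer.unsubscribe()` are all assumptions made for illustration. The model depends on one real property of the Kafka consumer: rebalance callbacks such as `onPartitionsRevoked` run inside `poll()`, so the state can change between the check and the restoration call.

```java
import java.time.Duration;

// Hypothetical, simplified model of the KAFKA-7112 race; NOT the real StreamThread.
final class RecheckAfterPollSketch {

    // Reduced subset of the stream thread states, for illustration only.
    enum State { CREATED, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING }

    static final class Loop {
        final Duration pollTime = Duration.ofMillis(100); // assumed long-poll timeout
        State state = State.PARTITIONS_ASSIGNED;
        boolean restoreConsumerSubscribed = true;
        boolean rebalanceDuringNextPoll = false; // test hook: inject a rebalance into poll()

        // Like KafkaConsumer.poll(), this may run the revocation callback inline.
        // The timeout is ignored in this model.
        void poll(Duration timeout) {
            if (rebalanceDuringNextPoll) {
                rebalanceDuringNextPoll = false;
                onPartitionsRevoked();
            }
        }

        void onPartitionsRevoked() {
            state = State.PARTITIONS_REVOKED;
            restoreConsumerSubscribed = false; // stands in for restoreConsumer.unsubscribe()
        }

        // Stands in for taskManager.updateNewAndRestoringTasks(): restoring from an
        // unsubscribed restore consumer fails the way KafkaConsumer.poll() does.
        boolean updateNewAndRestoringTasks() {
            if (!restoreConsumerSubscribed) {
                throw new IllegalStateException(
                        "Consumer is not subscribed to any topics or assigned any partitions");
            }
            return true; // pretend restoration finished in one step
        }

        // Pre-fix shape: on the PARTITIONS_ASSIGNED path the state is checked only before poll().
        void runOnceBuggy() {
            if (state == State.PARTITIONS_ASSIGNED) {
                poll(Duration.ZERO);
                if (updateNewAndRestoringTasks()) {
                    state = State.RUNNING;
                }
            } else {
                poll(pollTime);
                if (state == State.PARTITIONS_ASSIGNED && updateNewAndRestoringTasks()) {
                    state = State.RUNNING;
                }
            }
        }

        // Post-fix shape: pick the timeout from the state, poll, then re-check the state.
        void runOnceFixed() {
            poll(pollTimeout());
            if (state == State.PARTITIONS_ASSIGNED && updateNewAndRestoringTasks()) {
                state = State.RUNNING;
            }
        }

        Duration pollTimeout() {
            switch (state) {
                case PARTITIONS_ASSIGNED:
                    return Duration.ZERO; // unblock restoration as soon as possible
                case PARTITIONS_REVOKED:
                case RUNNING:
                    return pollTime;      // normal long poll
                default:
                    throw new IllegalStateException("Unexpected state " + state);
            }
        }
    }

    // Runs one iteration and returns the resulting state name, or "ISE: <message>".
    static String simulate(boolean fixed, boolean rebalanceDuringPoll) {
        Loop loop = new Loop();
        loop.rebalanceDuringNextPoll = rebalanceDuringPoll;
        try {
            if (fixed) {
                loop.runOnceFixed();
            } else {
                loop.runOnceBuggy();
            }
            return loop.state.name();
        } catch (IllegalStateException e) {
            return "ISE: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("pre-fix,  rebalance in poll: " + simulate(false, true));
        System.out.println("post-fix, rebalance in poll: " + simulate(true, true));
    }
}
```

In this model, `simulate(false, true)` reproduces the `IllegalStateException` message from the report. `simulate(true, true)` leaves the loop in `PARTITIONS_REVOKED` without attempting restoration. Without an injected rebalance, both variants reach `RUNNING`. The fixed variant uses a single poll followed by a single state check. This mirrors how the diff replaces the duplicated restoration block, while the per-state poll timeout is kept in a separate method.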