[ https://issues.apache.org/jira/browse/KAFKA-7112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16526581#comment-16526581 ]

ASF GitHub Bot commented on KAFKA-7112:
---------------------------------------

guozhangwang closed pull request #5306: KAFKA-7112: Only resume restoration if state is still PARTITIONS_ASSIGNED after poll
URL: https://github.com/apache/kafka/pull/5306
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a159e7b6c7a..77538ae9c78 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -804,21 +804,26 @@ long runOnce(final long recordsProcessedBeforeCommit) {
             // try to fetch some records with zero poll millis
             // to unblock the restoration as soon as possible
             records = pollRequests(Duration.ZERO);
+        } else if (state == State.PARTITIONS_REVOKED) {
+            // try to fetch some records with normal poll time
+            // in order to wait long enough to get the join response
+            records = pollRequests(pollTime);
+        } else if (state == State.RUNNING) {
+            // try to fetch some records with normal poll time
+            // in order to get long polling
+            records = pollRequests(pollTime);
+        } else {
+            // any other state should not happen
+            log.error("Unexpected state {} during normal iteration", state);
+            throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
+        }
 
+        // only try to initialize the assigned tasks
+        // if the state is still in PARTITION_ASSIGNED after the poll call
+        if (state == State.PARTITIONS_ASSIGNED) {
             if (taskManager.updateNewAndRestoringTasks()) {
                 setState(State.RUNNING);
             }
-        } else {
-            // try to fetch some records if necessary
-            records = pollRequests(pollTime);
-
-            // if state changed after the poll call,
-            // try to initialize the assigned tasks again
-            if (state == State.PARTITIONS_ASSIGNED) {
-                if (taskManager.updateNewAndRestoringTasks()) {
-                    setState(State.RUNNING);
-                }
-            }
         }
 
         if (records != null && !records.isEmpty() && taskManager.hasActiveRunningTasks()) {
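The essence of the fix: the poll timeout is chosen from the state before polling, and the state is checked again after pollRequests() returns, because a rebalance callback running inside the poll may have moved it (for example to PARTITIONS_REVOKED, or from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED). Below is a minimal, self-contained Java sketch of that control flow under stated assumptions; it is not the actual StreamThread code. FixedLoopSketch, PollEvent, the simulated pollRequests(Duration, PollEvent) and updateNewAndRestoringTasks() methods, the 100 ms pollTime, and the CREATED state (standing in for "any other state") are all hypothetical, and IllegalStateException replaces StreamsException to avoid a Kafka dependency.

```java
import java.time.Duration;

// Hypothetical, simplified model of the post-fix runOnce() control flow.
// Not Kafka Streams code: every name below is an illustrative stand-in.
class FixedLoopSketch {
    // Hypothetical subset of thread states; CREATED stands in for "any other state".
    enum State { CREATED, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING }

    // What a rebalance does during the simulated poll call.
    enum PollEvent { NONE, REVOKE, ASSIGN }

    private final Duration pollTime = Duration.ofMillis(100); // hypothetical long-poll timeout
    private State state;
    private boolean restoreConsumerSubscribed = true;

    FixedLoopSketch(State initial) {
        this.state = initial;
    }

    // Simulated poll. Rebalance callbacks run synchronously inside poll(),
    // so the state can change before this method returns. The timeout value
    // itself is ignored here; only which timeout is chosen matters.
    private void pollRequests(Duration timeout, PollEvent event) {
        if (event == PollEvent.REVOKE) {
            state = State.PARTITIONS_REVOKED;
            restoreConsumerSubscribed = false; // like restoreConsumer.unsubscribe()
        } else if (event == PollEvent.ASSIGN) {
            state = State.PARTITIONS_ASSIGNED;
            restoreConsumerSubscribed = true;  // simplified: restoration is possible again
        }
    }

    // Fails like the stack trace in the issue if the restore consumer was unsubscribed.
    private boolean updateNewAndRestoringTasks() {
        if (!restoreConsumerSubscribed) {
            throw new IllegalStateException(
                "Consumer is not subscribed to any topics or assigned any partitions");
        }
        return true; // pretend restoration completes in a single iteration
    }

    void runOnce(PollEvent event) {
        // Step 1: pick the poll timeout from the state *before* polling.
        if (state == State.PARTITIONS_ASSIGNED) {
            pollRequests(Duration.ZERO, event); // unblock restoration as soon as possible
        } else if (state == State.PARTITIONS_REVOKED || state == State.RUNNING) {
            pollRequests(pollTime, event);      // normal (long) poll time
        } else {
            throw new IllegalStateException("Unexpected state " + state + " during normal iteration");
        }

        // Step 2: re-check the state *after* polling; only initialize tasks
        // if the thread is (still, or newly) in PARTITIONS_ASSIGNED.
        if (state == State.PARTITIONS_ASSIGNED && updateNewAndRestoringTasks()) {
            state = State.RUNNING;
        }
    }

    // Runs one iteration from the given start state and returns the resulting state.
    static State afterOneIteration(State start, PollEvent event) {
        FixedLoopSketch thread = new FixedLoopSketch(start);
        thread.runOnce(event);
        return thread.state;
    }

    public static void main(String[] args) {
        System.out.println(afterOneIteration(State.PARTITIONS_ASSIGNED, PollEvent.NONE));
        System.out.println(afterOneIteration(State.PARTITIONS_ASSIGNED, PollEvent.REVOKE));
        System.out.println(afterOneIteration(State.PARTITIONS_REVOKED, PollEvent.ASSIGN));
    }
}
```

For example, afterOneIteration(State.PARTITIONS_ASSIGNED, PollEvent.REVOKE) returns PARTITIONS_REVOKED instead of touching the unsubscribed restore consumer, while afterOneIteration(State.PARTITIONS_REVOKED, PollEvent.ASSIGN) starts restoration in the same iteration and returns RUNNING.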


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StreamThread does not check for state again after pollRequests()
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7112
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7112
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>
> In StreamThread's main loop, we have:
> {code}
>         if (state == State.PARTITIONS_ASSIGNED) {
>             // try to fetch some records with zero poll millis
>             // to unblock the restoration as soon as possible
>             records = pollRequests(Duration.ZERO);
>             if (taskManager.updateNewAndRestoringTasks()) {
>                 setState(State.RUNNING);
>             }
>         }
> {code}
> in which we first check the state and, if it is {{PARTITIONS_ASSIGNED}}, call 
> {{consumer.poll()}} and then {{taskManager.updateNewAndRestoringTasks()}}. 
> There is a race condition, though: if another rebalance is triggered during 
> that {{poll()}} call, {{onPartitionsRevoked}} will be invoked from within it, 
> and there we call {{restoreConsumer.unsubscribe();}}. If we then call 
> {{taskManager.updateNewAndRestoringTasks()}} right away, we will see this:
> {code}
> Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread)
> java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1150)
>         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
>         at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:317)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
> {code}
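The race can be reproduced deterministically in a toy model. This is a hypothetical Java sketch, not Kafka Streams code: RaceSketch, FakeRestoreConsumer and the rebalanceDuringPoll flag are invented for illustration, and the revocation callback is called synchronously from the simulated poll, mirroring the fact that consumer rebalance callbacks run inside poll(). With the pre-fix ordering shown at the top of this issue, a rebalance during the poll makes the following restoration step fail with the IllegalStateException from the stack trace.

```java
import java.time.Duration;

// Hypothetical toy model of the KAFKA-7112 race with the pre-fix ordering.
// Not Kafka Streams code: all names here are illustrative stand-ins.
class RaceSketch {
    enum State { PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING }

    // Stand-in for the restore consumer: polling it while unsubscribed fails
    // with the same message as KafkaConsumer.poll() in the stack trace.
    static final class FakeRestoreConsumer {
        private boolean subscribed = true;

        void unsubscribe() {
            subscribed = false;
        }

        void poll(Duration timeout) {
            if (!subscribed) {
                throw new IllegalStateException(
                    "Consumer is not subscribed to any topics or assigned any partitions");
            }
        }
    }

    private State state = State.PARTITIONS_ASSIGNED;
    private final FakeRestoreConsumer restoreConsumer = new FakeRestoreConsumer();

    // Simulated main-consumer poll: when a rebalance happens, the revocation
    // callback runs synchronously inside the call, as real rebalance callbacks do.
    private void pollRequests(Duration timeout, boolean rebalanceDuringPoll) {
        if (rebalanceDuringPoll) {
            onPartitionsRevoked();
        }
    }

    private void onPartitionsRevoked() {
        state = State.PARTITIONS_REVOKED;
        restoreConsumer.unsubscribe();
    }

    // Restoration reads changelogs through the restore consumer.
    private boolean updateNewAndRestoringTasks() {
        restoreConsumer.poll(Duration.ZERO);
        return true;
    }

    // Pre-fix ordering: the state is checked before the poll but not after it.
    private void runOnceBuggy(boolean rebalanceDuringPoll) {
        if (state == State.PARTITIONS_ASSIGNED) {
            pollRequests(Duration.ZERO, rebalanceDuringPoll);
            if (updateNewAndRestoringTasks()) {
                state = State.RUNNING;
            }
        }
    }

    // Returns true if one iteration of the buggy loop throws IllegalStateException.
    static boolean buggyLoopThrows(boolean rebalanceDuringPoll) {
        RaceSketch thread = new RaceSketch();
        try {
            thread.runOnceBuggy(rebalanceDuringPoll);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(buggyLoopThrows(false));
        System.out.println(buggyLoopThrows(true));
    }
}
```

buggyLoopThrows(false) returns false and buggyLoopThrows(true) returns true; the only difference is whether the revocation callback ran inside the poll before restoration was attempted.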



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
