chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r430253921



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -769,20 +813,25 @@ class GroupCoordinator(val brokerId: Int,
             // on heartbeat response to eventually notify the rebalance in progress signal to the consumer
             val member = group.get(memberId)
             completeAndScheduleNextHeartbeatExpiration(group, member)
-            groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
+            partitionsToComplete ++= groupManager.storeOffsets(
+              group = group,
+              consumerId = memberId,
+              offsetMetadata = offsetMetadata,
+              responseCallback = responseCallback,
+              completeDelayedRequests = false)
 
           case CompletingRebalance =>
             // We should not receive a commit request if the group has not completed rebalance;
             // but since the consumer's member.id and generation is valid, it means it has received
             // the latest group generation information from the JoinResponse.
             // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully.
             responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.REBALANCE_IN_PROGRESS })
-
           case _ =>
             throw new RuntimeException(s"Logic error: unexpected group state ${group.currentState}")
         }
       }
     }
+    completeDelayedRequests(partitionsToComplete)

Review comment:
       @junrao Could you take a look? I'd like to address @hachikuji's comment,
but doing so would introduce a big change to this PR. Hence, it would be better
to get more reviews/suggestions before kicking it off.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

