lucasbru commented on code in PR #21495:
URL: https://github.com/apache/kafka/pull/21495#discussion_r2846983313


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -817,11 +817,13 @@ private void transitionToStale() {
      *  - There are topics that haven't been added to the current assignment 
yet, but all their topic IDs
      *    are missing from the target assignment.
      *
-     * @param canCommit Controls whether reconciliation can proceed when 
auto-commit is enabled.
-     *                  Set to true only when the current offset positions are 
safe to commit.
-     *                  If false and auto-commit enabled, the reconciliation 
will be skipped.
+     * @param invokedByPoll True if this reconciliation attempt is triggered 
by the application thread on consumer.poll().

Review Comment:
   nit: `invokedByPoll` is set to false when this is called from 
`RequestManager.poll`, which is also a poll. That's a bit confusing. Maybe call it 
`invokedByConsumerPoll` or `invokedByUserPoll`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -817,11 +817,13 @@ private void transitionToStale() {
      *  - There are topics that haven't been added to the current assignment 
yet, but all their topic IDs
      *    are missing from the target assignment.
      *
-     * @param canCommit Controls whether reconciliation can proceed when 
auto-commit is enabled.
-     *                  Set to true only when the current offset positions are 
safe to commit.
-     *                  If false and auto-commit enabled, the reconciliation 
will be skipped.
+     * @param invokedByPoll True if this reconciliation attempt is triggered 
by the application thread on consumer.poll().
+     *                      False if this is triggered by the background 
thread on regular manager poll.
+     *                      In both cases we want to resolve metadata to 
unresolved assignments,
+     *                      but the actual reconciliation (commit, callbacks, 
assignment updates)
+     *                      will only proceed if this is triggered from the 
application thread on consumer.poll
      */
-    public void maybeReconcile(boolean canCommit) {
+    public void maybeReconcile(boolean invokedByPoll) {

Review Comment:
   ShareConsumer aside, I am not sure this change fully fixes the problem, but 
maybe I am just not seeing it. It's specifically about the case where we are 
triggering the reconciliation from `poll`, have a rebalance listener 
registered, but aren't waiting long enough before exiting poll.
   
   The event sequence that I see is this:
   
     1. maybeReconcile(true) is called on the background thread during 
AsyncPollEvent processing
     2. It kicks off the reconciliation chain: commit → revokePartitions() → 
enqueues ConsumerRebalanceListenerCallbackNeededEvent onto the background event 
queue
     3. Back on the application thread, we execute the onPartitionsRevoked 
callback and send back a ConsumerRebalanceListenerCallbackCompletedEvent
     4. The background thread picks up that completed event, calls 
future.complete(), which triggers the thenCompose → assignPartitions() → 
updateSubscriptionAwaitingCallback()
   
   I think after Step 3, the application thread can immediately exit 
`Consumer.poll` and go into `Consumer.seek`. Does `isValidatePositionsComplete` 
actually gate against this? I do not see it. 
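
   To make the ordering concrete, here is a minimal sketch of the interleaving 
described above. It is an assumption-laden model, not Kafka code: the class 
`ReconcileRaceSketch` and the log strings are hypothetical, no Kafka classes are 
used, and the two threads are replayed sequentially on a single thread so that 
the order is deterministic. It only shows that a continuation attached with 
`thenCompose` runs when the future is completed, which in this ordering is 
after the application thread has already moved on to `seek`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the event sequence; names are illustrative only.
public class ReconcileRaceSketch {

    // Replays one possible interleaving and returns the order of events.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Steps 1-2: the background thread starts reconciling. Revocation
        // needs the rebalance listener, so the rest of the chain is parked
        // behind a future that completes when the callback has run.
        CompletableFuture<Void> revokeCallbackDone = new CompletableFuture<>();
        CompletableFuture<Void> reconciliation = revokeCallbackDone.thenCompose(v -> {
            log.add("background: assignPartitions()");
            return CompletableFuture.<Void>completedFuture(null);
        });
        log.add("background: revokePartitions(), callback-needed event enqueued");

        // Step 3: the application thread runs the listener and reports back.
        log.add("application: onPartitionsRevoked()");
        log.add("application: callback-completed event sent");

        // Nothing in this model makes the application thread wait for the
        // rest of the chain, so it leaves poll() and calls seek().
        log.add("application: poll() returns");
        log.add("application: seek()");

        // Step 4: the background thread picks up the completed event and
        // completes the future, which only now runs the thenCompose stage.
        revokeCallbackDone.complete(null);
        log.add("reconciliation done: " + reconciliation.isDone());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

   In this model `seek()` is logged before `assignPartitions()`. Whether the 
real code permits that ordering depends on whether something blocks the 
application thread between step 3 and step 4, which is the open question here.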



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

