chickenchickenlove commented on PR #19631: URL: https://github.com/apache/kafka/pull/19631#issuecomment-2849263268
Hey @mjsax, @m1a2st. I dug into this problem more deeply and the issue is now clearer, so I reverted the previous commit and added a new one that introduces a wider `synchronized` block. The `synchronized` on `generationIfStable()` alone is not enough to prevent the race: each synchronized call releases the lock when it returns, so the member state can change between `generationIfStable()` and the later `rebalanceInProgress()` check. The following interleaving can cause records to be skipped:

```
// In ConsumerCoordinator.sendOffsetCommitRequest(...)
// Main thread
1. generation = generationIfStable();                              // <- MemberState.COMPLETING_REBALANCE
2. groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null); // <- MemberState.COMPLETING_REBALANCE
3. if (generation == null) {                                       // <- MemberState.COMPLETING_REBALANCE

// In SyncGroupResponseHandler.handle(...)
// Consumer coordinator heartbeat thread
4. synchronized (AbstractCoordinator.this) { .. }                  // <- MemberState.COMPLETING_REBALANCE
5. log.info("Successfully synced group in generation {}", generation); // <- MemberState.COMPLETING_REBALANCE
...

// In ConsumerCoordinator.sendOffsetCommitRequest(...)
// Main thread
6. if (rebalanceInProgress()) {                                    // <- MemberState.COMPLETING_REBALANCE

// In SyncGroupResponseHandler.handle(...)
// Consumer coordinator heartbeat thread
7. state = MemberState.STABLE;                                     // <- MemberState.STABLE

// In ConsumerCoordinator.sendOffsetCommitRequest(...)
// Main thread
8. else { return RequestFuture.failure(new CommitFailedException("Offset ...")); } // <- MemberState.STABLE
```

In this interleaving the main thread gets `generation == null` because the state is still `COMPLETING_REBALANCE`, but the heartbeat thread sets the state to `STABLE` before `rebalanceInProgress()` reads it. The main thread therefore takes the `else` branch and fails the commit with `CommitFailedException`.

This race can prevent the offsets of some records from being committed, even when the consumer's previously owned partition is reassigned to it during the rebalance.
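To make the check-then-act problem concrete, here is a minimal, self-contained sketch. It is a hypothetical, heavily simplified model, not Kafka's code: `CoordinatorModel`, `commitRacy`, `commitAtomic`, the `between` hook and the string outcomes are all illustrative names. The racy variant calls two separately synchronized methods, like the current code path; the atomic variant wraps both reads in one `synchronized (this)` block, assuming that is roughly what the wider block in the new commit achieves. A second thread plays the heartbeat thread completing SyncGroup between the two reads.

```java
// Hypothetical model of the race between generationIfStable() and rebalanceInProgress().
// Not Kafka code: all names below are illustrative only.
public class CommitRaceSketch {

    enum MemberState { PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

    static class CoordinatorModel {
        private MemberState state = MemberState.COMPLETING_REBALANCE;
        private final int generation = 5; // arbitrary generation id for the sketch

        // Each method takes and releases the monitor on its own.
        synchronized Integer generationIfStable() {
            return state == MemberState.STABLE ? generation : null;
        }

        synchronized boolean rebalanceInProgress() {
            return state == MemberState.PREPARING_REBALANCE
                || state == MemberState.COMPLETING_REBALANCE;
        }

        // Stands in for the SyncGroup response handler moving the member to STABLE.
        synchronized void markStable() {
            state = MemberState.STABLE;
        }

        // Racy: the monitor is released between the two reads, so `between` can change the state.
        String commitRacy(Runnable between) {
            Integer gen = generationIfStable();
            if (gen == null) {
                between.run();
                return rebalanceInProgress() ? "RebalanceInProgress" : "CommitFailed";
            }
            return "Commit(gen=" + gen + ")";
        }

        // Wider block: both reads happen under the same monitor, so they see the same state.
        String commitAtomic(Runnable between) {
            synchronized (this) {
                Integer gen = generationIfStable(); // reentrant acquisition of the same monitor
                if (gen == null) {
                    between.run();
                    return rebalanceInProgress() ? "RebalanceInProgress" : "CommitFailed";
                }
                return "Commit(gen=" + gen + ")";
            }
        }
    }

    // Simulates the heartbeat thread finishing SyncGroup in the middle of the commit check.
    static Runnable heartbeatCompletesSync(CoordinatorModel c) {
        return () -> {
            Thread heartbeat = new Thread(c::markStable);
            heartbeat.start();
            try {
                // Returns as soon as markStable() has run; times out if the monitor is held.
                heartbeat.join(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    public static void main(String[] args) {
        CoordinatorModel racy = new CoordinatorModel();
        System.out.println(racy.commitRacy(heartbeatCompletesSync(racy)));
        CoordinatorModel atomic = new CoordinatorModel();
        System.out.println(atomic.commitAtomic(heartbeatCompletesSync(atomic)));
    }
}
```

In the racy variant the simulated heartbeat thread acquires the free monitor and sets `STABLE`, so the commit ends up on the `CommitFailed` path. In the atomic variant that thread blocks until the main thread leaves the block, so both reads agree that a rebalance is in progress. The string outcomes only label the two branches of the `rebalanceInProgress()` check; they do not model the exceptions Kafka actually returns.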