showuon commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r718240534
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##########

@@ -188,8 +188,9 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
      * cleanup from the previous generation (such as committing offsets for the consumer)
      * @param generation The previous generation or -1 if there was none
      * @param memberId The identifier of this member in the previous group or "" if there was none
+     * @param pollTimer A Timer constructed by the poll() timeout time set by the customer
      */
-    protected abstract void onJoinPrepare(int generation, String memberId);
+    protected abstract void onJoinPrepare(int generation, String memberId, final Timer pollTimer);

Review comment: Since this timer is not always from `poll`, do you think we can rename it to `offsetCommitTimer` or something? The same comment applies to the Javadoc and the other call sites.

########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ##########

@@ -1054,6 +1059,7 @@ private void doAutoCommitOffsetsAsync() {
     private void maybeAutoCommitOffsetsSync(Timer timer) {
         if (autoCommitEnabled) {
             Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
+

Review comment: nit: extra blank line

########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ##########

@@ -1069,6 +1075,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
         }
     }
 
+    private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> partitionOffsetsToBeCommitted) {
+
+        if (partitionOffsetsToBeCommitted.isEmpty())
+            return;
+
+        Set<String> validTopics = metadata.fetch().topics();
+        Set<TopicPartition> toGiveUpTopicPartitions = new HashSet<>();
+
+        Iterator<TopicPartition> iterator = partitionOffsetsToBeCommitted.keySet().iterator();
+
+        while (iterator.hasNext()) {
+
+            TopicPartition topicPartition = iterator.next();
+
+            if (!validTopics.contains(topicPartition.topic())) {
+
+                toGiveUpTopicPartitions.add(topicPartition);
+                iterator.remove();
+            }
+
+        }
+
+        if (toGiveUpTopicPartitions.size() > 0) {
+
+            //Because toGiveUpTopicPartitions may receive `UnknownTopicOrPartitionException` when submitting their offsets.
+            //We are prepared to abandon them. The worst effect is that these partitions may repeatedly consume some messages

Review comment: Do you think this is better?

// We might get `UnknownTopicOrPartitionException` after submitting the offsets because the topics have been deleted.
// We should update the offsets list here. The worst effect is that we may keep retrying to commit offsets for topics
// that no longer exist until the timeout is reached.

########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ##########

@@ -1069,6 +1075,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
    ... (same cleanUpConsumedOffsets hunk as above, truncated) ...
+            if (!validTopics.contains(topicPartition.topic())) {
+
+                toGiveUpTopicPartitions.add(topicPartition);
+                iterator.remove();
+            }
+

Review comment: nit: extra blank line

########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##########

@@ -188,8 +188,9 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
      * cleanup from the previous generation (such as committing offsets for the consumer)
      * @param generation The previous generation or -1 if there was none
      * @param memberId The identifier of this member in the previous group or "" if there was none
+     * @param pollTimer A Timer constructed by the poll() timeout time set by the customer

Review comment: Maybe change it to "The timer for committing offsets synchronously"?

########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ##########

@@ -1069,6 +1075,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
         }
     }
 
+    private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> partitionOffsetsToBeCommitted) {
+

Review comment: nit: extra blank line

########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ##########

@@ -1016,6 +1018,9 @@ public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets,
             if (future.failed() && !future.isRetriable())
                 throw future.exception();
 
+            if (future.exception() instanceof UnknownTopicOrPartitionException)
+                cleanUpConsumedOffsets(offsets);
+
             timer.sleep(rebalanceConfig.retryBackoffMs);

Review comment: It's not good to call `cleanUpConsumedOffsets` before the sleep, since the topics could change while we sleep. Maybe we can set a flag and clean up just before sending the next request, something like this:

```java
boolean shouldCleanUpConsumedOffsets = false;
do {
    // check here
    if (shouldCleanUpConsumedOffsets) {
        cleanUpConsumedOffsets(offsets);
        shouldCleanUpConsumedOffsets = false;
    }
    RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
    ...
    if (future.exception() instanceof UnknownTopicOrPartitionException)
        shouldCleanUpConsumedOffsets = true;
} while (timer.notExpired());
```

What do you think?
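To make the suggested control flow concrete, here is a small self-contained sketch of the idea. This is not the real Kafka code: `validTopics`, `commitAttempt`, and the string-keyed offsets map are stand-ins for `metadata.fetch().topics()`, `sendOffsetCommitRequest`, and the `TopicPartition`-keyed map. The point it illustrates is that cleaning up at the top of the loop, just before the next send, picks up topics that disappear during the backoff sleep:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DeferredCleanupSketch {

    // Stand-in for metadata.fetch().topics(): "orders" has already been deleted.
    static final Set<String> validTopics = new HashSet<>(Set.of("payments"));

    // Stand-in for sendOffsetCommitRequest(): succeeds only if every topic still exists.
    static boolean commitAttempt(Map<String, Long> offsets) {
        return validTopics.containsAll(offsets.keySet());
    }

    // Drop offsets whose topic is no longer valid (the cleanUpConsumedOffsets idea).
    static void cleanUpConsumedOffsets(Map<String, Long> offsets) {
        offsets.keySet().removeIf(topic -> !validTopics.contains(topic));
    }

    // The flag defers the cleanup to the top of the loop, just before the next
    // send, instead of doing it before the backoff sleep.
    public static boolean commitWithRetry(Map<String, Long> offsets, int maxAttempts) {
        boolean shouldCleanUpConsumedOffsets = false;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (shouldCleanUpConsumedOffsets) {
                cleanUpConsumedOffsets(offsets);
                shouldCleanUpConsumedOffsets = false;
            }
            if (commitAttempt(offsets)) {
                return true;
            }
            // Simulates the UnknownTopicOrPartitionException branch.
            shouldCleanUpConsumedOffsets = true;
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>(Map.of("orders", 42L, "payments", 7L));
        System.out.println(commitWithRetry(offsets, 3)); // true
        System.out.println(offsets.keySet());            // [payments]
    }
}
```

Here the first attempt fails because "orders" is gone, the deferred cleanup removes it, and the second attempt succeeds with only the surviving topic.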
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ##########

@@ -1069,6 +1075,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
    ... (same cleanUpConsumedOffsets hunk as above, truncated) ...
+        Iterator<TopicPartition> iterator = partitionOffsetsToBeCommitted.keySet().iterator();
+
+        while (iterator.hasNext()) {
+

Review comment: nit: extra blank line

########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ##########

@@ -1069,6 +1075,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
    ... (same cleanUpConsumedOffsets hunk as above, truncated) ...
+        if (toGiveUpTopicPartitions.size() > 0) {
+

Review comment: nit: extra blank line

########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ##########

@@ -1069,6 +1075,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
    ... (same cleanUpConsumedOffsets hunk as above, truncated) ...
+        if (toGiveUpTopicPartitions.size() > 0) {
+
+            //Because toGiveUpTopicPartitions may receive `UnknownTopicOrPartitionException` when submitting their offsets.
+            //We are prepared to abandon them. The worst effect is that these partitions may repeatedly consume some messages
+            log.warn("Synchronous auto-commit of offsets {} will be abandoned", toGiveUpTopicPartitions);
+

Review comment: nit: extra blank line

########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ##########

@@ -278,7 +278,7 @@ public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception {
         ExecutorService executor = Executors.newFixedThreadPool(1);
         try {
             Timer firstAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS);
-            Future<Boolean> firstAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(firstAttemptTimer));
+            Future<Boolean> firstAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(firstAttemptTimer, mockTime.timer(0)));

Review comment: In my opinion, even though this is a unit test, it might be safer to set the timer to more than 0, because another retriable exception could be thrown while committing offsets. How about `mockTime.timer(10)`? (And the same applies to other places in the tests.)

########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ##########

@@ -1069,6 +1074,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
         }
     }
 
+    private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> willCommitOffsets) {
+
+        if (willCommitOffsets.isEmpty())
+            return;
+
+        Set<String> validTopics = metadata.fetch().topics();
+        Set<TopicPartition> toGiveUpTopicPartitions = new HashSet<>();
+
+        Iterator<Map.Entry<TopicPartition, OffsetAndMetadata>> iterator = willCommitOffsets.entrySet().iterator();
+
+        while (iterator.hasNext()) {
+
+            Map.Entry<TopicPartition, OffsetAndMetadata> entry = iterator.next();
+
+            if (!validTopics.contains(entry.getKey().topic())) {
+
+                toGiveUpTopicPartitions.add(entry.getKey());

Review comment: OK, that makes sense.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
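For readers following the `keySet()` vs `entrySet()` discussion in the last hunk: removing through either view's iterator removes the whole entry from the backing map. A minimal stand-alone sketch of the filtering helper, using plain topic strings in place of `TopicPartition`/`OffsetAndMetadata` (the names here are illustrative, not the real Kafka API):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class OffsetFilterSketch {

    // Remove offsets for topics missing from the cluster metadata and return
    // the abandoned topics; iterator.remove() also drops the map entry itself.
    public static Set<String> cleanUpConsumedOffsets(Map<String, Long> offsets,
                                                     Set<String> validTopics) {
        Set<String> abandoned = new HashSet<>();
        Iterator<Map.Entry<String, Long>> it = offsets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (!validTopics.contains(entry.getKey())) {
                abandoned.add(entry.getKey());
                it.remove();
            }
        }
        return abandoned;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>(Map.of("orders", 42L, "payments", 7L));
        Set<String> abandoned = cleanUpConsumedOffsets(offsets, Set.of("payments"));
        System.out.println(abandoned); // [orders]
        System.out.println(offsets);   // {payments=7}
    }
}
```

Iterating the entry set (rather than the key set) lets the caller inspect the offset value too, while keeping the same removal semantics.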