showuon commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r718240534



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -188,8 +188,9 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
      * cleanup from the previous generation (such as committing offsets for the consumer)
      * @param generation The previous generation or -1 if there was none
      * @param memberId The identifier of this member in the previous group or "" if there was none
+     * @param pollTimer A Timer constructed by the poll() timeout time set by the customer
      */
-    protected abstract void onJoinPrepare(int generation, String memberId);
+    protected abstract void onJoinPrepare(int generation, String memberId, final Timer pollTimer);

Review comment:
       Since this timer is not always from `poll`, do you think we can rename it to `offsetCommitTimer` or something? The same comment applies to the Javadoc and the other places.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1054,6 +1059,7 @@ private void doAutoCommitOffsetsAsync() {
     private void maybeAutoCommitOffsetsSync(Timer timer) {
         if (autoCommitEnabled) {
         Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
+

Review comment:
       nit: additional new line

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1069,6 +1075,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
         }
     }
 
+    private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> partitionOffsetsToBeCommitted) {
+
+        if (partitionOffsetsToBeCommitted.isEmpty())
+            return;
+
+        Set<String> validTopics = metadata.fetch().topics();
+        Set<TopicPartition> toGiveUpTopicPartitions = new HashSet<>();
+
+        Iterator<TopicPartition> iterator = partitionOffsetsToBeCommitted.keySet().iterator();
+
+        while (iterator.hasNext()) {
+
+            TopicPartition topicPartition = iterator.next();
+
+            if (!validTopics.contains(topicPartition.topic())) {
+
+                toGiveUpTopicPartitions.add(topicPartition);
+                iterator.remove();
+            }
+
+        }
+
+        if (toGiveUpTopicPartitions.size() > 0) {
+
+            //Because toGiveUpTopicPartitions may receive `UnknownTopicOrPartitionException` when submitting their offsets.
+            //We are prepared to abandon them. The worst effect is that these partitions may repeatedly consume some messages

Review comment:
       Do you think this is better?
   //We might get `UnknownTopicOrPartitionException` after submitting their offsets because the topics have been deleted, so we should update the offsets list here. The worst effect is that we may keep retrying to commit offsets for topics that no longer exist until the timeout is reached.
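   In context, the block could then read like this (a sketch; only the comment wording changes, the surrounding code is as quoted above):
   ``` java
   if (toGiveUpTopicPartitions.size() > 0) {
       // We might get `UnknownTopicOrPartitionException` after submitting their offsets because
       // the topics have been deleted, so we update the offsets list here. The worst effect is
       // that we may keep retrying to commit offsets for topics that no longer exist until the
       // timeout is reached.
       log.warn("Synchronous auto-commit of offsets {} will be abandoned", toGiveUpTopicPartitions);
   }
   ```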

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1069,6 +1075,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
         }
     }
 
+    private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> partitionOffsetsToBeCommitted) {
+
+        if (partitionOffsetsToBeCommitted.isEmpty())
+            return;
+
+        Set<String> validTopics = metadata.fetch().topics();
+        Set<TopicPartition> toGiveUpTopicPartitions = new HashSet<>();
+
+        Iterator<TopicPartition> iterator = partitionOffsetsToBeCommitted.keySet().iterator();
+
+        while (iterator.hasNext()) {
+
+            TopicPartition topicPartition = iterator.next();
+
+            if (!validTopics.contains(topicPartition.topic())) {
+
+                toGiveUpTopicPartitions.add(topicPartition);
+                iterator.remove();
+            }
+

Review comment:
       nit: additional new line

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -188,8 +188,9 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
      * cleanup from the previous generation (such as committing offsets for the consumer)
      * @param generation The previous generation or -1 if there was none
      * @param memberId The identifier of this member in the previous group or "" if there was none
+     * @param pollTimer A Timer constructed by the poll() timeout time set by the customer

Review comment:
       Maybe change to "The timer for committing offsets synchronously"?
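   That would make the Javadoc line read (sketch):
   ``` java
    * @param pollTimer The timer for committing offsets synchronously
   ```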

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1069,6 +1075,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
         }
     }
 
+    private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> partitionOffsetsToBeCommitted) {
+

Review comment:
       nit: additional new line

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1016,6 +1018,9 @@ public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets,
             if (future.failed() && !future.isRetriable())
                 throw future.exception();
 
+            if(future.exception() instanceof UnknownTopicOrPartitionException)
+                cleanUpConsumedOffsets(offsets);
+
             timer.sleep(rebalanceConfig.retryBackoffMs);

Review comment:
       It's not good to call `cleanUpConsumedOffsets` before the sleep, since the topics could change during the sleep. Maybe we can set a flag and clean up right before sending the next request?
   Something like this:
   ``` java
   boolean shouldCleanUpConsumedOffsets = false;
   do {
     // check here
     if (shouldCleanUpConsumedOffsets) {
       cleanUpConsumedOffsets(offsets);
       shouldCleanUpConsumedOffsets = false;
     }
     RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
   ...

     if (future.exception() instanceof UnknownTopicOrPartitionException)
       shouldCleanUpConsumedOffsets = true;
   } while (timer.notExpired());
   ```
   
   What do you think?
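   For context, a fuller sketch of how that flag could sit in the retry loop, assuming `commitOffsetsSync` keeps roughly its current poll/retry shape (the `client.poll` and success-handling lines here are illustrative, not the exact PR code):
   ``` java
   public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
       if (offsets.isEmpty())
           return true;

       boolean shouldCleanUpConsumedOffsets = false;
       do {
           // Clean up right before the next attempt, so the metadata snapshot
           // used to drop deleted topics is as fresh as possible.
           if (shouldCleanUpConsumedOffsets) {
               cleanUpConsumedOffsets(offsets);
               shouldCleanUpConsumedOffsets = false;
           }

           RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
           client.poll(future, timer);

           if (future.succeeded())
               return true;
           if (future.failed() && !future.isRetriable())
               throw future.exception();

           // Only mark for cleanup here; the topics could change again while
           // we back off, so the actual cleanup happens at the top of the loop.
           if (future.exception() instanceof UnknownTopicOrPartitionException)
               shouldCleanUpConsumedOffsets = true;

           timer.sleep(rebalanceConfig.retryBackoffMs);
       } while (timer.notExpired());
       return false;
   }
   ```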

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1069,6 +1075,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
         }
     }
 
+    private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> partitionOffsetsToBeCommitted) {
+
+        if (partitionOffsetsToBeCommitted.isEmpty())
+            return;
+
+        Set<String> validTopics = metadata.fetch().topics();
+        Set<TopicPartition> toGiveUpTopicPartitions = new HashSet<>();
+
+        Iterator<TopicPartition> iterator = partitionOffsetsToBeCommitted.keySet().iterator();
+
+        while (iterator.hasNext()) {
+

Review comment:
       nit: additional new line

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1069,6 +1075,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
         }
     }
 
+    private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> partitionOffsetsToBeCommitted) {
+
+        if (partitionOffsetsToBeCommitted.isEmpty())
+            return;
+
+        Set<String> validTopics = metadata.fetch().topics();
+        Set<TopicPartition> toGiveUpTopicPartitions = new HashSet<>();
+
+        Iterator<TopicPartition> iterator = partitionOffsetsToBeCommitted.keySet().iterator();
+
+        while (iterator.hasNext()) {
+
+            TopicPartition topicPartition = iterator.next();
+
+            if (!validTopics.contains(topicPartition.topic())) {
+
+                toGiveUpTopicPartitions.add(topicPartition);
+                iterator.remove();
+            }
+
+        }
+
+        if (toGiveUpTopicPartitions.size() > 0) {
+

Review comment:
       nit: additional new line

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1069,6 +1075,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
         }
     }
 
+    private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> partitionOffsetsToBeCommitted) {
+
+        if (partitionOffsetsToBeCommitted.isEmpty())
+            return;
+
+        Set<String> validTopics = metadata.fetch().topics();
+        Set<TopicPartition> toGiveUpTopicPartitions = new HashSet<>();
+
+        Iterator<TopicPartition> iterator = partitionOffsetsToBeCommitted.keySet().iterator();
+
+        while (iterator.hasNext()) {
+
+            TopicPartition topicPartition = iterator.next();
+
+            if (!validTopics.contains(topicPartition.topic())) {
+
+                toGiveUpTopicPartitions.add(topicPartition);
+                iterator.remove();
+            }
+
+        }
+
+        if (toGiveUpTopicPartitions.size() > 0) {
+
+            //Because toGiveUpTopicPartitions may receive `UnknownTopicOrPartitionException` when submitting their offsets.
+            //We are prepared to abandon them. The worst effect is that these partitions may repeatedly consume some messages
+            log.warn("Synchronous auto-commit of offsets {} will be abandoned", toGiveUpTopicPartitions);
+

Review comment:
       nit: additional new line

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##########
@@ -278,7 +278,7 @@ public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception {
         ExecutorService executor = Executors.newFixedThreadPool(1);
         try {
             Timer firstAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS);
-            Future<Boolean> firstAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(firstAttemptTimer));
+            Future<Boolean> firstAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(firstAttemptTimer, mockTime.timer(0)));

Review comment:
       In my opinion, even though this is unit test, it might be safer to set 
the timer more than 0 because there could be possible that other retriable 
exception be thrown while commiting offsets. How about `mockTime.timer(10)`? 
(And same to other places in tests)
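   A sketch of what that would look like at this call site (the other test call sites would follow the same pattern):
   ``` java
   Timer firstAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS);
   // A small non-zero commit timer leaves room for a retry if a retriable
   // exception is thrown while committing offsets.
   Future<Boolean> firstAttempt = executor.submit(() ->
       coordinator.joinGroupIfNeeded(firstAttemptTimer, mockTime.timer(10)));
   ```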

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1069,6 +1074,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
         }
     }
 
+    private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> willCommitOffsets) {
+
+        if (willCommitOffsets.isEmpty())
+            return;
+
+        Set<String> validTopics = metadata.fetch().topics();
+        Set<TopicPartition> toGiveUpTopicPartitions = new HashSet<>();
+
+        Iterator<Map.Entry<TopicPartition, OffsetAndMetadata>> iterator = willCommitOffsets.entrySet().iterator();
+
+        while (iterator.hasNext()) {
+
+            Map.Entry<TopicPartition, OffsetAndMetadata> entry = iterator.next();
+
+            if (!validTopics.contains(entry.getKey().topic())) {
+
+                toGiveUpTopicPartitions.add(entry.getKey());

Review comment:
       OK, makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

