lianetm commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1758694480


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -218,18 +490,17 @@ public CompletableFuture<Void> resetPositionsIfNeeded() {
      *
      * <p/>
      *
-     * When a response is received, positions are validated and, if a log 
truncation is
-     * detected, a {@link LogTruncationException} will be saved in memory, to 
be thrown on the
+     * When a response is received, positions are validated and, if a log 
truncation is detected, a
+     * {@link LogTruncationException} will be saved in memory in 
cachedUpdatePositionsException, to be thrown on the
      * next call to this function.
      */
-    public CompletableFuture<Void> validatePositionsIfNeeded() {
-        Map<TopicPartition, SubscriptionState.FetchPosition> 
partitionsToValidate =
-                offsetFetcherUtils.getPartitionsToValidate();
+    void validatePositionsIfNeeded() {
+        Map<TopicPartition, SubscriptionState.FetchPosition> 
partitionsToValidate = offsetFetcherUtils.getPartitionsToValidate();
         if (partitionsToValidate.isEmpty()) {
-            return CompletableFuture.completedFuture(null);
+            return;
         }
 
-        return 
sendOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);
+        
sendOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);

Review Comment:
   sure, done



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -218,18 +490,17 @@ public CompletableFuture<Void> resetPositionsIfNeeded() {
      *
      * <p/>
      *
-     * When a response is received, positions are validated and, if a log 
truncation is
-     * detected, a {@link LogTruncationException} will be saved in memory, to 
be thrown on the
+     * When a response is received, positions are validated and, if a log 
truncation is detected, a
+     * {@link LogTruncationException} will be saved in memory in 
cachedUpdatePositionsException, to be thrown on the
      * next call to this function.
      */
-    public CompletableFuture<Void> validatePositionsIfNeeded() {
-        Map<TopicPartition, SubscriptionState.FetchPosition> 
partitionsToValidate =
-                offsetFetcherUtils.getPartitionsToValidate();
+    void validatePositionsIfNeeded() {
+        Map<TopicPartition, SubscriptionState.FetchPosition> 
partitionsToValidate = offsetFetcherUtils.getPartitionsToValidate();
         if (partitionsToValidate.isEmpty()) {

Review Comment:
   You're totally right that here we could have empty partitions to validate 
(only because we may have some but there's already a 
`OffsetsForLeaderEpochRequest` in-flight for them). But my expectation is that 
even in that case, we make sure that all partitions end up being validated 
because of how we handle the OffsetsForLeaderEpochRequest response :
   - if it succeeds, partitions get validated
   - if it fails, the next allowed retry is updated for those partitions, so 
they are validated on the next poll/updateFetchPositions after the backoff 
expires (`partitionsToValidate` won't be empty on that next run)
   
https://github.com/apache/kafka/blob/02e3f7cc284d062adad5324d9493f39559346ae7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java#L396
   
   Makes sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to