hachikuji commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r434267922



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
##########
@@ -36,7 +36,8 @@
     private final Map<TopicPartition, OffsetAndMetadata> divergentOffsets;
 
     public LogTruncationException(Map<TopicPartition, OffsetAndMetadata> divergentOffsets) {
-        super(Utils.transformMap(divergentOffsets, Function.identity(), OffsetAndMetadata::offset));
+        super(Utils.transformMap(divergentOffsets, Function.identity(), OffsetAndMetadata::offset),
+            "detected log truncation");

Review comment:
       I'd suggest a minor change here. One issue is that the divergent offsets don't appear in the exception message itself. That might not seem like a big deal, but sometimes the exception trace is all we get from the user. It's also surprising when an exception constructor takes a string parameter that is not the message itself. Can we do something like this instead?
   ```java
   class LogTruncationException extends OffsetOutOfRangeException {
     private final Map<TopicPartition, OffsetAndMetadata> divergentOffsets;
   
     public LogTruncationException(Map<TopicPartition, OffsetAndMetadata> divergentOffsets) {
       // Pass the message first so the divergent offsets appear in the stack trace.
       super("Detected log truncation with diverging offsets " + divergentOffsets,
           Utils.transformMap(divergentOffsets, Function.identity(), OffsetAndMetadata::offset));
       this.divergentOffsets = Collections.unmodifiableMap(divergentOffsets);
     }
   }
   
   class OffsetOutOfRangeException extends InvalidOffsetException {
     private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
   
     public OffsetOutOfRangeException(Map<TopicPartition, Long> offsetOutOfRangePartitions) {
       this("Offsets out of range with no configured reset policy for partitions: " +
               offsetOutOfRangePartitions, offsetOutOfRangePartitions);
     }
   
     // The string parameter is always the exception message.
     public OffsetOutOfRangeException(String message, Map<TopicPartition, Long> offsetOutOfRangePartitions) {
       super(message);
       this.offsetOutOfRangePartitions = offsetOutOfRangePartitions;
     }
   }
   
   class Fetcher {
     private OffsetOutOfRangeException handleOffsetOutOfRange(..., String reason) {
       ...
       return new OffsetOutOfRangeException("Offsets out of range with no configured reset policy for partitions: " +
               offsetOutOfRangePartitions + ", root cause: " + reason, offsetOutOfRangePartitions);
     }
   }
   ```

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1304,6 +1310,19 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
         return completedFetch;
     }
 
+    private void handleOffsetOutOfRange(long fetchOffset,
+                                        TopicPartition topicPartition,
+                                        String reason) {
+        if (subscriptions.hasDefaultOffsetResetPolicy()) {
+            log.info("Fetch offset {} is out of range for partition {}, resetting offset",
+                topicPartition, fetchOffset);
+            subscriptions.requestOffsetReset(topicPartition);
+        } else {
+            throw new OffsetOutOfRangeException(Collections.singletonMap(

Review comment:
       Another improvement we could make here is to include the fetch offset in the exception message. I'm also wondering whether we should log this event even when we throw the exception back to the user. Otherwise, the application might swallow the exception and we would never know it happened.
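
A minimal sketch of both suggestions, assuming simplified stand-ins: partitions are plain `String`s instead of `TopicPartition`, an in-memory list replaces the SLF4J logger, and `OffsetResetHandler` is a hypothetical class rather than the real `Fetcher`. The fetch offset goes into the exception message, and the event is logged before the exception is thrown:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Fetcher logic; not Kafka's actual class.
class OffsetResetHandler {
    // Simplified OffsetOutOfRangeException: message first, then the affected partitions.
    static class OffsetOutOfRangeException extends RuntimeException {
        final Map<String, Long> offsetOutOfRangePartitions;

        OffsetOutOfRangeException(String message, Map<String, Long> offsetOutOfRangePartitions) {
            super(message);
            this.offsetOutOfRangePartitions = Collections.unmodifiableMap(offsetOutOfRangePartitions);
        }
    }

    private final boolean hasDefaultOffsetResetPolicy;
    // In-memory log sink standing in for an SLF4J logger.
    final List<String> log = new ArrayList<>();
    final List<String> resetRequested = new ArrayList<>();

    OffsetResetHandler(boolean hasDefaultOffsetResetPolicy) {
        this.hasDefaultOffsetResetPolicy = hasDefaultOffsetResetPolicy;
    }

    void handleOffsetOutOfRange(long fetchOffset, String partition, String reason) {
        if (hasDefaultOffsetResetPolicy) {
            // Reset path: log at INFO and request a reset, as in the diff.
            log.add("INFO Fetch offset " + fetchOffset + " is out of range for partition "
                    + partition + ", resetting offset");
            resetRequested.add(partition);
        } else {
            String message = "Fetch offset " + fetchOffset + " is out of range for partition "
                    + partition + " and no reset policy is configured, root cause: " + reason;
            // Log before throwing so the event is recorded even if the application swallows the exception.
            log.add("WARN " + message);
            throw new OffsetOutOfRangeException(message, Collections.singletonMap(partition, fetchOffset));
        }
    }
}
```

Logging the no-reset case at a higher level than the reset case is a judgment call; the point is only that the event reaches the logs regardless of what the application does with the exception.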

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -3831,7 +3845,12 @@ private void testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epoch
         consumerClient.poll(time.timer(Duration.ZERO));
 
         assertEquals(0, subscriptions.position(tp0).offset);
-        assertFalse(subscriptions.awaitingValidation(tp0));
+
+        if (offsetResetStrategy == OffsetResetStrategy.NONE) {
+            assertTrue(subscriptions.awaitingValidation(tp0));
+        } else {
+            assertFalse(subscriptions.awaitingValidation(tp0));

Review comment:
       Hmm... Can we assert the raised exception somehow? It's not clear to me that it is being raised appropriately, and we don't have any tests for it.
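
One way the test could assert the exception, sketched against a hypothetical `PartitionState` model rather than the real `SubscriptionState` and `Fetcher`. JUnit's `assertThrows` (JUnit 4.13+ and JUnit 5) returns the thrown exception so its message can be checked as well; a small stand-in with the same shape is defined here to keep the block dependency-free. Where the exception actually surfaces in the real consumer is not modeled.

```java
// Hypothetical, simplified model; names mirror the test, but this is not Kafka code.
class AssertRaisedSketch {
    enum OffsetResetStrategy { EARLIEST, LATEST, NONE }

    static class OffsetOutOfRangeException extends RuntimeException {
        OffsetOutOfRangeException(String message) { super(message); }
    }

    static class PartitionState {
        final OffsetResetStrategy strategy;
        boolean awaitingValidation = true;
        boolean awaitingReset = false;

        PartitionState(OffsetResetStrategy strategy) { this.strategy = strategy; }

        // Called when the leader's response carries an undefined epoch or end offset.
        void onUndefinedEndOffset(long fetchOffset) {
            if (strategy == OffsetResetStrategy.NONE) {
                // No reset policy: surface the error and leave the partition awaiting validation.
                throw new OffsetOutOfRangeException("Fetch offset " + fetchOffset + " is out of range");
            }
            awaitingValidation = false;
            awaitingReset = true;
        }
    }

    // Stand-in with the same shape as JUnit's assertThrows: returns the raised exception.
    static <T extends Throwable> T assertThrows(Class<T> expected, Runnable action) {
        try {
            action.run();
        } catch (Throwable t) {
            if (expected.isInstance(t)) {
                return expected.cast(t);
            }
            throw new AssertionError("Expected " + expected.getName() + " but got " + t, t);
        }
        throw new AssertionError("Expected " + expected.getName() + " to be thrown");
    }

    // Asserts the exception and the state for NONE, and the reset for the other strategies.
    static void testOffsetValidationWithUndefinedEndOffset(OffsetResetStrategy strategy) {
        PartitionState state = new PartitionState(strategy);
        if (strategy == OffsetResetStrategy.NONE) {
            OffsetOutOfRangeException e =
                    assertThrows(OffsetOutOfRangeException.class, () -> state.onUndefinedEndOffset(10L));
            if (!e.getMessage().contains("10")) throw new AssertionError(e.getMessage());
            if (!state.awaitingValidation) throw new AssertionError("should still await validation");
        } else {
            state.onUndefinedEndOffset(10L);
            if (state.awaitingValidation || !state.awaitingReset) throw new AssertionError("should reset");
        }
    }
}
```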

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1304,6 +1310,19 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
         return completedFetch;
     }
 
+    private void handleOffsetOutOfRange(long fetchOffset,

Review comment:
       I think we can let this take a `FetchPosition` instead of just the offset. Inside `initializeCompletedFetch`, we can pull the position from this line:
   ```java
                       if (fetchOffset != subscriptions.position(tp).offset) {
   ```
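
A rough sketch of the idea, assuming a simplified `FetchPosition` that only carries `offset` and an optional `offsetEpoch` (the real `SubscriptionState.FetchPosition` holds more, such as leader information). Both call sites already have a position at hand: `subscriptions.position(tp)` in `initializeCompletedFetch` and `requestPosition` in the validation callback. `FetchPositionSketch` and `outOfRangeMessage` are hypothetical names.

```java
import java.util.Optional;

// Hypothetical sketch: FetchPosition here is a minimal stand-in for SubscriptionState.FetchPosition.
class FetchPositionSketch {
    static final class FetchPosition {
        final long offset;
        final Optional<Integer> offsetEpoch;

        FetchPosition(long offset, Optional<Integer> offsetEpoch) {
            this.offset = offset;
            this.offsetEpoch = offsetEpoch;
        }

        @Override
        public String toString() {
            return "FetchPosition{offset=" + offset + ", offsetEpoch=" + offsetEpoch + "}";
        }
    }

    // Taking the whole position instead of a bare long lets the message include the epoch as well.
    static String outOfRangeMessage(FetchPosition position, String partition, String reason) {
        return "Fetch position " + position + " is out of range for partition " + partition
                + ", root cause: " + reason;
    }
}
```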

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -812,13 +813,22 @@ public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsRe
                     // For each OffsetsForLeader response, check if the end-offset is lower than our current offset
                     // for the partition. If so, it means we have experienced log truncation and need to reposition
                     // that partition's offset.
+                    //
+                    // In addition, check whether the returned offset and epoch are valid. If not, then we should reset
+                    // its offset if reset policy is configured, or throw out of range exception.
                     offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> {
-                        SubscriptionState.FetchPosition requestPosition = fetchPostitions.get(respTopicPartition);
-                        Optional<OffsetAndMetadata> divergentOffsetOpt = subscriptions.maybeCompleteValidation(
+                        SubscriptionState.FetchPosition requestPosition = fetchPositions.get(respTopicPartition);
+
+                        if (respEndOffset.hasUndefinedEpochOrOffset()) {
+                            handleOffsetOutOfRange(requestPosition.offset, respTopicPartition,
+                                "Failed leader offset epoch validation for " + respEndOffset
+                                + " since no end offset larger than current fetch epoch was reported");
+                        } else {
+                            Optional<OffsetAndMetadata> divergentOffsetOpt = subscriptions.maybeCompleteValidation(
                                 respTopicPartition, requestPosition, respEndOffset);
-                        divergentOffsetOpt.ifPresent(divergentOffset -> {
-                            truncationWithoutResetPolicy.put(respTopicPartition, divergentOffset);
-                        });
+                            divergentOffsetOpt.ifPresent(
+                                divergentOffset -> truncationWithoutResetPolicy.put(respTopicPartition, divergentOffset));
+                        }
                     });
 
                     if (!truncationWithoutResetPolicy.isEmpty()) {

Review comment:
       As below, should we log an event here so that it shows up in the logs even if the user discards the exception?
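
A minimal sketch of logging before raising, assuming plain `String` partitions and `Long` offsets, an in-memory list in place of the logger, and a hypothetical `TruncationValidator` class. The message format follows the suggestion in the `LogTruncationException` comment. The reset-policy branch of `maybeCompleteValidation` is omitted, so every divergence is treated as having no reset policy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the truncation check; not Kafka's actual Fetcher.
class TruncationValidator {
    static class LogTruncationException extends RuntimeException {
        final Map<String, Long> divergentOffsets;

        LogTruncationException(Map<String, Long> divergentOffsets) {
            super("Detected log truncation with diverging offsets " + divergentOffsets);
            this.divergentOffsets = Collections.unmodifiableMap(divergentOffsets);
        }
    }

    // In-memory log sink standing in for an SLF4J logger.
    final List<String> log = new ArrayList<>();

    // fetchOffsets: partition -> offset we were fetching from.
    // endOffsets: partition -> end offset the leader reported for our epoch.
    void validate(Map<String, Long> fetchOffsets, Map<String, Long> endOffsets) {
        Map<String, Long> truncationWithoutResetPolicy = new LinkedHashMap<>();
        endOffsets.forEach((partition, endOffset) -> {
            // An end offset below our fetch offset means the log was truncated under us.
            if (endOffset < fetchOffsets.get(partition)) {
                truncationWithoutResetPolicy.put(partition, endOffset);
            }
        });
        if (!truncationWithoutResetPolicy.isEmpty()) {
            LogTruncationException e = new LogTruncationException(truncationWithoutResetPolicy);
            // Record the event before throwing, in case the application swallows the exception.
            log.add("WARN " + e.getMessage());
            throw e;
        }
    }
}
```

Collecting all divergent partitions first and throwing once keeps a single log line and a single exception per validation response.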



