hachikuji commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r434267922

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
##########
@@ -36,7 +36,8 @@
     private final Map<TopicPartition, OffsetAndMetadata> divergentOffsets;
 
     public LogTruncationException(Map<TopicPartition, OffsetAndMetadata> divergentOffsets) {
-        super(Utils.transformMap(divergentOffsets, Function.identity(), OffsetAndMetadata::offset));
+        super(Utils.transformMap(divergentOffsets, Function.identity(), OffsetAndMetadata::offset),
+            "detected log truncation");

Review comment:
I'd suggest a minor change to this. One of the issues here is that we don't get the divergent offsets in the message itself. I know that might not seem like a big deal, but sometimes the exception trace is all we get from the user. It's also surprising when an exception constructor takes a string parameter which is not the message itself. Can we do something like this instead?

```java
class LogTruncationException {

    public LogTruncationException(Map<TopicPartition, OffsetAndMetadata> divergentOffsets) {
        // Put the divergent offsets in the message so they show up in the exception trace.
        super("Detected log truncation with diverging offsets " + divergentOffsets,
            Utils.transformMap(divergentOffsets, Function.identity(), OffsetAndMetadata::offset));
        this.divergentOffsets = Collections.unmodifiableMap(divergentOffsets);
    }
}

class OffsetOutOfRangeException {

    public OffsetOutOfRangeException(Map<TopicPartition, Long> offsetOutOfRangePartitions) {
        this("Offsets out of range with no configured reset policy for partitions: " + offsetOutOfRangePartitions,
            offsetOutOfRangePartitions);
    }

    // The string parameter is the exception message itself.
    public OffsetOutOfRangeException(String message, Map<TopicPartition, Long> offsetOutOfRangePartitions) {
        super(message);
        this.offsetOutOfRangePartitions = offsetOutOfRangePartitions;
    }
}

class Fetcher {
    private OffsetOutOfRangeException handleOffsetOutOfRange(..., String reason) {
        ...
        return new OffsetOutOfRangeException(
            "Offsets out of range with no configured reset policy for partitions: "
                + offsetOutOfRangePartitions + ", root cause: " + reason,
            offsetOutOfRangePartitions);
    }
}
```

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1304,6 +1310,19 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
         return completedFetch;
     }
 
+    private void handleOffsetOutOfRange(long fetchOffset,
+                                        TopicPartition topicPartition,
+                                        String reason) {
+        if (subscriptions.hasDefaultOffsetResetPolicy()) {
+            log.info("Fetch offset {} is out of range for partition {}, resetting offset",
+                topicPartition, fetchOffset);
+            subscriptions.requestOffsetReset(topicPartition);
+        } else {
+            throw new OffsetOutOfRangeException(Collections.singletonMap(

Review comment:
Another improvement we can make here is to add the fetch offset to the exception message. Also, I'm wondering whether we should log this event even if we throw the exception back to the user. Otherwise, the user application might swallow it and we won't know it happened.

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -3831,7 +3845,12 @@ private void testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epoch
         consumerClient.poll(time.timer(Duration.ZERO));
 
         assertEquals(0, subscriptions.position(tp0).offset);
-        assertFalse(subscriptions.awaitingValidation(tp0));
+
+        if (offsetResetStrategy == OffsetResetStrategy.NONE) {
+            assertTrue(subscriptions.awaitingValidation(tp0));
+        } else {
+            assertFalse(subscriptions.awaitingValidation(tp0));

Review comment:
Hmm, can we assert the raised exception somehow? It's not clear to me that it is getting raised appropriately, and we don't have any tests for it.

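One way to picture the requested assertion is the self-contained sketch below. It does not use the Kafka classes: `SketchOffsetOutOfRangeException`, `validate`, and `expectThrows` are hypothetical stand-ins introduced only for illustration, and partitions are modelled as plain strings. In the real `FetcherTest`, JUnit's `assertThrows` would presumably take the place of the hand-written helper.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for the consumer's OffsetOutOfRangeException (not the Kafka class).
class SketchOffsetOutOfRangeException extends RuntimeException {
    private final Map<String, Long> partitions;

    SketchOffsetOutOfRangeException(String message, Map<String, Long> partitions) {
        super(message);
        this.partitions = Collections.unmodifiableMap(partitions);
    }

    Map<String, Long> partitions() {
        return partitions;
    }
}

class RaisedExceptionSketch {

    // Hypothetical stand-in for the validation path: throws when no reset policy is configured.
    static void validate(String partition, long fetchOffset, boolean hasResetPolicy) {
        if (!hasResetPolicy) {
            Map<String, Long> outOfRange = Collections.singletonMap(partition, fetchOffset);
            throw new SketchOffsetOutOfRangeException(
                "Offsets out of range with no configured reset policy for partitions: " + outOfRange,
                outOfRange);
        }
    }

    // Runs the action and returns the thrown exception if it has the expected type;
    // fails with an AssertionError if nothing, or something else, is thrown.
    static <T extends Throwable> T expectThrows(Class<T> type, Runnable action) {
        try {
            action.run();
        } catch (Throwable t) {
            if (type.isInstance(t)) {
                return type.cast(t);
            }
            throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
        }
        throw new AssertionError("Expected " + type.getSimpleName() + " but nothing was thrown");
    }

    public static void main(String[] args) {
        SketchOffsetOutOfRangeException e = expectThrows(
            SketchOffsetOutOfRangeException.class,
            () -> validate("topic-0", 5L, false));
        // Check the payload as well as the type, so the test pins down which partition failed.
        System.out.println(e.getMessage());
        System.out.println(e.partitions());
    }
}
```

Returning the caught exception from the helper lets the test check both the message and the partition map, which is the information the earlier comments ask to have in the exception.
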
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1304,6 +1310,19 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
         return completedFetch;
     }
 
+    private void handleOffsetOutOfRange(long fetchOffset,

Review comment:
I think we can let this take `FetchPosition` instead of just the offset. Inside `initializeCompletedFetch`, we can pull the position from this line:

```java
if (fetchOffset != subscriptions.position(tp).offset) {
```

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -812,13 +813,22 @@ public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsRe
             // For each OffsetsForLeader response, check if the end-offset is lower than our current offset
             // for the partition. If so, it means we have experienced log truncation and need to reposition
             // that partition's offset.
+            //
+            // In addition, check whether the returned offset and epoch are valid. If not, then we should reset
+            // its offset if reset policy is configured, or throw out of range exception.
             offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> {
-                SubscriptionState.FetchPosition requestPosition = fetchPostitions.get(respTopicPartition);
-                Optional<OffsetAndMetadata> divergentOffsetOpt = subscriptions.maybeCompleteValidation(
+                SubscriptionState.FetchPosition requestPosition = fetchPositions.get(respTopicPartition);
+
+                if (respEndOffset.hasUndefinedEpochOrOffset()) {
+                    handleOffsetOutOfRange(requestPosition.offset, respTopicPartition,
+                        "Failed leader offset epoch validation for " + respEndOffset
+                            + " since no end offset larger than current fetch epoch was reported");
+                } else {
+                    Optional<OffsetAndMetadata> divergentOffsetOpt = subscriptions.maybeCompleteValidation(
                         respTopicPartition, requestPosition, respEndOffset);
-                divergentOffsetOpt.ifPresent(divergentOffset -> {
-                    truncationWithoutResetPolicy.put(respTopicPartition, divergentOffset);
-                });
+                    divergentOffsetOpt.ifPresent(
+                        divergentOffset -> truncationWithoutResetPolicy.put(respTopicPartition, divergentOffset));
+                }
             });
 
             if (!truncationWithoutResetPolicy.isEmpty()) {

Review comment:
As in my other comment on `handleOffsetOutOfRange`, should we log an event here so that it is in the logs even if the user discards the exception?
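
The logging suggestion made in the review comments could look roughly like the sketch below: build the message once, log it, then throw an exception carrying the same message. This is only an illustration under stated assumptions. `LogThenThrowSketch`, `SketchTruncationException`, and `raiseIfTruncated` are hypothetical names, partitions are plain strings, and `java.util.logging` is used purely to keep the example self-contained rather than because the consumer logs that way.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.logging.Logger;

class LogThenThrowSketch {
    private static final Logger LOG = Logger.getLogger(LogThenThrowSketch.class.getName());

    // Hypothetical stand-in for a truncation exception (not the Kafka class).
    static class SketchTruncationException extends RuntimeException {
        private final Map<String, Long> divergentOffsets;

        SketchTruncationException(String message, Map<String, Long> divergentOffsets) {
            super(message);
            // Defensive copy so later changes to the caller's map do not alter the exception.
            this.divergentOffsets = Collections.unmodifiableMap(new LinkedHashMap<>(divergentOffsets));
        }

        Map<String, Long> divergentOffsets() {
            return divergentOffsets;
        }
    }

    // Logs the event before throwing, so it is recorded even if the caller swallows the exception.
    static void raiseIfTruncated(Map<String, Long> divergentOffsets) {
        if (divergentOffsets.isEmpty()) {
            return;
        }
        String message = "Detected log truncation with diverging offsets " + divergentOffsets;
        LOG.warning(message);
        throw new SketchTruncationException(message, divergentOffsets);
    }

    public static void main(String[] args) {
        Map<String, Long> divergent = new LinkedHashMap<>();
        divergent.put("topic-0", 7L);
        try {
            raiseIfTruncated(divergent);
        } catch (SketchTruncationException e) {
            // Even if an application ignored this exception, the warning above was already logged.
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Using one string for both the log line and the exception message keeps the two records easy to correlate when only one of them reaches whoever is debugging.
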