chia7712 commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1758093606

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -218,18 +490,17 @@ public CompletableFuture<Void> resetPositionsIfNeeded() {
      *
      * <p/>
      *
-     * When a response is received, positions are validated and, if a log truncation is
-     * detected, a {@link LogTruncationException} will be saved in memory, to be thrown on the
+     * When a response is received, positions are validated and, if a log truncation is detected, a
+     * {@link LogTruncationException} will be saved in memory in cachedUpdatePositionsException, to be thrown on the
      * next call to this function.
      */
-    public CompletableFuture<Void> validatePositionsIfNeeded() {
-        Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate =
-                offsetFetcherUtils.getPartitionsToValidate();
+    void validatePositionsIfNeeded() {
+        Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate = offsetFetcherUtils.getPartitionsToValidate();
         if (partitionsToValidate.isEmpty()) {
-            return CompletableFuture.completedFuture(null);
+            return;
         }
-        return sendOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);
+        sendOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);

Review Comment:
   The return value of `sendOffsetsForLeaderEpochRequestsAndValidatePositions` is no longer used. Could you please clean it up?


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -218,18 +490,17 @@ public CompletableFuture<Void> resetPositionsIfNeeded() {
      *
      * <p/>
      *
-     * When a response is received, positions are validated and, if a log truncation is
-     * detected, a {@link LogTruncationException} will be saved in memory, to be thrown on the
+     * When a response is received, positions are validated and, if a log truncation is detected, a
+     * {@link LogTruncationException} will be saved in memory in cachedUpdatePositionsException, to be thrown on the
      * next call to this function.
      */
-    public CompletableFuture<Void> validatePositionsIfNeeded() {
-        Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate =
-                offsetFetcherUtils.getPartitionsToValidate();
+    void validatePositionsIfNeeded() {
+        Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate = offsetFetcherUtils.getPartitionsToValidate();
         if (partitionsToValidate.isEmpty()) {

Review Comment:
   How do we make sure that all partitions have already been validated? `partitionsToValidate` could be empty while there are in-flight `OffsetsForLeaderEpochRequest`s, right?
   
   https://github.com/apache/kafka/blob/b436499557e4d9de89a01818117d6c29d8190a49/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java#L202


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -218,18 +490,17 @@ public CompletableFuture<Void> resetPositionsIfNeeded() {
      *
      * <p/>
      *
-     * When a response is received, positions are validated and, if a log truncation is
-     * detected, a {@link LogTruncationException} will be saved in memory, to be thrown on the
+     * When a response is received, positions are validated and, if a log truncation is detected, a
+     * {@link LogTruncationException} will be saved in memory in cachedUpdatePositionsException, to be thrown on the
      * next call to this function.
      */
-    public CompletableFuture<Void> validatePositionsIfNeeded() {
-        Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate =
-                offsetFetcherUtils.getPartitionsToValidate();
+    void validatePositionsIfNeeded() {
+        Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate = offsetFetcherUtils.getPartitionsToValidate();

Review Comment:
   `offsetFetcherUtils.getPartitionsToValidate` can throw a cached exception, so should we propagate that exception via the `CompletableFuture`?
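
   To illustrate the question about propagating the cached exception, here is a rough sketch of what I have in mind. It is not Kafka code: `ValidatePositionsSketch` and the `Supplier` parameter are hypothetical stand-ins (the supplier plays the role of `offsetFetcherUtils.getPartitionsToValidate()`), and the request-sending step is stubbed out. It only shows a synchronous throw being turned into a failed future so that callers see every failure through a single path.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class ValidatePositionsSketch {

    // Hypothetical stand-in for the real method. The supplier represents
    // offsetFetcherUtils.getPartitionsToValidate(), which may rethrow a cached exception.
    static CompletableFuture<Void> validatePositionsIfNeeded(Supplier<Map<String, Long>> partitionsSupplier) {
        Map<String, Long> partitionsToValidate;
        try {
            partitionsToValidate = partitionsSupplier.get();
        } catch (RuntimeException e) {
            // Surface the cached failure through the future instead of throwing synchronously.
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
        if (partitionsToValidate.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        // Assumption: the real code would send the leader-epoch requests here and
        // return their combined future; the sketch simply completes immediately.
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> ok = validatePositionsIfNeeded(Collections::emptyMap);
        CompletableFuture<Void> failed = validatePositionsIfNeeded(() -> {
            throw new IllegalStateException("cached truncation error");
        });
        System.out.println(ok.isCompletedExceptionally());     // prints "false"
        System.out.println(failed.isCompletedExceptionally()); // prints "true"
    }
}
```

   If the method stays `void` as in this PR, the same idea would apply to wherever the caller's future is completed; the sketch does not assume which of the two designs is preferred.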