guozhangwang commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r413417021



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
         validateOffsetsAsync(partitionsToValidate);
     }
 
+    /**
+     * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition
+     * with the epoch less than or equal to the epoch the partition last saw.
+     *
+     * Requests are grouped by Node for efficiency.
+     */
+    private void validateOffsetsAsync(Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
+        final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped =
+            regroupFetchPositionsByLeader(partitionsToValidate);
+
+        regrouped.forEach((node, fetchPositions) -> {
+            if (node.isEmpty()) {
+                metadata.requestUpdate();
+                return;
+            }
+
+            NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+            if (nodeApiVersions == null) {
+                client.tryConnect(node);
+                return;
+            }
+
+            if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+                log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
+                              "support the required protocol version (introduced in Kafka 2.3)",
+                    fetchPositions.keySet());
+                completeAllValidations(fetchPositions);
+                return;
+            }
+
+            // We need to get the client epoch state before sending out the leader epoch request, and use it to
+            // decide whether we need to validate offsets.
+            if (!metadata.hasReliableLeaderEpochs()) {
+                log.debug("Skipping validation of fetch offsets for partitions {} since the provided leader broker " +
+                              "is not reliable", fetchPositions.keySet());
+                completeAllValidations(fetchPositions);
+                return;
+            }
+
+            subscriptions.setNextAllowedRetry(fetchPositions.keySet(), time.milliseconds() + requestTimeoutMs);
+
+            RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future =
+                offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions);
+
+            future.addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() {
+                @Override
+                public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) {
+                    Map<TopicPartition, OffsetAndMetadata> truncationWithoutResetPolicy = new HashMap<>();
+                    if (!offsetsResult.partitionsToRetry().isEmpty()) {
+                        subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs);
+                        metadata.requestUpdate();
+                    }
+
+                    // For each OffsetsForLeader response, check if the end-offset is lower than our current offset
+                    // for the partition. If so, it means we have experienced log truncation and need to reposition
+                    // that partition's offset.
+                    //
+                    // In addition, check whether the returned offset and epoch are valid. If not, then we should treat
+                    // it as out of range and update metadata for rediscovery.
+                    offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> {
+                        if (respEndOffset.hasUndefinedEpochOrOffset()) {
+                            // Should attempt to find the new leader in the next try.
+                            log.debug("Requesting metadata update for partition {} due to undefined epoch or offset {}",

Review comment:
       nit: `... or offset {} from OffsetsForLeaderEpoch response`

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
##########
@@ -86,4 +84,9 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(error, leaderEpoch, endOffset);
     }
+
+    public boolean hasUndefinedEpochOrOffset() {
+        return this.endOffset == UNDEFINED_EPOCH_OFFSET ||

Review comment:
       For my own understanding: if `endOffset` is UNDEFINED, should the epoch always be UNDEFINED too? If so, can we rely on `leaderEpoch` alone?
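
To make the question concrete, here is a minimal standalone sketch, not the actual `EpochEndOffset` class. It assumes both sentinels are `-1`, and it assumes the part of the condition cut off in the hunk above compares `leaderEpoch` against an undefined-epoch constant, as the method name suggests. The class name `EpochEndOffsetSketch` and the helper `hasUndefinedEpoch()` are hypothetical and exist only for illustration.

```java
public class EpochEndOffsetSketch {
    // Assumed sentinel values; the real constants live in EpochEndOffset.
    public static final int UNDEFINED_EPOCH = -1;
    public static final long UNDEFINED_EPOCH_OFFSET = -1L;

    private final int leaderEpoch;
    private final long endOffset;

    public EpochEndOffsetSketch(int leaderEpoch, long endOffset) {
        this.leaderEpoch = leaderEpoch;
        this.endOffset = endOffset;
    }

    // Checks both fields, which is what the method under review appears to do.
    public boolean hasUndefinedEpochOrOffset() {
        return endOffset == UNDEFINED_EPOCH_OFFSET || leaderEpoch == UNDEFINED_EPOCH;
    }

    // Hypothetical alternative raised in the comment: rely on the epoch alone.
    public boolean hasUndefinedEpoch() {
        return leaderEpoch == UNDEFINED_EPOCH;
    }

    public static void main(String[] args) {
        // A response with a defined epoch but an undefined offset is the only
        // case where the two checks disagree.
        EpochEndOffsetSketch mixed = new EpochEndOffsetSketch(5, UNDEFINED_EPOCH_OFFSET);
        System.out.println(mixed.hasUndefinedEpochOrOffset()); // true
        System.out.println(mixed.hasUndefinedEpoch());         // false
    }
}
```

If the broker can never return a defined epoch together with an undefined offset, the two checks are equivalent and the epoch-only form would be enough; otherwise the combined check is the safer one.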

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
         validateOffsetsAsync(partitionsToValidate);
     }
 
+    /**
+     * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition
+     * with the epoch less than or equal to the epoch the partition last saw.
+     *
+     * Requests are grouped by Node for efficiency.
+     */
+    private void validateOffsetsAsync(Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
+        final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped =
+            regroupFetchPositionsByLeader(partitionsToValidate);
+
+        regrouped.forEach((node, fetchPositions) -> {
+            if (node.isEmpty()) {
+                metadata.requestUpdate();
+                return;
+            }
+
+            NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+            if (nodeApiVersions == null) {
+                client.tryConnect(node);
+                return;
+            }
+
+            if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+                log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
+                              "support the required protocol version (introduced in Kafka 2.3)",
+                    fetchPositions.keySet());
+                completeAllValidations(fetchPositions);
+                return;
+            }
+
+            // We need to get the client epoch state before sending out the leader epoch request, and use it to
+            // decide whether we need to validate offsets.
+            if (!metadata.hasReliableLeaderEpochs()) {

Review comment:
       Could you avoid moving the function while changing it, so that the diff is easier to check? As it stands, I have to trust blindly that the only changes are to this function :)

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
##########
@@ -85,10 +85,6 @@ protected OffsetForEpochResult handleResponse(
                 case KAFKA_STORAGE_ERROR:
                 case OFFSET_NOT_AVAILABLE:
                 case LEADER_NOT_AVAILABLE:
-                    logger().debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",

Review comment:
       Why can we remove this logic?




