mumrah commented on a change in pull request #8376:
URL: https://github.com/apache/kafka/pull/8376#discussion_r429325049



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -422,8 +427,29 @@ public synchronized void position(TopicPartition tp, FetchPosition position) {
         assignedState(tp).position(position);
     }
 
-    public synchronized boolean maybeValidatePositionForCurrentLeader(TopicPartition tp, Metadata.LeaderAndEpoch leaderAndEpoch) {
-        return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+    /**
+     * Enter the offset validation state if the leader for this partition is known to support a usable version of the
+     * OffsetsForLeaderEpoch API. If the leader node does not support the API, simply complete the offset validation.
+     *
+     * @param apiVersions
+     * @param tp
+     * @param leaderAndEpoch
+     * @return true if we enter the offset validation state
+     */
+    public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions apiVersions, TopicPartition tp,
+                                                                      Metadata.LeaderAndEpoch leaderAndEpoch) {
+        if (leaderAndEpoch.leader.isPresent()) {
+            NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString());
+            if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+                return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+            } else {
+                // If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation
+                completeValidation(tp);
+                return false;
+            }
+        } else {
+            return assignedState(tp).maybeValidatePosition(leaderAndEpoch);

Review comment:
   Oh, actually, looking at the javadoc for LeaderAndEpoch, I see:
   
   > It is also possible that we know of the leader epoch, but not the leader when it is derived from an external source (e.g. a committed offset).
   
   Also, in Metadata we do return a LeaderAndEpoch with the last-seen epoch but no leader if the metadata is stale. So I guess it makes sense to keep this call in maybeValidatePositionForCurrentLeader.
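   
   To make the three branches in the hunk above concrete, here is a minimal, standalone sketch of the decision only. It is not the real `SubscriptionState` code: the class `ValidationDecisionSketch`, the `Action` enum, and the `Map<String, Boolean>` (a stand-in for looking up `NodeApiVersions` and calling `hasUsableOffsetForLeaderEpochVersion`) are all hypothetical names introduced for illustration.
   
```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of the branching in the diff above.
// None of these types are Kafka classes.
public class ValidationDecisionSketch {

    public enum Action {
        // Corresponds to assignedState(tp).maybeValidatePosition(leaderAndEpoch)
        DELEGATE_TO_PARTITION_STATE,
        // Corresponds to completeValidation(tp) followed by "return false"
        COMPLETE_VALIDATION
    }

    /**
     * @param leaderId        leader node id, empty when only the epoch is known
     *                        (e.g. stale metadata or an epoch from a committed offset)
     * @param usableApiByNode assumed lookup: node id -> whether the node supports a
     *                        usable OffsetsForLeaderEpoch version; a missing entry
     *                        models "API versions not known yet"
     */
    public static Action decide(Optional<String> leaderId, Map<String, Boolean> usableApiByNode) {
        if (leaderId.isPresent()) {
            Boolean usable = usableApiByNode.get(leaderId.get());
            if (usable == null || usable) {
                // Versions unknown or usable: let the partition state decide.
                return Action.DELEGATE_TO_PARTITION_STATE;
            }
            // Leader is known not to support the API: skip validation.
            return Action.COMPLETE_VALIDATION;
        }
        // Epoch without a leader: still delegate, as the review comment concludes.
        return Action.DELEGATE_TO_PARTITION_STATE;
    }

    public static void main(String[] args) {
        Map<String, Boolean> versions = new HashMap<>();
        versions.put("1", true);   // broker 1 supports a usable version
        versions.put("2", false);  // broker 2 does not

        System.out.println(decide(Optional.of("1"), versions));
        System.out.println(decide(Optional.of("2"), versions));
        System.out.println(decide(Optional.of("3"), versions)); // versions unknown
        System.out.println(decide(Optional.empty(), versions)); // no leader
    }
}
```
   
   Only the case of a known leader with a known-unusable API version short-circuits to completing validation; every other case, including the epoch-without-leader one discussed here, falls through to the per-partition state.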
   
   





