mumrah commented on a change in pull request #8376:
URL: https://github.com/apache/kafka/pull/8376#discussion_r429323395



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -422,8 +427,29 @@ public synchronized void position(TopicPartition tp, 
FetchPosition position) {
         assignedState(tp).position(position);
     }
 
-    public synchronized boolean 
maybeValidatePositionForCurrentLeader(TopicPartition tp, 
Metadata.LeaderAndEpoch leaderAndEpoch) {
-        return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+    /**
+     * Enter the offset validation state if the leader for this partition is 
known to support a usable version of the
+     * OffsetsForLeaderEpoch API. If the leader node does not support the API, 
simply complete the offset validation.
+     *
+     * @param apiVersions
+     * @param tp
+     * @param leaderAndEpoch
+     * @return true if we enter the offset validation state
+     */
+    public synchronized boolean 
maybeValidatePositionForCurrentLeader(ApiVersions apiVersions, TopicPartition 
tp,
+                                                                      
Metadata.LeaderAndEpoch leaderAndEpoch) {
+        if (leaderAndEpoch.leader.isPresent()) {
+            NodeApiVersions nodeApiVersions = 
apiVersions.get(leaderAndEpoch.leader.get().idString());
+            if (nodeApiVersions == null || 
hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+                return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+            } else {
+                // If the broker does not support a newer version of 
OffsetsForLeaderEpoch, we skip validation
+                completeValidation(tp);
+                return false;
+            }
+        } else {
+            return assignedState(tp).maybeValidatePosition(leaderAndEpoch);

Review comment:
       I wonder, do we really need this call here? If the leader is not present 
the epoch shouldn't be present either -- right? If that's the case, then the 
call to maybeValidatePosition will short circuit
   
   ```java
           private boolean maybeValidatePosition(Metadata.LeaderAndEpoch 
currentLeaderAndEpoch) {
               if (this.fetchState.equals(FetchStates.AWAIT_RESET)) {
                   return false;
               }
   
               if (!currentLeaderAndEpoch.leader.isPresent() && 
!currentLeaderAndEpoch.epoch.isPresent()) {
                   return false;
               }
   
               if (position != null && 
!position.currentLeader.equals(currentLeaderAndEpoch)) {
                   FetchPosition newPosition = new 
FetchPosition(position.offset, position.offsetEpoch, currentLeaderAndEpoch);
                   validatePosition(newPosition);
                   preferredReadReplica = null;
               }
               return this.fetchState.equals(FetchStates.AWAIT_VALIDATION);
           }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to