hachikuji commented on a change in pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#discussion_r539700926



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
##########
@@ -143,30 +127,24 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         return new OffsetsForLeaderEpochResponse(responseData);
     }
 
-    public static class PartitionData {
-        public final Optional<Integer> currentLeaderEpoch;
-        public final int leaderEpoch;
-
-        public PartitionData(Optional<Integer> currentLeaderEpoch, int leaderEpoch) {
-            this.currentLeaderEpoch = currentLeaderEpoch;
-            this.leaderEpoch = leaderEpoch;
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(currentLeaderEpoch=").append(currentLeaderEpoch).
-                append(", leaderEpoch=").append(leaderEpoch).
-                append(")");
-            return bld.toString();
-        }
-    }
-
     /**
      * Check whether a broker allows Topic-level permissions in order to use the
      * OffsetForLeaderEpoch API. Old versions require Cluster permission.
      */
     public static boolean supportsTopicPermission(short latestUsableVersion) {
         return latestUsableVersion >= 3;
     }
+
+    /**
+     * Exposed `OffsetForLeaderPartition.currentLeaderEpoch` as an `java.util.Optional`.
+     *
+     * Classes auto-generated based on the protocol do not support `java.util.Optional` yet. This
+     * is a temporary workaround until that work is completed.
+     */
+    public static Optional<Integer> currentLeaderEpochOpt(OffsetForLeaderPartition offsetForLeaderPartition) {

Review comment:
       Could we use `RequestUtils.getLeaderEpoch`?
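
       For context, a minimal sketch of the kind of conversion such a shared helper would perform. This is not the actual `RequestUtils` code: the class name, the method name, and the use of `-1` as the "no epoch" sentinel are assumptions made for illustration.

```java
import java.util.Optional;

// Hypothetical stand-in for a shared utility; not the real Kafka class.
final class LeaderEpochSketch {
    // Assumption: -1 marks "no leader epoch" in the generated protocol classes.
    static final int NO_LEADER_EPOCH = -1;

    // Wrap a raw epoch field in an Optional, mapping the sentinel to empty.
    static Optional<Integer> toOptionalEpoch(int rawEpoch) {
        return rawEpoch == NO_LEADER_EPOCH
            ? Optional.empty()
            : Optional.of(rawEpoch);
    }
}
```

       If the existing utility already does this, `currentLeaderEpochOpt` could delegate to it with the partition's raw `currentLeaderEpoch` value and would not need its own sentinel check.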

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -268,11 +271,12 @@ abstract class AbstractFetcherThread(name: String,
             fetchOffsets.put(tp, offsetTruncationState)
 
         case Errors.FENCED_LEADER_EPOCH =>
-          if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
-            p =>
-              if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
-              else None
-          })) partitionsWithError += tp
+          val currentLeaderEpoch = latestEpochsForPartitions.get(tp) match {

Review comment:
       Maybe we can use `scala.compat.java8.OptionConverters._`?



