hachikuji commented on a change in pull request #9547:
URL: https://github.com/apache/kafka/pull/9547#discussion_r526287129



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
##########
@@ -51,169 +47,120 @@
      */
     public static final int DEBUGGING_REPLICA_ID = -2;
 
-    private static final Field.ComplexArray TOPICS = new 
Field.ComplexArray("topics",
-            "An array of topics to get epochs for");
-    private static final Field.ComplexArray PARTITIONS = new 
Field.ComplexArray("partitions",
-            "An array of partitions to get epochs for");
-
-    private static final Field.Int32 LEADER_EPOCH = new 
Field.Int32("leader_epoch",
-            "The epoch to lookup an offset for.");
-
-    private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
-            PARTITION_ID,
-            LEADER_EPOCH);
-    private static final Field TOPICS_V0 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V0);
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V0 = new 
Schema(
-            TOPICS_V0);
-
-    // V1 request is the same as v0. Per-partition leader epoch has been added 
to response
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V1 = 
OFFSET_FOR_LEADER_EPOCH_REQUEST_V0;
-
-    // V2 adds the current leader epoch to support fencing and the addition of 
the throttle time in the response
-    private static final Field PARTITIONS_V2 = PARTITIONS.withFields(
-            PARTITION_ID,
-            CURRENT_LEADER_EPOCH,
-            LEADER_EPOCH);
-    private static final Field TOPICS_V2 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V2);
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V2 = new 
Schema(
-            TOPICS_V2);
-
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V3 = new 
Schema(
-            REPLICA_ID,
-            TOPICS_V2);
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{OFFSET_FOR_LEADER_EPOCH_REQUEST_V0, 
OFFSET_FOR_LEADER_EPOCH_REQUEST_V1,
-            OFFSET_FOR_LEADER_EPOCH_REQUEST_V2, 
OFFSET_FOR_LEADER_EPOCH_REQUEST_V3};
-    }
-
-    private final Map<TopicPartition, PartitionData> epochsByPartition;
-
-    private final int replicaId;
-
-    public Map<TopicPartition, PartitionData> epochsByTopicPartition() {
-        return epochsByPartition;
-    }
-
-    public int replicaId() {
-        return replicaId;
-    }
+    private final OffsetForLeaderEpochRequestData data;
 
     public static class Builder extends 
AbstractRequest.Builder<OffsetsForLeaderEpochRequest> {
-        private final Map<TopicPartition, PartitionData> epochsByPartition;
-        private final int replicaId;
+        private final OffsetForLeaderEpochRequestData data;
 
-        Builder(short oldestAllowedVersion, short latestAllowedVersion, 
Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) {
+        Builder(short oldestAllowedVersion, short latestAllowedVersion, 
OffsetForLeaderEpochRequestData data) {
             super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, oldestAllowedVersion, 
latestAllowedVersion);
-            this.epochsByPartition = epochsByPartition;
-            this.replicaId = replicaId;
+            this.data = data;
         }
 
-        public static Builder forConsumer(Map<TopicPartition, PartitionData> 
epochsByPartition) {
+        public static Builder forConsumer(OffsetForLeaderTopicCollection 
epochsByPartition) {
             // Old versions of this API require CLUSTER permission which is 
not typically granted
             // to clients. Beginning with version 3, the broker requires only 
TOPIC Describe
             // permission for the topic of each requested partition. In order 
to ensure client
             // compatibility, we only send this request when we can guarantee 
the relaxed permissions.
-            return new Builder((short) 3, 
ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(),
-                    epochsByPartition, CONSUMER_REPLICA_ID);
+            OffsetForLeaderEpochRequestData data = new 
OffsetForLeaderEpochRequestData();
+            data.setReplicaId(CONSUMER_REPLICA_ID);
+            data.setTopics(epochsByPartition);
+            return new Builder((short) 3, 
ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), data);
         }
 
         public static Builder forFollower(short version, Map<TopicPartition, 
PartitionData> epochsByPartition, int replicaId) {
-            return new Builder(version, version, epochsByPartition, replicaId);
+            OffsetForLeaderEpochRequestData data = new 
OffsetForLeaderEpochRequestData();
+            data.setReplicaId(replicaId);
+
+            epochsByPartition.forEach((partitionKey, partitionValue) -> {
+                OffsetForLeaderTopic topic = 
data.topics().find(partitionKey.topic());
+                if (topic == null) {

Review comment:
       Sounds fine. Do you have a JIRA ticket for this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to