ijuma commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r584812552



##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -791,15 +791,18 @@ public void produceRequestGetErrorResponseTest() {
 
     @Test
     public void fetchResponseVersionTest() {
-        LinkedHashMap<TopicPartition, 
FetchResponse.PartitionData<MemoryRecords>> responseData = new 
LinkedHashMap<>();
+        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> 
responseData = new LinkedHashMap<>();
 
         MemoryRecords records = 
MemoryRecords.readableRecords(ByteBuffer.allocate(10));
-        responseData.put(new TopicPartition("test", 0), new 
FetchResponse.PartitionData<>(
-                Errors.NONE, 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-                0L, Optional.empty(), Collections.emptyList(), records));
-
-        FetchResponse<MemoryRecords> v0Response = new 
FetchResponse<>(Errors.NONE, responseData, 0, INVALID_SESSION_ID);
-        FetchResponse<MemoryRecords> v1Response = new 
FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
+        responseData.put(new TopicPartition("test", 0),
+                new FetchResponseData.PartitionData()
+                        .setHighWatermark(1000000)
+                        .setLogStartOffset(0)
+                        .setAbortedTransactions(Collections.emptyList())

Review comment:
       Aborted transactions is empty by default.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1146,38 +1159,47 @@ private FetchRequest createFetchRequest(int version) {
         return FetchRequest.Builder.forConsumer(100, 100000, 
fetchData).setMaxBytes(1000).build((short) version);
     }
 
-    private FetchResponse<MemoryRecords> createFetchResponse(Errors error, int 
sessionId) {
-        return new FetchResponse<>(error, new LinkedHashMap<>(), 25, 
sessionId);
+    private FetchResponse createFetchResponse(Errors error, int sessionId) {
+        return FetchResponse.of(error, 25, sessionId, new LinkedHashMap<>());
     }
 
-    private FetchResponse<MemoryRecords> createFetchResponse(int sessionId) {
-        LinkedHashMap<TopicPartition, 
FetchResponse.PartitionData<MemoryRecords>> responseData = new 
LinkedHashMap<>();
+    private FetchResponse createFetchResponse(int sessionId) {
+        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> 
responseData = new LinkedHashMap<>();
         MemoryRecords records = 
MemoryRecords.withRecords(CompressionType.NONE, new 
SimpleRecord("blah".getBytes()));
-        responseData.put(new TopicPartition("test", 0), new 
FetchResponse.PartitionData<>(Errors.NONE,
-            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, 
Optional.empty(), Collections.emptyList(), records));
-        List<FetchResponse.AbortedTransaction> abortedTransactions = 
Collections.singletonList(
-            new FetchResponse.AbortedTransaction(234L, 999L));
-        responseData.put(new TopicPartition("test", 1), new 
FetchResponse.PartitionData<>(Errors.NONE,
-            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, 
Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
-        return new FetchResponse<>(Errors.NONE, responseData, 25, sessionId);
-    }
-
-    private FetchResponse<MemoryRecords> createFetchResponse(boolean 
includeAborted) {
-        LinkedHashMap<TopicPartition, 
FetchResponse.PartitionData<MemoryRecords>> responseData = new 
LinkedHashMap<>();
+        responseData.put(new TopicPartition("test", 0), new 
FetchResponseData.PartitionData()
+                        .setHighWatermark(1000000)
+                        .setLogStartOffset(0)
+                        .setAbortedTransactions(Collections.emptyList())

Review comment:
       No need to set aborted transactions.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -416,9 +412,8 @@ abstract class AbstractFetcherThread(name: String,
                        "expected to persist.")
                   partitionsWithError += topicPartition
 
-                case _ =>
-                  error(s"Error for partition $topicPartition at offset 
${currentFetchState.fetchOffset}",
-                    partitionData.error.exception)
+                case partitionError: Errors =>

Review comment:
       We should not have `: Errors` here, as it introduces a type test. What we 
want is for this to be a catch-all.

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -1144,8 +1143,14 @@ class AbstractFetcherThreadTest {
           (Errors.NONE, records)
         }
 
-        (partition, new FetchData(error, leaderState.highWatermark, 
leaderState.highWatermark, leaderState.logStartOffset,
-          Optional.empty[Integer], List.empty.asJava, divergingEpoch.asJava, 
records))
+        (partition, new FetchResponseData.PartitionData()
+          .setErrorCode(error.code)
+          .setHighWatermark(leaderState.highWatermark)
+          .setLastStableOffset(leaderState.highWatermark)
+          .setLogStartOffset(leaderState.logStartOffset)
+          .setAbortedTransactions(Collections.emptyList())

Review comment:
       Redundant.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -808,22 +811,32 @@ public void fetchResponseVersionTest() {
 
     @Test
     public void testFetchResponseV4() {
-        LinkedHashMap<TopicPartition, 
FetchResponse.PartitionData<MemoryRecords>> responseData = new 
LinkedHashMap<>();
+        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> 
responseData = new LinkedHashMap<>();
         MemoryRecords records = 
MemoryRecords.readableRecords(ByteBuffer.allocate(10));
 
-        List<FetchResponse.AbortedTransaction> abortedTransactions = asList(
-                new FetchResponse.AbortedTransaction(10, 100),
-                new FetchResponse.AbortedTransaction(15, 50)
+        List<FetchResponseData.AbortedTransaction> abortedTransactions = 
asList(
+                new 
FetchResponseData.AbortedTransaction().setProducerId(10).setFirstOffset(100),
+                new 
FetchResponseData.AbortedTransaction().setProducerId(15).setFirstOffset(50)
         );
-        responseData.put(new TopicPartition("bar", 0), new 
FetchResponse.PartitionData<>(Errors.NONE, 100000,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, 
FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), abortedTransactions, 
records));
-        responseData.put(new TopicPartition("bar", 1), new 
FetchResponse.PartitionData<>(Errors.NONE, 900000,
-                5, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), 
null, records));
-        responseData.put(new TopicPartition("foo", 0), new 
FetchResponse.PartitionData<>(Errors.NONE, 70000,
-                6, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), 
emptyList(), records));
-
-        FetchResponse<MemoryRecords> response = new 
FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
-        FetchResponse<MemoryRecords> deserialized = 
FetchResponse.parse(response.serialize((short) 4), (short) 4);
+        responseData.put(new TopicPartition("bar", 0),
+                new FetchResponseData.PartitionData()
+                        .setHighWatermark(1000000)
+                        .setAbortedTransactions(abortedTransactions)
+                        .setRecords(records));
+        responseData.put(new TopicPartition("bar", 1),
+                new FetchResponseData.PartitionData()

Review comment:
       Do we need to set the partition id here? There are a few other cases in 
this file that are similar.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1146,38 +1159,47 @@ private FetchRequest createFetchRequest(int version) {
         return FetchRequest.Builder.forConsumer(100, 100000, 
fetchData).setMaxBytes(1000).build((short) version);
     }
 
-    private FetchResponse<MemoryRecords> createFetchResponse(Errors error, int 
sessionId) {
-        return new FetchResponse<>(error, new LinkedHashMap<>(), 25, 
sessionId);
+    private FetchResponse createFetchResponse(Errors error, int sessionId) {
+        return FetchResponse.of(error, 25, sessionId, new LinkedHashMap<>());
     }
 
-    private FetchResponse<MemoryRecords> createFetchResponse(int sessionId) {
-        LinkedHashMap<TopicPartition, 
FetchResponse.PartitionData<MemoryRecords>> responseData = new 
LinkedHashMap<>();
+    private FetchResponse createFetchResponse(int sessionId) {
+        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> 
responseData = new LinkedHashMap<>();
         MemoryRecords records = 
MemoryRecords.withRecords(CompressionType.NONE, new 
SimpleRecord("blah".getBytes()));
-        responseData.put(new TopicPartition("test", 0), new 
FetchResponse.PartitionData<>(Errors.NONE,
-            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, 
Optional.empty(), Collections.emptyList(), records));
-        List<FetchResponse.AbortedTransaction> abortedTransactions = 
Collections.singletonList(
-            new FetchResponse.AbortedTransaction(234L, 999L));
-        responseData.put(new TopicPartition("test", 1), new 
FetchResponse.PartitionData<>(Errors.NONE,
-            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, 
Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
-        return new FetchResponse<>(Errors.NONE, responseData, 25, sessionId);
-    }
-
-    private FetchResponse<MemoryRecords> createFetchResponse(boolean 
includeAborted) {
-        LinkedHashMap<TopicPartition, 
FetchResponse.PartitionData<MemoryRecords>> responseData = new 
LinkedHashMap<>();
+        responseData.put(new TopicPartition("test", 0), new 
FetchResponseData.PartitionData()
+                        .setHighWatermark(1000000)
+                        .setLogStartOffset(0)
+                        .setAbortedTransactions(Collections.emptyList())
+                        .setRecords(records));
+        List<FetchResponseData.AbortedTransaction> abortedTransactions = 
Collections.singletonList(
+            new 
FetchResponseData.AbortedTransaction().setProducerId(234L).setFirstOffset(999L));
+        responseData.put(new TopicPartition("test", 1), new 
FetchResponseData.PartitionData()
+                        .setHighWatermark(1000000)
+                        .setLogStartOffset(0)
+                        .setAbortedTransactions(abortedTransactions));
+        return FetchResponse.of(Errors.NONE, 25, sessionId, responseData);
+    }
+
+    private FetchResponse createFetchResponse(boolean includeAborted) {
+        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> 
responseData = new LinkedHashMap<>();
         MemoryRecords records = 
MemoryRecords.withRecords(CompressionType.NONE, new 
SimpleRecord("blah".getBytes()));
+        responseData.put(new TopicPartition("test", 0), new 
FetchResponseData.PartitionData()
+                        .setHighWatermark(1000000)
+                        .setLogStartOffset(0)
+                        .setAbortedTransactions(Collections.emptyList())

Review comment:
       Aborted transactions is empty by default.

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -1144,8 +1143,14 @@ class AbstractFetcherThreadTest {
           (Errors.NONE, records)
         }
 
-        (partition, new FetchData(error, leaderState.highWatermark, 
leaderState.highWatermark, leaderState.logStartOffset,
-          Optional.empty[Integer], List.empty.asJava, divergingEpoch.asJava, 
records))
+        (partition, new FetchResponseData.PartitionData()
+          .setErrorCode(error.code)
+          .setHighWatermark(leaderState.highWatermark)
+          .setLastStableOffset(leaderState.highWatermark)
+          .setLogStartOffset(leaderState.logStartOffset)
+          .setAbortedTransactions(Collections.emptyList())
+          .setRecords(records)
+          .setDivergingEpoch(divergingEpoch.getOrElse(new 
FetchResponseData.EpochEndOffset)))

Review comment:
       The previous code used `asJava`; why did we change it?

##########
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
##########
@@ -174,8 +173,12 @@ public int sizeInBytes() {
                     return null;
                 }
             };
-            initialFetched.put(tp, new 
FetchResponse.PartitionData<>(Errors.NONE, 0, 0, 0,
-                    new LinkedList<>(), fetched));
+            initialFetched.put(tp, new FetchResponseData.PartitionData()
+                    .setPartitionIndex(tp.partition())
+                    .setLastStableOffset(0)
+                    .setLogStartOffset(0)
+                    .setAbortedTransactions(Collections.emptyList())

Review comment:
       Redundant.

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
##########
@@ -963,9 +963,11 @@ class ReplicaFetcherThreadTest {
 
     val records = MemoryRecords.withRecords(CompressionType.NONE,
       new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8)))
-
-    val partitionData: thread.FetchData = new 
FetchResponse.PartitionData[Records](
-      Errors.NONE, 0, 0, 0, Optional.empty(), Collections.emptyList(), records)
+    val partitionData: thread.FetchData = new FetchResponseData.PartitionData()
+        .setLastStableOffset(0)
+        .setLogStartOffset(0)
+        .setAbortedTransactions(Collections.emptyList())

Review comment:
       Redundant.

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
##########
@@ -531,10 +529,12 @@ class ReplicaFetcherThreadTest {
     assertEquals(1, mockNetwork.fetchCount)
     partitions.foreach { tp => assertEquals(Fetching, 
thread.fetchState(tp).get.state) }
 
-    def partitionData(divergingEpoch: FetchResponseData.EpochEndOffset): 
FetchResponse.PartitionData[Records] = {
-      new FetchResponse.PartitionData[Records](
-        Errors.NONE, 0, 0, 0, Optional.empty(), Collections.emptyList(),
-        Optional.of(divergingEpoch), MemoryRecords.EMPTY)
+    def partitionData(divergingEpoch: FetchResponseData.EpochEndOffset): 
FetchResponseData.PartitionData = {
+      new FetchResponseData.PartitionData()
+          .setLastStableOffset(0)
+          .setLogStartOffset(0)
+          .setAbortedTransactions(Collections.emptyList())

Review comment:
       Redundant.

##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -658,12 +732,19 @@ class FetchSessionTest {
     // Full fetch context returns all partitions in the response
     val context1 = fetchManager.newContext(JFetchMetadata.INITIAL, reqData, 
EMPTY_PART_LIST, isFollower = false)
     assertEquals(classOf[FullFetchContext], context1.getClass)
-    val respData = new util.LinkedHashMap[TopicPartition, 
FetchResponse.PartitionData[Records]]
-    respData.put(tp1, new FetchResponse.PartitionData(Errors.NONE,
-      105, 105, 0, Optional.empty(), Collections.emptyList(), 
Optional.empty(), null))
-    val divergingEpoch = Optional.of(new 
FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(90))
-    respData.put(tp2, new FetchResponse.PartitionData(Errors.NONE,
-      105, 105, 0, Optional.empty(), Collections.emptyList(), divergingEpoch, 
null))
+    val respData = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
+    respData.put(tp1, new FetchResponseData.PartitionData()
+        .setHighWatermark(105)
+        .setLastStableOffset(105)
+        .setLogStartOffset(0)
+        .setAbortedTransactions(Collections.emptyList()))
+    val divergingEpoch = new 
FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(90)
+    respData.put(tp2, new FetchResponseData.PartitionData()
+        .setHighWatermark(105)
+        .setLastStableOffset(105)
+        .setLogStartOffset(0)
+        .setAbortedTransactions(Collections.emptyList())

Review comment:
       Redundant.

##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -679,17 +760,24 @@ class FetchSessionTest {
     assertEquals(Collections.singleton(tp2), resp2.responseData.keySet)
 
     // All partitions with divergent epoch should be returned.
-    respData.put(tp1, new FetchResponse.PartitionData(Errors.NONE,
-      105, 105, 0, Optional.empty(), Collections.emptyList(), divergingEpoch, 
null))
+    respData.put(tp1, new FetchResponseData.PartitionData()
+          .setHighWatermark(105)
+          .setLastStableOffset(105)
+          .setLogStartOffset(0)
+          .setAbortedTransactions(Collections.emptyList())

Review comment:
       Redundant.

##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -679,17 +760,24 @@ class FetchSessionTest {
     assertEquals(Collections.singleton(tp2), resp2.responseData.keySet)
 
     // All partitions with divergent epoch should be returned.
-    respData.put(tp1, new FetchResponse.PartitionData(Errors.NONE,
-      105, 105, 0, Optional.empty(), Collections.emptyList(), divergingEpoch, 
null))
+    respData.put(tp1, new FetchResponseData.PartitionData()
+          .setHighWatermark(105)
+          .setLastStableOffset(105)
+          .setLogStartOffset(0)
+          .setAbortedTransactions(Collections.emptyList())
+          .setDivergingEpoch(divergingEpoch))
     val resp3 = context2.updateAndGenerateResponseData(respData)
     assertEquals(Errors.NONE, resp3.error)
     assertEquals(resp1.sessionId, resp3.sessionId)
     assertEquals(Utils.mkSet(tp1, tp2), resp3.responseData.keySet)
 
     // Partitions that meet other conditions should be returned regardless of 
whether
     // divergingEpoch is set or not.
-    respData.put(tp1, new FetchResponse.PartitionData(Errors.NONE,
-      110, 110, 0, Optional.empty(), Collections.emptyList(), 
Optional.empty(), null))
+    respData.put(tp1, new FetchResponseData.PartitionData()
+        .setHighWatermark(110)
+        .setLastStableOffset(110)
+        .setLogStartOffset(0)
+        .setAbortedTransactions(Collections.emptyList()))

Review comment:
       Redundant.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -365,17 +126,92 @@ public int sessionId() {
      * @param partIterator  The partition iterator.
      * @return              The response size in bytes.
      */
-    public static <T extends BaseRecords> int sizeOf(short version,
-                                                     
Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator) {
+    public static int sizeOf(short version,
+                             Iterator<Map.Entry<TopicPartition, 
FetchResponseData.PartitionData>> partIterator) {
         // Since the throttleTimeMs and metadata field sizes are constant and 
fixed, we can
         // use arbitrary values here without affecting the result.
-        FetchResponseData data = toMessage(0, Errors.NONE, partIterator, 
INVALID_SESSION_ID);
+        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> data = 
new LinkedHashMap<>();
+        partIterator.forEachRemaining(entry -> data.put(entry.getKey(), 
entry.getValue()));
         ObjectSerializationCache cache = new ObjectSerializationCache();
-        return 4 + data.size(cache, version);
+        return 4 + FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, 
data).data.size(cache, version);
     }
 
     @Override
     public boolean shouldClientThrottle(short version) {
         return version >= 8;
     }
-}
+
+    public static Optional<FetchResponseData.EpochEndOffset> 
divergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.divergingEpoch().epoch() < 0 ? 
Optional.empty()
+                : Optional.of(partitionResponse.divergingEpoch());
+    }
+
+    public static boolean isDivergingEpoch(FetchResponseData.PartitionData 
partitionResponse) {
+        return partitionResponse.divergingEpoch().epoch() >= 0;
+    }
+
+    public static Optional<Integer> 
preferredReadReplica(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.preferredReadReplica() == 
INVALID_PREFERRED_REPLICA_ID ? Optional.empty()
+                : Optional.of(partitionResponse.preferredReadReplica());
+    }
+
+    public static boolean isPreferredReplica(FetchResponseData.PartitionData 
partitionResponse) {
+        return partitionResponse.preferredReadReplica() != 
INVALID_PREFERRED_REPLICA_ID;
+    }
+
+    public static FetchResponseData.PartitionData partitionResponse(int 
partition, Errors error) {
+        return new FetchResponseData.PartitionData()
+            .setPartitionIndex(partition)
+            .setErrorCode(error.code())
+            .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK);
+    }
+
+    /**
+     * cast the BaseRecords of PartitionData to Records. KRPC converts the 
byte array to MemoryRecords so this method
+     * never fail if the data is from KRPC.

Review comment:
       Suggestion:
   
   ```text
   Returns `partition.records` as `Records` (instead of `BaseRecords`). If 
`records` is `null`, returns `MemoryRecords.EMPTY`.
   
   If this response was deserialized after a fetch, this method should never 
fail. An example where this would fail is a down-converted response (e.g. 
LazyDownConversionRecords) on the broker (before it's serialized and sent on 
the wire).
   ```

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -365,17 +126,92 @@ public int sessionId() {
      * @param partIterator  The partition iterator.
      * @return              The response size in bytes.
      */
-    public static <T extends BaseRecords> int sizeOf(short version,
-                                                     
Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator) {
+    public static int sizeOf(short version,
+                             Iterator<Map.Entry<TopicPartition, 
FetchResponseData.PartitionData>> partIterator) {
         // Since the throttleTimeMs and metadata field sizes are constant and 
fixed, we can
         // use arbitrary values here without affecting the result.
-        FetchResponseData data = toMessage(0, Errors.NONE, partIterator, 
INVALID_SESSION_ID);
+        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> data = 
new LinkedHashMap<>();
+        partIterator.forEachRemaining(entry -> data.put(entry.getKey(), 
entry.getValue()));
         ObjectSerializationCache cache = new ObjectSerializationCache();
-        return 4 + data.size(cache, version);
+        return 4 + FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, 
data).data.size(cache, version);
     }
 
     @Override
     public boolean shouldClientThrottle(short version) {
         return version >= 8;
     }
-}
+
+    public static Optional<FetchResponseData.EpochEndOffset> 
divergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.divergingEpoch().epoch() < 0 ? 
Optional.empty()
+                : Optional.of(partitionResponse.divergingEpoch());
+    }
+
+    public static boolean isDivergingEpoch(FetchResponseData.PartitionData 
partitionResponse) {
+        return partitionResponse.divergingEpoch().epoch() >= 0;
+    }
+
+    public static Optional<Integer> 
preferredReadReplica(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.preferredReadReplica() == 
INVALID_PREFERRED_REPLICA_ID ? Optional.empty()
+                : Optional.of(partitionResponse.preferredReadReplica());
+    }
+
+    public static boolean isPreferredReplica(FetchResponseData.PartitionData 
partitionResponse) {
+        return partitionResponse.preferredReadReplica() != 
INVALID_PREFERRED_REPLICA_ID;
+    }
+
+    public static FetchResponseData.PartitionData partitionResponse(int 
partition, Errors error) {
+        return new FetchResponseData.PartitionData()
+            .setPartitionIndex(partition)
+            .setErrorCode(error.code())
+            .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK);
+    }
+
+    /**
+     * cast the BaseRecords of PartitionData to Records. KRPC converts the 
byte array to MemoryRecords so this method
+     * never fail if the data is from KRPC.
+     *
+     * @param partition partition data
+     * @return Records or empty record if the records in PartitionData is null.
+     */
+    public static Records records(FetchResponseData.PartitionData partition) {
+        return partition.records() == null ? MemoryRecords.EMPTY : (Records) 
partition.records();

Review comment:
       Instead of casting blindly, can we include a reasonable error message if 
the cast fails?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -761,79 +754,84 @@ class KafkaApis(val requestChannel: RequestChannel,
             // For fetch requests from clients, check if down-conversion is 
disabled for the particular partition
             if (!fetchRequest.isFromFollower && 
!logConfig.forall(_.messageDownConversionEnable)) {
               trace(s"Conversion to message format ${downConvertMagic.get} is 
disabled for partition $tp. Sending unsupported version response to $clientId.")
-              errorResponse(Errors.UNSUPPORTED_VERSION)
+              FetchResponse.partitionResponse(tp.partition, 
Errors.UNSUPPORTED_VERSION)
             } else {
               try {
                 trace(s"Down converting records from partition $tp to message 
format version $magic for fetch request from $clientId")
                 // Because down-conversion is extremely memory intensive, we 
want to try and delay the down-conversion as much
                 // as possible. With KIP-283, we have the ability to lazily 
down-convert in a chunked manner. The lazy, chunked
                 // down-conversion always guarantees that at least one batch 
of messages is down-converted and sent out to the
                 // client.
-                val error = maybeDownConvertStorageError(partitionData.error)
-                new FetchResponse.PartitionData[BaseRecords](error, 
partitionData.highWatermark,
-                  partitionData.lastStableOffset, partitionData.logStartOffset,
-                  partitionData.preferredReadReplica, 
partitionData.abortedTransactions,
-                  new LazyDownConversionRecords(tp, unconvertedRecords, magic, 
fetchContext.getFetchOffset(tp).get, time))
+                new FetchResponseData.PartitionData()
+                  .setPartitionIndex(tp.partition)
+                  
.setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+                  .setHighWatermark(partitionData.highWatermark)
+                  .setLastStableOffset(partitionData.lastStableOffset)
+                  .setLogStartOffset(partitionData.logStartOffset)
+                  .setAbortedTransactions(partitionData.abortedTransactions)
+                  .setRecords(new LazyDownConversionRecords(tp, 
unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+                  
.setPreferredReadReplica(partitionData.preferredReadReplica())
               } catch {
                 case e: UnsupportedCompressionTypeException =>
                   trace("Received unsupported compression type error during 
down-conversion", e)
-                  errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
+                  FetchResponse.partitionResponse(tp.partition, 
Errors.UNSUPPORTED_COMPRESSION_TYPE)
               }
             }
           case None =>
-            val error = maybeDownConvertStorageError(partitionData.error)
-            new FetchResponse.PartitionData[BaseRecords](error,
-              partitionData.highWatermark,
-              partitionData.lastStableOffset,
-              partitionData.logStartOffset,
-              partitionData.preferredReadReplica,
-              partitionData.abortedTransactions,
-              partitionData.divergingEpoch,
-              unconvertedRecords)
+            new FetchResponseData.PartitionData()
+              .setPartitionIndex(tp.partition)
+              
.setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+              .setHighWatermark(partitionData.highWatermark)
+              .setLastStableOffset(partitionData.lastStableOffset)
+              .setLogStartOffset(partitionData.logStartOffset)
+              .setAbortedTransactions(partitionData.abortedTransactions)
+              .setRecords(unconvertedRecords)
+              .setPreferredReadReplica(partitionData.preferredReadReplica)
+              .setDivergingEpoch(partitionData.divergingEpoch)
         }
       }
     }
 
     // the callback for process a fetch response, invoked before throttling
     def processResponseCallback(responsePartitionData: Seq[(TopicPartition, 
FetchPartitionData)]): Unit = {
-      val partitions = new util.LinkedHashMap[TopicPartition, 
FetchResponse.PartitionData[Records]]
+      val partitions = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
       val reassigningPartitions = mutable.Set[TopicPartition]()
       responsePartitionData.foreach { case (tp, data) =>
         val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
         val lastStableOffset = 
data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
-        if (data.isReassignmentFetch)
-          reassigningPartitions.add(tp)
-        val error = maybeDownConvertStorageError(data.error)
-        partitions.put(tp, new FetchResponse.PartitionData(
-          error,
-          data.highWatermark,
-          lastStableOffset,
-          data.logStartOffset,
-          data.preferredReadReplica.map(int2Integer).asJava,
-          abortedTransactions,
-          data.divergingEpoch.asJava,
-          data.records))
+        if (data.isReassignmentFetch) reassigningPartitions.add(tp)
+        partitions.put(tp, new FetchResponseData.PartitionData()
+            .setPartitionIndex(tp.partition)
+            .setErrorCode(maybeDownConvertStorageError(data.error).code)
+            .setHighWatermark(data.highWatermark)
+            .setLastStableOffset(lastStableOffset)
+            .setLogStartOffset(data.logStartOffset)
+            .setAbortedTransactions(abortedTransactions)
+            .setRecords(data.records)
+            
.setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
+            .setDivergingEpoch(data.divergingEpoch.getOrElse(new 
FetchResponseData.EpochEndOffset)))

Review comment:
       It seems we could call `setDivergingEpoch` only when `data.divergingEpoch` 
is defined, and leave the default value otherwise.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -365,17 +126,92 @@ public int sessionId() {
      * @param partIterator  The partition iterator.
      * @return              The response size in bytes.
      */
-    public static <T extends BaseRecords> int sizeOf(short version,
-                                                     
Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator) {
+    public static int sizeOf(short version,
+                             Iterator<Map.Entry<TopicPartition, 
FetchResponseData.PartitionData>> partIterator) {
         // Since the throttleTimeMs and metadata field sizes are constant and 
fixed, we can
         // use arbitrary values here without affecting the result.
-        FetchResponseData data = toMessage(0, Errors.NONE, partIterator, 
INVALID_SESSION_ID);
+        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> data = 
new LinkedHashMap<>();
+        partIterator.forEachRemaining(entry -> data.put(entry.getKey(), 
entry.getValue()));
         ObjectSerializationCache cache = new ObjectSerializationCache();
-        return 4 + data.size(cache, version);
+        return 4 + FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, 
data).data.size(cache, version);
     }
 
     @Override
     public boolean shouldClientThrottle(short version) {
         return version >= 8;
     }
-}
+
+    public static Optional<FetchResponseData.EpochEndOffset> 
divergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.divergingEpoch().epoch() < 0 ? 
Optional.empty()
+                : Optional.of(partitionResponse.divergingEpoch());
+    }
+
+    public static boolean isDivergingEpoch(FetchResponseData.PartitionData 
partitionResponse) {
+        return partitionResponse.divergingEpoch().epoch() >= 0;
+    }
+
+    public static Optional<Integer> 
preferredReadReplica(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.preferredReadReplica() == 
INVALID_PREFERRED_REPLICA_ID ? Optional.empty()
+                : Optional.of(partitionResponse.preferredReadReplica());
+    }
+
+    public static boolean isPreferredReplica(FetchResponseData.PartitionData 
partitionResponse) {
+        return partitionResponse.preferredReadReplica() != 
INVALID_PREFERRED_REPLICA_ID;
+    }
+
+    public static FetchResponseData.PartitionData partitionResponse(int 
partition, Errors error) {
+        return new FetchResponseData.PartitionData()
+            .setPartitionIndex(partition)
+            .setErrorCode(error.code())
+            .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK);
+    }
+
+    /**
+     * cast the BaseRecords of PartitionData to Records. KRPC converts the 
byte array to MemoryRecords so this method
+     * never fail if the data is from KRPC.
+     *
+     * @param partition partition data
+     * @return Records or empty record if the records in PartitionData is null.
+     */
+    public static Records records(FetchResponseData.PartitionData partition) {

Review comment:
       There is one place in this PR where we check for null when computing the 
records size; maybe we can use this utility function there.

##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -658,12 +732,19 @@ class FetchSessionTest {
     // Full fetch context returns all partitions in the response
     val context1 = fetchManager.newContext(JFetchMetadata.INITIAL, reqData, 
EMPTY_PART_LIST, isFollower = false)
     assertEquals(classOf[FullFetchContext], context1.getClass)
-    val respData = new util.LinkedHashMap[TopicPartition, 
FetchResponse.PartitionData[Records]]
-    respData.put(tp1, new FetchResponse.PartitionData(Errors.NONE,
-      105, 105, 0, Optional.empty(), Collections.emptyList(), 
Optional.empty(), null))
-    val divergingEpoch = Optional.of(new 
FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(90))
-    respData.put(tp2, new FetchResponse.PartitionData(Errors.NONE,
-      105, 105, 0, Optional.empty(), Collections.emptyList(), divergingEpoch, 
null))
+    val respData = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
+    respData.put(tp1, new FetchResponseData.PartitionData()
+        .setHighWatermark(105)
+        .setLastStableOffset(105)
+        .setLogStartOffset(0)
+        .setAbortedTransactions(Collections.emptyList()))

Review comment:
       Redundant.

##########
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
##########
@@ -78,19 +78,23 @@ public void setup() {
         for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
             String topic = UUID.randomUUID().toString();
             for (int partitionId = 0; partitionId < partitionCount; 
partitionId++) {
-                FetchResponse.PartitionData<MemoryRecords> partitionData = new 
FetchResponse.PartitionData<>(
-                    Errors.NONE, 0, 0, 0, Optional.empty(), 
Collections.emptyList(), records);
+                FetchResponseData.PartitionData partitionData = new 
FetchResponseData.PartitionData()
+                                .setPartitionIndex(partitionId)
+                                .setLastStableOffset(0)
+                                .setLogStartOffset(0)
+                                
.setAbortedTransactions(Collections.emptyList())

Review comment:
       Redundant.

##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -534,15 +588,21 @@ class FetchSessionTest {
       Optional.empty()))
     val session2context = fetchManager.newContext(JFetchMetadata.INITIAL, 
session1req, EMPTY_PART_LIST, false)
     assertEquals(classOf[FullFetchContext], session2context.getClass)
-    val session2RespData = new util.LinkedHashMap[TopicPartition, 
FetchResponse.PartitionData[Records]]
-    session2RespData.put(new TopicPartition("foo", 0), new 
FetchResponse.PartitionData(
-      Errors.NONE, 100, 100, 100, null, null))
-    session2RespData.put(new TopicPartition("foo", 1), new 
FetchResponse.PartitionData(
-      Errors.NONE, 10, 10, 10, null, null))
+    val session2RespData = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
+    session2RespData.put(new TopicPartition("foo", 0),
+      new FetchResponseData.PartitionData()
+        .setHighWatermark(100)
+        .setLastStableOffset(100)
+        .setLogStartOffset(100))
+    session2RespData.put(new TopicPartition("foo", 1),
+      new FetchResponseData.PartitionData()
+        .setHighWatermark(10)

Review comment:
       Do we need to set the partition id here and in the other cases?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to