jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995053



##########
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
##########
@@ -163,48 +85,46 @@ public OptionalLong lastOffset() {
 
     @Override
     public void close() {
-        isClosed = true;
+        if (!isClosed) {
+            isClosed = true;
 
-        if (allocatedBuffer != null) {
-            bufferSupplier.release(allocatedBuffer);
+            iterator.close();
+            closeListener.onClose(this);
         }
-
-        closeListener.onClose(this);
     }
 
-    public T readRecord(Readable input) {
-        // Read size of body in bytes
-        input.readVarint();
-
-        // Read unused attributes
-        input.readByte();
-
-        long timestampDelta = input.readVarlong();
-        if (timestampDelta != 0) {
-            throw new IllegalArgumentException();
-        }
-
-        // Read offset delta
-        input.readVarint();
-
-        int keySize = input.readVarint();
-        if (keySize != -1) {
-            throw new IllegalArgumentException("Unexpected key size " + 
keySize);
-        }
+    public static <T> RecordsBatchReader<T> of(
+        long baseOffset,
+        Records records,
+        RecordSerde<T> serde,
+        BufferSupplier bufferSupplier,
+        int maxBatchSize,
+        CloseListener<BatchReader<T>> closeListener
+    ) {
+        return new RecordsBatchReader<>(
+            baseOffset,
+            new SerdeRecordsIterator<>(records, serde, bufferSupplier, 
maxBatchSize),
+            closeListener
+        );
+    }
 
-        int valueSize = input.readVarint();
-        if (valueSize < 0) {
-            throw new IllegalArgumentException();
+    private void checkIfClosed() {

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to