chia7712 commented on a change in pull request #9906:
URL: https://github.com/apache/kafka/pull/9906#discussion_r558430765



##########
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -201,38 +218,37 @@ public void testHasRoomForMethod(Args args) {
     }
 
     @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    @ArgumentsSource(MagicAtLeastV2.class)
     public void testHasRoomForMethodWithHeaders(Args args) {
         byte magic = args.magic;
-        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
-            MemoryRecordsBuilder builder = 
MemoryRecords.builder(ByteBuffer.allocate(100), magic, args.compression,
-                    TimestampType.CREATE_TIME, 0L);
-            RecordHeaders headers = new RecordHeaders();
-            headers.add("hello", "world.world".getBytes());
-            headers.add("hello", "world.world".getBytes());
-            headers.add("hello", "world.world".getBytes());
-            headers.add("hello", "world.world".getBytes());
-            headers.add("hello", "world.world".getBytes());
-            builder.append(logAppendTime, "key".getBytes(), 
"value".getBytes());
-            // Make sure that hasRoomFor accounts for header sizes by letting 
a record without headers pass, but stopping
-            // a record with a large number of headers.
-            assertTrue(builder.hasRoomFor(logAppendTime, "key".getBytes(), 
"value".getBytes(), Record.EMPTY_HEADERS));
-            assertFalse(builder.hasRoomFor(logAppendTime, "key".getBytes(), 
"value".getBytes(), headers.toArray()));
-        }
+        MemoryRecordsBuilder builder = 
MemoryRecords.builder(ByteBuffer.allocate(100), magic, args.compression,
+                TimestampType.CREATE_TIME, 0L);
+        RecordHeaders headers = new RecordHeaders();
+        headers.add("hello", "world.world".getBytes());
+        headers.add("hello", "world.world".getBytes());
+        headers.add("hello", "world.world".getBytes());
+        headers.add("hello", "world.world".getBytes());
+        headers.add("hello", "world.world".getBytes());
+        builder.append(logAppendTime, "key".getBytes(), "value".getBytes());
+        // Make sure that hasRoomFor accounts for header sizes by letting a 
record without headers pass, but stopping
+        // a record with a large number of headers.
+        assertTrue(builder.hasRoomFor(logAppendTime, "key".getBytes(), 
"value".getBytes(), Record.EMPTY_HEADERS));
+        assertFalse(builder.hasRoomFor(logAppendTime, "key".getBytes(), 
"value".getBytes(), headers.toArray()));
+    }
+
+    private static Stream<Arguments> testChecksum() {
+        return getMemoryRecordsArguments((magic, type) -> type == 
CompressionType.NONE || type == CompressionType.LZ4);
     }
 
     /**
      * This test verifies that the checksum returned for various versions 
matches hardcoded values to catch unintentional
      * changes to how the checksum is computed.
      */
     @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    @MethodSource

Review comment:
       I prefer adding the explicit method name to `@MethodSource`. Otherwise, the method providing the parameters looks like an unused method.
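       For example, a minimal sketch of what I mean (method and test names taken from this PR):

```java
@ParameterizedTest
@MethodSource("testChecksum") // explicit reference instead of the implicit same-name lookup
public void testChecksum(Args args) {
    // ... existing test body unchanged
}
```

       With the explicit name, readers and IDEs can see immediately which factory method feeds the test.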

##########
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -81,26 +82,43 @@ public String toString() {
         }
     }
 
+    private static Stream<Arguments> 
getMemoryRecordsArguments(BiPredicate<Byte, CompressionType> accept) {

Review comment:
       Maybe it's just me, but ```getMemoryRecordsArguments``` is a bit verbose. How about ```allArguments```?

##########
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -81,26 +82,43 @@ public String toString() {
         }
     }
 
+    private static Stream<Arguments> 
getMemoryRecordsArguments(BiPredicate<Byte, CompressionType> accept) {
+        List<Arguments> arguments = new ArrayList<>();
+        for (long firstOffset : asList(0L, 57L))
+            for (byte magic : asList(RecordBatch.MAGIC_VALUE_V0, 
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2))
+                for (CompressionType type : CompressionType.values())
+                    if (accept.test(magic, type))
+                        arguments.add(Arguments.of(new Args(magic, 
firstOffset, type)));
+        return arguments.stream();
+    }
+
     private static class MemoryRecordsArgumentsProvider implements 
ArgumentsProvider {
         @Override
         public Stream<? extends Arguments> provideArguments(ExtensionContext 
context) {
-            List<Arguments> arguments = new ArrayList<>();
-            for (long firstOffset : asList(0L, 57L))
-                for (byte magic : asList(RecordBatch.MAGIC_VALUE_V0, 
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2))
-                    for (CompressionType type: CompressionType.values())
-                        arguments.add(Arguments.of(new Args(magic, 
firstOffset, type)));
-            return arguments.stream();
+            return getMemoryRecordsArguments((magic, type) -> true);
+        }
+    }
+
+    private static class MagicAtLeastV2 implements ArgumentsProvider {
+        @Override
+        public Stream<? extends Arguments> provideArguments(ExtensionContext 
context) {
+            return getMemoryRecordsArguments((magic, type) -> magic >= 
RecordBatch.MAGIC_VALUE_V2);
+        }
+    }
+
+    private static class AtLeastV2OrNotZstd implements ArgumentsProvider {
+        @Override
+        public Stream<? extends Arguments> provideArguments(ExtensionContext 
context) {
+            return getMemoryRecordsArguments((magic, type) -> type != 
CompressionType.ZSTD || magic >= MAGIC_VALUE_V2);
         }
     }
 
     private final long logAppendTime = System.currentTimeMillis();
     private final int partitionLeaderEpoch = 998;

Review comment:
       This variable is used only by ```testIterator```, so we can make it a local variable.
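       i.e. something like this (just a sketch, assuming ```testIterator``` really is the only user; annotations shown for illustration):

```java
@ParameterizedTest
@ArgumentsSource(MemoryRecordsArgumentsProvider.class) // whichever provider testIterator already uses
public void testIterator(Args args) {
    int partitionLeaderEpoch = 998; // was an instance field; only this test reads it
    // ... existing test body unchanged
}
```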

##########
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -390,150 +449,96 @@ protected boolean shouldRetainRecord(RecordBatch 
recordBatch, Record record) {
                     }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
 
             // Verify filter result
-            assertEquals(0, filterResult.messagesRead());
-            assertEquals(records.sizeInBytes(), filterResult.bytesRead());
-            assertEquals(baseOffset, filterResult.maxOffset());
-            assertEquals(0, filterResult.messagesRetained());
-            assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 
filterResult.bytesRetained());
-            assertEquals(timestamp, filterResult.maxTimestamp());
-            assertEquals(baseOffset, 
filterResult.shallowOffsetOfMaxTimestamp());
-            assertTrue(filterResult.outputBuffer().position() > 0);
+            assertEquals(0, filterResult.outputBuffer().position());
 
             // Verify filtered records
             filtered.flip();
             MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
-            assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 
filteredRecords.sizeInBytes());
-        }
-    }
-
-    @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
-    public void testEmptyBatchDeletion(Args args) {
-        if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
-            for (final BatchRetention deleteRetention : 
Arrays.asList(BatchRetention.DELETE, BatchRetention.DELETE_EMPTY)) {
-                ByteBuffer buffer = 
ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
-                long producerId = 23L;
-                short producerEpoch = 5;
-                long baseOffset = 3L;
-                int baseSequence = 10;
-                int partitionLeaderEpoch = 293;
-                long timestamp = System.currentTimeMillis();
-
-                DefaultRecordBatch.writeEmptyHeader(buffer, 
RecordBatch.MAGIC_VALUE_V2, producerId, producerEpoch,
-                        baseSequence, baseOffset, baseOffset, 
partitionLeaderEpoch, TimestampType.CREATE_TIME,
-                        timestamp, false, false);
-                buffer.flip();
-
-                ByteBuffer filtered = ByteBuffer.allocate(2048);
-                MemoryRecords records = MemoryRecords.readableRecords(buffer);
-                MemoryRecords.FilterResult filterResult = records.filterTo(new 
TopicPartition("foo", 0),
-                        new MemoryRecords.RecordFilter() {
-                            @Override
-                            protected BatchRetention 
checkBatchRetention(RecordBatch batch) {
-                                return deleteRetention;
-                            }
-
-                            @Override
-                            protected boolean shouldRetainRecord(RecordBatch 
recordBatch, Record record) {
-                                return false;
-                            }
-                        }, filtered, Integer.MAX_VALUE, 
BufferSupplier.NO_CACHING);
-
-                // Verify filter result
-                assertEquals(0, filterResult.outputBuffer().position());
-
-                // Verify filtered records
-                filtered.flip();
-                MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
-                assertEquals(0, filteredRecords.sizeInBytes());
-            }
+            assertEquals(0, filteredRecords.sizeInBytes());
         }
     }
 
     @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    @ArgumentsSource(MagicAtLeastV2.class)
     public void testBuildEndTxnMarker(Args args) {

Review comment:
       ditto

##########
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -261,105 +277,149 @@ public void testChecksum(Args args) {
     }
 
     @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    @ArgumentsSource(MagicAtLeastV2.class)
     public void testFilterToPreservesPartitionLeaderEpoch(Args args) {
         byte magic = args.magic;
-        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
-            int partitionLeaderEpoch = 67;
+        int partitionLeaderEpoch = 67;
+
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, 
args.compression, TimestampType.CREATE_TIME,
+                0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+        builder.append(10L, null, "a".getBytes());
+        builder.append(11L, "1".getBytes(), "b".getBytes());
+        builder.append(12L, null, "c".getBytes());
+
+        ByteBuffer filtered = ByteBuffer.allocate(2048);
+        builder.build().filterTo(new TopicPartition("foo", 0), new 
RetainNonNullKeysFilter(), filtered,
+                Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+
+        filtered.flip();
+        MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
+
+        List<MutableRecordBatch> batches = 
TestUtils.toList(filteredRecords.batches());
+        assertEquals(1, batches.size());
 
+        MutableRecordBatch firstBatch = batches.get(0);
+        assertEquals(partitionLeaderEpoch, firstBatch.partitionLeaderEpoch());
+    }
+
+    @ParameterizedTest
+    @ArgumentsSource(MagicAtLeastV2.class)
+    public void testFilterToEmptyBatchRetention(Args args) {
+        byte magic = args.magic;
+        for (boolean isTransactional : Arrays.asList(true, false)) {
             ByteBuffer buffer = ByteBuffer.allocate(2048);
+            long producerId = 23L;
+            short producerEpoch = 5;
+            long baseOffset = 3L;
+            int baseSequence = 10;
+            int partitionLeaderEpoch = 293;
+            int numRecords = 2;
+
             MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
magic, args.compression, TimestampType.CREATE_TIME,
-                    0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
-            builder.append(10L, null, "a".getBytes());
-            builder.append(11L, "1".getBytes(), "b".getBytes());
-            builder.append(12L, null, "c".getBytes());
+                    baseOffset, RecordBatch.NO_TIMESTAMP, producerId, 
producerEpoch, baseSequence, isTransactional,
+                    partitionLeaderEpoch);
+            builder.append(11L, "2".getBytes(), "b".getBytes());
+            builder.append(12L, "3".getBytes(), "c".getBytes());
+            builder.close();
+            MemoryRecords records = builder.build();
 
             ByteBuffer filtered = ByteBuffer.allocate(2048);
-            builder.build().filterTo(new TopicPartition("foo", 0), new 
RetainNonNullKeysFilter(), filtered,
-                    Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+            MemoryRecords.FilterResult filterResult = records.filterTo(new 
TopicPartition("foo", 0),
+                    new MemoryRecords.RecordFilter() {
+                        @Override
+                        protected BatchRetention 
checkBatchRetention(RecordBatch batch) {
+                            // retain all batches
+                            return BatchRetention.RETAIN_EMPTY;
+                        }
 
+                        @Override
+                        protected boolean shouldRetainRecord(RecordBatch 
recordBatch, Record record) {
+                            // delete the records
+                            return false;
+                        }
+                    }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+
+            // Verify filter result
+            assertEquals(numRecords, filterResult.messagesRead());
+            assertEquals(records.sizeInBytes(), filterResult.bytesRead());
+            assertEquals(baseOffset + 1, filterResult.maxOffset());
+            assertEquals(0, filterResult.messagesRetained());
+            assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 
filterResult.bytesRetained());
+            assertEquals(12, filterResult.maxTimestamp());
+            assertEquals(baseOffset + 1, 
filterResult.shallowOffsetOfMaxTimestamp());
+
+            // Verify filtered records
             filtered.flip();
             MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
 
             List<MutableRecordBatch> batches = 
TestUtils.toList(filteredRecords.batches());
             assertEquals(1, batches.size());
 
-            MutableRecordBatch firstBatch = batches.get(0);
-            assertEquals(partitionLeaderEpoch, 
firstBatch.partitionLeaderEpoch());
+            MutableRecordBatch batch = batches.get(0);
+            assertEquals(0, batch.countOrNull().intValue());
+            assertEquals(12L, batch.maxTimestamp());
+            assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            assertEquals(baseOffset, batch.baseOffset());
+            assertEquals(baseOffset + 1, batch.lastOffset());
+            assertEquals(baseSequence, batch.baseSequence());
+            assertEquals(baseSequence + 1, batch.lastSequence());
+            assertEquals(isTransactional, batch.isTransactional());
         }
     }
 
     @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
-    public void testFilterToEmptyBatchRetention(Args args) {
-        byte magic = args.magic;
-        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
-            for (boolean isTransactional : Arrays.asList(true, false)) {
-                ByteBuffer buffer = ByteBuffer.allocate(2048);
-                long producerId = 23L;
-                short producerEpoch = 5;
-                long baseOffset = 3L;
-                int baseSequence = 10;
-                int partitionLeaderEpoch = 293;
-                int numRecords = 2;
-
-                MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
magic, args.compression, TimestampType.CREATE_TIME,
-                        baseOffset, RecordBatch.NO_TIMESTAMP, producerId, 
producerEpoch, baseSequence, isTransactional,
-                        partitionLeaderEpoch);
-                builder.append(11L, "2".getBytes(), "b".getBytes());
-                builder.append(12L, "3".getBytes(), "c".getBytes());
-                builder.close();
-                MemoryRecords records = builder.build();
-
-                ByteBuffer filtered = ByteBuffer.allocate(2048);
-                MemoryRecords.FilterResult filterResult = records.filterTo(new 
TopicPartition("foo", 0),
-                        new MemoryRecords.RecordFilter() {
-                            @Override
-                            protected BatchRetention 
checkBatchRetention(RecordBatch batch) {
-                                // retain all batches
-                                return BatchRetention.RETAIN_EMPTY;
-                            }
-
-                            @Override
-                            protected boolean shouldRetainRecord(RecordBatch 
recordBatch, Record record) {
-                                // delete the records
-                                return false;
-                            }
-                        }, filtered, Integer.MAX_VALUE, 
BufferSupplier.NO_CACHING);
-
-                // Verify filter result
-                assertEquals(numRecords, filterResult.messagesRead());
-                assertEquals(records.sizeInBytes(), filterResult.bytesRead());
-                assertEquals(baseOffset + 1, filterResult.maxOffset());
-                assertEquals(0, filterResult.messagesRetained());
-                assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 
filterResult.bytesRetained());
-                assertEquals(12, filterResult.maxTimestamp());
-                assertEquals(baseOffset + 1, 
filterResult.shallowOffsetOfMaxTimestamp());
-
-                // Verify filtered records
-                filtered.flip();
-                MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
-
-                List<MutableRecordBatch> batches = 
TestUtils.toList(filteredRecords.batches());
-                assertEquals(1, batches.size());
-
-                MutableRecordBatch batch = batches.get(0);
-                assertEquals(0, batch.countOrNull().intValue());
-                assertEquals(12L, batch.maxTimestamp());
-                assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
-                assertEquals(baseOffset, batch.baseOffset());
-                assertEquals(baseOffset + 1, batch.lastOffset());
-                assertEquals(baseSequence, batch.baseSequence());
-                assertEquals(baseSequence + 1, batch.lastSequence());
-                assertEquals(isTransactional, batch.isTransactional());
-            }
-        }
+    @ArgumentsSource(MagicAtLeastV2.class)
+    public void testEmptyBatchRetention(Args args) {
+        ByteBuffer buffer = 
ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
+        long producerId = 23L;
+        short producerEpoch = 5;
+        long baseOffset = 3L;
+        int baseSequence = 10;
+        int partitionLeaderEpoch = 293;
+        long timestamp = System.currentTimeMillis();
+
+        DefaultRecordBatch.writeEmptyHeader(buffer, 
RecordBatch.MAGIC_VALUE_V2, producerId, producerEpoch,
+                baseSequence, baseOffset, baseOffset, partitionLeaderEpoch, 
TimestampType.CREATE_TIME,
+                timestamp, false, false);
+        buffer.flip();
+
+        ByteBuffer filtered = ByteBuffer.allocate(2048);
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        MemoryRecords.FilterResult filterResult = records.filterTo(new 
TopicPartition("foo", 0),
+                new MemoryRecords.RecordFilter() {
+                    @Override
+                    protected BatchRetention checkBatchRetention(RecordBatch 
batch) {
+                        // retain all batches
+                        return BatchRetention.RETAIN_EMPTY;
+                    }
+
+                    @Override
+                    protected boolean shouldRetainRecord(RecordBatch 
recordBatch, Record record) {
+                        return false;
+                    }
+                }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+
+        // Verify filter result
+        assertEquals(0, filterResult.messagesRead());
+        assertEquals(records.sizeInBytes(), filterResult.bytesRead());
+        assertEquals(baseOffset, filterResult.maxOffset());
+        assertEquals(0, filterResult.messagesRetained());
+        assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 
filterResult.bytesRetained());
+        assertEquals(timestamp, filterResult.maxTimestamp());
+        assertEquals(baseOffset, filterResult.shallowOffsetOfMaxTimestamp());
+        assertTrue(filterResult.outputBuffer().position() > 0);
+
+        // Verify filtered records
+        filtered.flip();
+        MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
+        assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 
filteredRecords.sizeInBytes());
     }
 
     @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
-    public void testEmptyBatchRetention(Args args) {
-        if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
+    @ArgumentsSource(MagicAtLeastV2.class)
+    public void testEmptyBatchDeletion(Args args) {

Review comment:
       ditto

##########
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -390,150 +449,96 @@ protected boolean shouldRetainRecord(RecordBatch 
recordBatch, Record record) {
                     }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
 
             // Verify filter result
-            assertEquals(0, filterResult.messagesRead());
-            assertEquals(records.sizeInBytes(), filterResult.bytesRead());
-            assertEquals(baseOffset, filterResult.maxOffset());
-            assertEquals(0, filterResult.messagesRetained());
-            assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 
filterResult.bytesRetained());
-            assertEquals(timestamp, filterResult.maxTimestamp());
-            assertEquals(baseOffset, 
filterResult.shallowOffsetOfMaxTimestamp());
-            assertTrue(filterResult.outputBuffer().position() > 0);
+            assertEquals(0, filterResult.outputBuffer().position());
 
             // Verify filtered records
             filtered.flip();
             MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
-            assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 
filteredRecords.sizeInBytes());
-        }
-    }
-
-    @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
-    public void testEmptyBatchDeletion(Args args) {
-        if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
-            for (final BatchRetention deleteRetention : 
Arrays.asList(BatchRetention.DELETE, BatchRetention.DELETE_EMPTY)) {
-                ByteBuffer buffer = 
ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
-                long producerId = 23L;
-                short producerEpoch = 5;
-                long baseOffset = 3L;
-                int baseSequence = 10;
-                int partitionLeaderEpoch = 293;
-                long timestamp = System.currentTimeMillis();
-
-                DefaultRecordBatch.writeEmptyHeader(buffer, 
RecordBatch.MAGIC_VALUE_V2, producerId, producerEpoch,
-                        baseSequence, baseOffset, baseOffset, 
partitionLeaderEpoch, TimestampType.CREATE_TIME,
-                        timestamp, false, false);
-                buffer.flip();
-
-                ByteBuffer filtered = ByteBuffer.allocate(2048);
-                MemoryRecords records = MemoryRecords.readableRecords(buffer);
-                MemoryRecords.FilterResult filterResult = records.filterTo(new 
TopicPartition("foo", 0),
-                        new MemoryRecords.RecordFilter() {
-                            @Override
-                            protected BatchRetention 
checkBatchRetention(RecordBatch batch) {
-                                return deleteRetention;
-                            }
-
-                            @Override
-                            protected boolean shouldRetainRecord(RecordBatch 
recordBatch, Record record) {
-                                return false;
-                            }
-                        }, filtered, Integer.MAX_VALUE, 
BufferSupplier.NO_CACHING);
-
-                // Verify filter result
-                assertEquals(0, filterResult.outputBuffer().position());
-
-                // Verify filtered records
-                filtered.flip();
-                MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
-                assertEquals(0, filteredRecords.sizeInBytes());
-            }
+            assertEquals(0, filteredRecords.sizeInBytes());
         }
     }
 
     @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    @ArgumentsSource(MagicAtLeastV2.class)
     public void testBuildEndTxnMarker(Args args) {
-        if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
-            long producerId = 73;
-            short producerEpoch = 13;
-            long initialOffset = 983L;
-            int coordinatorEpoch = 347;
-            int partitionLeaderEpoch = 29;
-
-            EndTransactionMarker marker = new 
EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
-            MemoryRecords records = 
MemoryRecords.withEndTransactionMarker(initialOffset, 
System.currentTimeMillis(),
-                    partitionLeaderEpoch, producerId, producerEpoch, marker);
-            // verify that buffer allocation was precise
-            assertEquals(records.buffer().remaining(), 
records.buffer().capacity());
-
-            List<MutableRecordBatch> batches = 
TestUtils.toList(records.batches());
-            assertEquals(1, batches.size());
+        long producerId = 73;
+        short producerEpoch = 13;
+        long initialOffset = 983L;
+        int coordinatorEpoch = 347;
+        int partitionLeaderEpoch = 29;
+
+        EndTransactionMarker marker = new 
EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
+        MemoryRecords records = 
MemoryRecords.withEndTransactionMarker(initialOffset, 
System.currentTimeMillis(),
+                partitionLeaderEpoch, producerId, producerEpoch, marker);
+        // verify that buffer allocation was precise
+        assertEquals(records.buffer().remaining(), 
records.buffer().capacity());
+
+        List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
+        assertEquals(1, batches.size());
 
-            RecordBatch batch = batches.get(0);
-            assertTrue(batch.isControlBatch());
-            assertEquals(producerId, batch.producerId());
-            assertEquals(producerEpoch, batch.producerEpoch());
-            assertEquals(initialOffset, batch.baseOffset());
-            assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch());
-            assertTrue(batch.isValid());
-
-            List<Record> createdRecords = TestUtils.toList(batch);
-            assertEquals(1, createdRecords.size());
-
-            Record record = createdRecords.get(0);
-            assertTrue(record.isValid());
-            EndTransactionMarker deserializedMarker = 
EndTransactionMarker.deserialize(record);
-            assertEquals(ControlRecordType.COMMIT, 
deserializedMarker.controlType());
-            assertEquals(coordinatorEpoch, 
deserializedMarker.coordinatorEpoch());
-        }
+        RecordBatch batch = batches.get(0);
+        assertTrue(batch.isControlBatch());
+        assertEquals(producerId, batch.producerId());
+        assertEquals(producerEpoch, batch.producerEpoch());
+        assertEquals(initialOffset, batch.baseOffset());
+        assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch());
+        assertTrue(batch.isValid());
+
+        List<Record> createdRecords = TestUtils.toList(batch);
+        assertEquals(1, createdRecords.size());
+
+        Record record = createdRecords.get(0);
+        assertTrue(record.isValid());
+        EndTransactionMarker deserializedMarker = 
EndTransactionMarker.deserialize(record);
+        assertEquals(ControlRecordType.COMMIT, 
deserializedMarker.controlType());
+        assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }
 
     @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    @ArgumentsSource(MagicAtLeastV2.class)
     public void testBuildLeaderChangeMessage(Args args) {

Review comment:
       ditto

##########
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -261,105 +277,149 @@ public void testChecksum(Args args) {
     }
 
     @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    @ArgumentsSource(MagicAtLeastV2.class)
     public void testFilterToPreservesPartitionLeaderEpoch(Args args) {
         byte magic = args.magic;
-        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
-            int partitionLeaderEpoch = 67;
+        int partitionLeaderEpoch = 67;
+
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, 
args.compression, TimestampType.CREATE_TIME,
+                0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+        builder.append(10L, null, "a".getBytes());
+        builder.append(11L, "1".getBytes(), "b".getBytes());
+        builder.append(12L, null, "c".getBytes());
+
+        ByteBuffer filtered = ByteBuffer.allocate(2048);
+        builder.build().filterTo(new TopicPartition("foo", 0), new 
RetainNonNullKeysFilter(), filtered,
+                Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+
+        filtered.flip();
+        MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
+
+        List<MutableRecordBatch> batches = 
TestUtils.toList(filteredRecords.batches());
+        assertEquals(1, batches.size());
 
+        MutableRecordBatch firstBatch = batches.get(0);
+        assertEquals(partitionLeaderEpoch, firstBatch.partitionLeaderEpoch());
+    }
+
+    @ParameterizedTest
+    @ArgumentsSource(MagicAtLeastV2.class)
+    public void testFilterToEmptyBatchRetention(Args args) {
+        byte magic = args.magic;
+        for (boolean isTransactional : Arrays.asList(true, false)) {
             ByteBuffer buffer = ByteBuffer.allocate(2048);
+            long producerId = 23L;
+            short producerEpoch = 5;
+            long baseOffset = 3L;
+            int baseSequence = 10;
+            int partitionLeaderEpoch = 293;
+            int numRecords = 2;
+
             MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
magic, args.compression, TimestampType.CREATE_TIME,
-                    0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
-            builder.append(10L, null, "a".getBytes());
-            builder.append(11L, "1".getBytes(), "b".getBytes());
-            builder.append(12L, null, "c".getBytes());
+                    baseOffset, RecordBatch.NO_TIMESTAMP, producerId, 
producerEpoch, baseSequence, isTransactional,
+                    partitionLeaderEpoch);
+            builder.append(11L, "2".getBytes(), "b".getBytes());
+            builder.append(12L, "3".getBytes(), "c".getBytes());
+            builder.close();
+            MemoryRecords records = builder.build();
 
             ByteBuffer filtered = ByteBuffer.allocate(2048);
-            builder.build().filterTo(new TopicPartition("foo", 0), new 
RetainNonNullKeysFilter(), filtered,
-                    Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+            MemoryRecords.FilterResult filterResult = records.filterTo(new 
TopicPartition("foo", 0),
+                    new MemoryRecords.RecordFilter() {
+                        @Override
+                        protected BatchRetention 
checkBatchRetention(RecordBatch batch) {
+                            // retain all batches
+                            return BatchRetention.RETAIN_EMPTY;
+                        }
 
+                        @Override
+                        protected boolean shouldRetainRecord(RecordBatch 
recordBatch, Record record) {
+                            // delete the records
+                            return false;
+                        }
+                    }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+
+            // Verify filter result
+            assertEquals(numRecords, filterResult.messagesRead());
+            assertEquals(records.sizeInBytes(), filterResult.bytesRead());
+            assertEquals(baseOffset + 1, filterResult.maxOffset());
+            assertEquals(0, filterResult.messagesRetained());
+            assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 
filterResult.bytesRetained());
+            assertEquals(12, filterResult.maxTimestamp());
+            assertEquals(baseOffset + 1, 
filterResult.shallowOffsetOfMaxTimestamp());
+
+            // Verify filtered records
             filtered.flip();
             MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
 
             List<MutableRecordBatch> batches = 
TestUtils.toList(filteredRecords.batches());
             assertEquals(1, batches.size());
 
-            MutableRecordBatch firstBatch = batches.get(0);
-            assertEquals(partitionLeaderEpoch, 
firstBatch.partitionLeaderEpoch());
+            MutableRecordBatch batch = batches.get(0);
+            assertEquals(0, batch.countOrNull().intValue());
+            assertEquals(12L, batch.maxTimestamp());
+            assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            assertEquals(baseOffset, batch.baseOffset());
+            assertEquals(baseOffset + 1, batch.lastOffset());
+            assertEquals(baseSequence, batch.baseSequence());
+            assertEquals(baseSequence + 1, batch.lastSequence());
+            assertEquals(isTransactional, batch.isTransactional());
         }
     }
 
     @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
-    public void testFilterToEmptyBatchRetention(Args args) {
-        byte magic = args.magic;
-        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
-            for (boolean isTransactional : Arrays.asList(true, false)) {
-                ByteBuffer buffer = ByteBuffer.allocate(2048);
-                long producerId = 23L;
-                short producerEpoch = 5;
-                long baseOffset = 3L;
-                int baseSequence = 10;
-                int partitionLeaderEpoch = 293;
-                int numRecords = 2;
-
-                MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
magic, args.compression, TimestampType.CREATE_TIME,
-                        baseOffset, RecordBatch.NO_TIMESTAMP, producerId, 
producerEpoch, baseSequence, isTransactional,
-                        partitionLeaderEpoch);
-                builder.append(11L, "2".getBytes(), "b".getBytes());
-                builder.append(12L, "3".getBytes(), "c".getBytes());
-                builder.close();
-                MemoryRecords records = builder.build();
-
-                ByteBuffer filtered = ByteBuffer.allocate(2048);
-                MemoryRecords.FilterResult filterResult = records.filterTo(new 
TopicPartition("foo", 0),
-                        new MemoryRecords.RecordFilter() {
-                            @Override
-                            protected BatchRetention 
checkBatchRetention(RecordBatch batch) {
-                                // retain all batches
-                                return BatchRetention.RETAIN_EMPTY;
-                            }
-
-                            @Override
-                            protected boolean shouldRetainRecord(RecordBatch 
recordBatch, Record record) {
-                                // delete the records
-                                return false;
-                            }
-                        }, filtered, Integer.MAX_VALUE, 
BufferSupplier.NO_CACHING);
-
-                // Verify filter result
-                assertEquals(numRecords, filterResult.messagesRead());
-                assertEquals(records.sizeInBytes(), filterResult.bytesRead());
-                assertEquals(baseOffset + 1, filterResult.maxOffset());
-                assertEquals(0, filterResult.messagesRetained());
-                assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 
filterResult.bytesRetained());
-                assertEquals(12, filterResult.maxTimestamp());
-                assertEquals(baseOffset + 1, 
filterResult.shallowOffsetOfMaxTimestamp());
-
-                // Verify filtered records
-                filtered.flip();
-                MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
-
-                List<MutableRecordBatch> batches = 
TestUtils.toList(filteredRecords.batches());
-                assertEquals(1, batches.size());
-
-                MutableRecordBatch batch = batches.get(0);
-                assertEquals(0, batch.countOrNull().intValue());
-                assertEquals(12L, batch.maxTimestamp());
-                assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
-                assertEquals(baseOffset, batch.baseOffset());
-                assertEquals(baseOffset + 1, batch.lastOffset());
-                assertEquals(baseSequence, batch.baseSequence());
-                assertEquals(baseSequence + 1, batch.lastSequence());
-                assertEquals(isTransactional, batch.isTransactional());
-            }
-        }
+    @ArgumentsSource(MagicAtLeastV2.class)
+    public void testEmptyBatchRetention(Args args) {

Review comment:
       This is not related to your PR, but this test case does not seem to require parameters.
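       i.e. it could probably become a plain ```@Test``` (a sketch, assuming the body truly never touches ```Args```):

```java
@Test
public void testEmptyBatchRetention() {
    ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
    // ... rest of the body as-is: it hardcodes RecordBatch.MAGIC_VALUE_V2 and
    // writes the empty header directly, so neither magic nor compression from Args is used
}
```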

##########
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -390,150 +449,96 @@ protected boolean shouldRetainRecord(RecordBatch 
recordBatch, Record record) {
                     }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
 
             // Verify filter result
-            assertEquals(0, filterResult.messagesRead());
-            assertEquals(records.sizeInBytes(), filterResult.bytesRead());
-            assertEquals(baseOffset, filterResult.maxOffset());
-            assertEquals(0, filterResult.messagesRetained());
-            assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 
filterResult.bytesRetained());
-            assertEquals(timestamp, filterResult.maxTimestamp());
-            assertEquals(baseOffset, 
filterResult.shallowOffsetOfMaxTimestamp());
-            assertTrue(filterResult.outputBuffer().position() > 0);
+            assertEquals(0, filterResult.outputBuffer().position());
 
             // Verify filtered records
             filtered.flip();
             MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
-            assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 
filteredRecords.sizeInBytes());
-        }
-    }
-
-    @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
-    public void testEmptyBatchDeletion(Args args) {
-        if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
-            for (final BatchRetention deleteRetention : 
Arrays.asList(BatchRetention.DELETE, BatchRetention.DELETE_EMPTY)) {
-                ByteBuffer buffer = 
ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
-                long producerId = 23L;
-                short producerEpoch = 5;
-                long baseOffset = 3L;
-                int baseSequence = 10;
-                int partitionLeaderEpoch = 293;
-                long timestamp = System.currentTimeMillis();
-
-                DefaultRecordBatch.writeEmptyHeader(buffer, 
RecordBatch.MAGIC_VALUE_V2, producerId, producerEpoch,
-                        baseSequence, baseOffset, baseOffset, 
partitionLeaderEpoch, TimestampType.CREATE_TIME,
-                        timestamp, false, false);
-                buffer.flip();
-
-                ByteBuffer filtered = ByteBuffer.allocate(2048);
-                MemoryRecords records = MemoryRecords.readableRecords(buffer);
-                MemoryRecords.FilterResult filterResult = records.filterTo(new 
TopicPartition("foo", 0),
-                        new MemoryRecords.RecordFilter() {
-                            @Override
-                            protected BatchRetention 
checkBatchRetention(RecordBatch batch) {
-                                return deleteRetention;
-                            }
-
-                            @Override
-                            protected boolean shouldRetainRecord(RecordBatch 
recordBatch, Record record) {
-                                return false;
-                            }
-                        }, filtered, Integer.MAX_VALUE, 
BufferSupplier.NO_CACHING);
-
-                // Verify filter result
-                assertEquals(0, filterResult.outputBuffer().position());
-
-                // Verify filtered records
-                filtered.flip();
-                MemoryRecords filteredRecords = 
MemoryRecords.readableRecords(filtered);
-                assertEquals(0, filteredRecords.sizeInBytes());
-            }
+            assertEquals(0, filteredRecords.sizeInBytes());
         }
     }
 
     @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    @ArgumentsSource(MagicAtLeastV2.class)
     public void testBuildEndTxnMarker(Args args) {
-        if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
-            long producerId = 73;
-            short producerEpoch = 13;
-            long initialOffset = 983L;
-            int coordinatorEpoch = 347;
-            int partitionLeaderEpoch = 29;
-
-            EndTransactionMarker marker = new 
EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
-            MemoryRecords records = 
MemoryRecords.withEndTransactionMarker(initialOffset, 
System.currentTimeMillis(),
-                    partitionLeaderEpoch, producerId, producerEpoch, marker);
-            // verify that buffer allocation was precise
-            assertEquals(records.buffer().remaining(), 
records.buffer().capacity());
-
-            List<MutableRecordBatch> batches = 
TestUtils.toList(records.batches());
-            assertEquals(1, batches.size());
+        long producerId = 73;
+        short producerEpoch = 13;
+        long initialOffset = 983L;
+        int coordinatorEpoch = 347;
+        int partitionLeaderEpoch = 29;
+
+        EndTransactionMarker marker = new 
EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
+        MemoryRecords records = 
MemoryRecords.withEndTransactionMarker(initialOffset, 
System.currentTimeMillis(),
+                partitionLeaderEpoch, producerId, producerEpoch, marker);
+        // verify that buffer allocation was precise
+        assertEquals(records.buffer().remaining(), 
records.buffer().capacity());
+
+        List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
+        assertEquals(1, batches.size());
 
-            RecordBatch batch = batches.get(0);
-            assertTrue(batch.isControlBatch());
-            assertEquals(producerId, batch.producerId());
-            assertEquals(producerEpoch, batch.producerEpoch());
-            assertEquals(initialOffset, batch.baseOffset());
-            assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch());
-            assertTrue(batch.isValid());
-
-            List<Record> createdRecords = TestUtils.toList(batch);
-            assertEquals(1, createdRecords.size());
-
-            Record record = createdRecords.get(0);
-            assertTrue(record.isValid());
-            EndTransactionMarker deserializedMarker = 
EndTransactionMarker.deserialize(record);
-            assertEquals(ControlRecordType.COMMIT, 
deserializedMarker.controlType());
-            assertEquals(coordinatorEpoch, 
deserializedMarker.coordinatorEpoch());
-        }
+        RecordBatch batch = batches.get(0);
+        assertTrue(batch.isControlBatch());
+        assertEquals(producerId, batch.producerId());
+        assertEquals(producerEpoch, batch.producerEpoch());
+        assertEquals(initialOffset, batch.baseOffset());
+        assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch());
+        assertTrue(batch.isValid());
+
+        List<Record> createdRecords = TestUtils.toList(batch);
+        assertEquals(1, createdRecords.size());
+
+        Record record = createdRecords.get(0);
+        assertTrue(record.isValid());
+        EndTransactionMarker deserializedMarker = 
EndTransactionMarker.deserialize(record);
+        assertEquals(ControlRecordType.COMMIT, 
deserializedMarker.controlType());
+        assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }
 
     @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    @ArgumentsSource(MagicAtLeastV2.class)
     public void testBuildLeaderChangeMessage(Args args) {
-        if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
-
-            final int leaderId = 5;
-            final int leaderEpoch = 20;
-            final int voterId = 6;
-
-            LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage()
-                .setLeaderId(leaderId)
-                .setVoters(Collections.singletonList(
-                    new Voter().setVoterId(voterId)));
-            MemoryRecords records = 
MemoryRecords.withLeaderChangeMessage(System.currentTimeMillis(),
-                leaderEpoch, leaderChangeMessage);
+        final int leaderId = 5;
+        final int leaderEpoch = 20;
+        final int voterId = 6;
+
+        LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage()
+            .setLeaderId(leaderId)
+            .setVoters(Collections.singletonList(
+                new Voter().setVoterId(voterId)));
+        MemoryRecords records = 
MemoryRecords.withLeaderChangeMessage(System.currentTimeMillis(),
+            leaderEpoch, leaderChangeMessage);
+
+        List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
+        assertEquals(1, batches.size());
 
-            List<MutableRecordBatch> batches = 
TestUtils.toList(records.batches());
-            assertEquals(1, batches.size());
+        RecordBatch batch = batches.get(0);
+        assertTrue(batch.isControlBatch());
+        assertEquals(0, batch.baseOffset());
+        assertEquals(leaderEpoch, batch.partitionLeaderEpoch());
+        assertTrue(batch.isValid());
 
-            RecordBatch batch = batches.get(0);
-            assertTrue(batch.isControlBatch());
-            assertEquals(0, batch.baseOffset());
-            assertEquals(leaderEpoch, batch.partitionLeaderEpoch());
-            assertTrue(batch.isValid());
+        List<Record> createdRecords = TestUtils.toList(batch);
+        assertEquals(1, createdRecords.size());
 
-            List<Record> createdRecords = TestUtils.toList(batch);
-            assertEquals(1, createdRecords.size());
+        Record record = createdRecords.get(0);
+        assertTrue(record.isValid());
+        assertEquals(ControlRecordType.LEADER_CHANGE, 
ControlRecordType.parse(record.key()));
 
-            Record record = createdRecords.get(0);
-            assertTrue(record.isValid());
-            assertEquals(ControlRecordType.LEADER_CHANGE, 
ControlRecordType.parse(record.key()));
+        LeaderChangeMessage deserializedMessage = 
ControlRecordUtils.deserializeLeaderChangeMessage(record);
+        assertEquals(leaderId, deserializedMessage.leaderId());
+        assertEquals(1, deserializedMessage.voters().size());
+        assertEquals(voterId, deserializedMessage.voters().get(0).voterId());
+    }
 
-            LeaderChangeMessage deserializedMessage = 
ControlRecordUtils.deserializeLeaderChangeMessage(record);
-            assertEquals(leaderId, deserializedMessage.leaderId());
-            assertEquals(1, deserializedMessage.voters().size());
-            assertEquals(voterId, 
deserializedMessage.voters().get(0).voterId());
-        }
+    private static Stream<Arguments> testFilterToBatchDiscard() {
+        return getMemoryRecordsArguments((magic, type) -> 
!asList(CompressionType.NONE, CompressionType.ZSTD).contains(type) || magic >= 
MAGIC_VALUE_V2);
     }
 
     @ParameterizedTest
-    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    @MethodSource

Review comment:
       ditto




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

