chia7712 commented on a change in pull request #9906: URL: https://github.com/apache/kafka/pull/9906#discussion_r558430765
########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ########## @@ -201,38 +218,37 @@ public void testHasRoomForMethod(Args args) { } @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + @ArgumentsSource(MagicAtLeastV2.class) public void testHasRoomForMethodWithHeaders(Args args) { byte magic = args.magic; - if (magic >= RecordBatch.MAGIC_VALUE_V2) { - MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(100), magic, args.compression, - TimestampType.CREATE_TIME, 0L); - RecordHeaders headers = new RecordHeaders(); - headers.add("hello", "world.world".getBytes()); - headers.add("hello", "world.world".getBytes()); - headers.add("hello", "world.world".getBytes()); - headers.add("hello", "world.world".getBytes()); - headers.add("hello", "world.world".getBytes()); - builder.append(logAppendTime, "key".getBytes(), "value".getBytes()); - // Make sure that hasRoomFor accounts for header sizes by letting a record without headers pass, but stopping - // a record with a large number of headers. - assertTrue(builder.hasRoomFor(logAppendTime, "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS)); - assertFalse(builder.hasRoomFor(logAppendTime, "key".getBytes(), "value".getBytes(), headers.toArray())); - } + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(100), magic, args.compression, + TimestampType.CREATE_TIME, 0L); + RecordHeaders headers = new RecordHeaders(); + headers.add("hello", "world.world".getBytes()); + headers.add("hello", "world.world".getBytes()); + headers.add("hello", "world.world".getBytes()); + headers.add("hello", "world.world".getBytes()); + headers.add("hello", "world.world".getBytes()); + builder.append(logAppendTime, "key".getBytes(), "value".getBytes()); + // Make sure that hasRoomFor accounts for header sizes by letting a record without headers pass, but stopping + // a record with a large number of headers. 
+ assertTrue(builder.hasRoomFor(logAppendTime, "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS)); + assertFalse(builder.hasRoomFor(logAppendTime, "key".getBytes(), "value".getBytes(), headers.toArray())); + } + + private static Stream<Arguments> testChecksum() { + return getMemoryRecordsArguments((magic, type) -> type == CompressionType.NONE || type == CompressionType.LZ4); } /** * This test verifies that the checksum returned for various versions matches hardcoded values to catch unintentional * changes to how the checksum is computed. */ @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + @MethodSource Review comment: I prefer to add an explicit parameter name. Otherwise, the method offering parameters looks like an unused method. ########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ########## @@ -81,26 +82,43 @@ public String toString() { } } + private static Stream<Arguments> getMemoryRecordsArguments(BiPredicate<Byte, CompressionType> accept) { Review comment: Maybe it's just me, but ```getMemoryRecordsArguments``` is too verbose. How about ```allArguments```? ########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ########## @@ -81,26 +82,43 @@ public String toString() { } } + private static Stream<Arguments> getMemoryRecordsArguments(BiPredicate<Byte, CompressionType> accept) { + List<Arguments> arguments = new ArrayList<>(); + for (long firstOffset : asList(0L, 57L)) + for (byte magic : asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) + for (CompressionType type : CompressionType.values()) + if (accept.test(magic, type)) + arguments.add(Arguments.of(new Args(magic, firstOffset, type))); + return arguments.stream(); + } + private static class MemoryRecordsArgumentsProvider implements ArgumentsProvider { @Override public Stream<? 
extends Arguments> provideArguments(ExtensionContext context) { - List<Arguments> arguments = new ArrayList<>(); - for (long firstOffset : asList(0L, 57L)) - for (byte magic : asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) - for (CompressionType type: CompressionType.values()) - arguments.add(Arguments.of(new Args(magic, firstOffset, type))); - return arguments.stream(); + return getMemoryRecordsArguments((magic, type) -> true); + } + } + + private static class MagicAtLeastV2 implements ArgumentsProvider { + @Override + public Stream<? extends Arguments> provideArguments(ExtensionContext context) { + return getMemoryRecordsArguments((magic, type) -> magic >= RecordBatch.MAGIC_VALUE_V2); + } + } + + private static class AtLeastV2OrNotZstd implements ArgumentsProvider { + @Override + public Stream<? extends Arguments> provideArguments(ExtensionContext context) { + return getMemoryRecordsArguments((magic, type) -> type != CompressionType.ZSTD || magic >= MAGIC_VALUE_V2); } } private final long logAppendTime = System.currentTimeMillis(); private final int partitionLeaderEpoch = 998; Review comment: This variable is used by ```testIterator``` only, so we can make it a local variable. 
########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ########## @@ -390,150 +449,96 @@ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); // Verify filter result - assertEquals(0, filterResult.messagesRead()); - assertEquals(records.sizeInBytes(), filterResult.bytesRead()); - assertEquals(baseOffset, filterResult.maxOffset()); - assertEquals(0, filterResult.messagesRetained()); - assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained()); - assertEquals(timestamp, filterResult.maxTimestamp()); - assertEquals(baseOffset, filterResult.shallowOffsetOfMaxTimestamp()); - assertTrue(filterResult.outputBuffer().position() > 0); + assertEquals(0, filterResult.outputBuffer().position()); // Verify filtered records filtered.flip(); MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); - assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filteredRecords.sizeInBytes()); - } - } - - @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) - public void testEmptyBatchDeletion(Args args) { - if (args.magic >= RecordBatch.MAGIC_VALUE_V2) { - for (final BatchRetention deleteRetention : Arrays.asList(BatchRetention.DELETE, BatchRetention.DELETE_EMPTY)) { - ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD); - long producerId = 23L; - short producerEpoch = 5; - long baseOffset = 3L; - int baseSequence = 10; - int partitionLeaderEpoch = 293; - long timestamp = System.currentTimeMillis(); - - DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, producerId, producerEpoch, - baseSequence, baseOffset, baseOffset, partitionLeaderEpoch, TimestampType.CREATE_TIME, - timestamp, false, false); - buffer.flip(); - - ByteBuffer filtered = ByteBuffer.allocate(2048); - MemoryRecords records = MemoryRecords.readableRecords(buffer); - 
MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0), - new MemoryRecords.RecordFilter() { - @Override - protected BatchRetention checkBatchRetention(RecordBatch batch) { - return deleteRetention; - } - - @Override - protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { - return false; - } - }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); - - // Verify filter result - assertEquals(0, filterResult.outputBuffer().position()); - - // Verify filtered records - filtered.flip(); - MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); - assertEquals(0, filteredRecords.sizeInBytes()); - } + assertEquals(0, filteredRecords.sizeInBytes()); } } @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + @ArgumentsSource(MagicAtLeastV2.class) public void testBuildEndTxnMarker(Args args) { Review comment: ditto ########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ########## @@ -261,105 +277,149 @@ public void testChecksum(Args args) { } @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + @ArgumentsSource(MagicAtLeastV2.class) public void testFilterToPreservesPartitionLeaderEpoch(Args args) { byte magic = args.magic; - if (magic >= RecordBatch.MAGIC_VALUE_V2) { - int partitionLeaderEpoch = 67; + int partitionLeaderEpoch = 67; + + ByteBuffer buffer = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, args.compression, TimestampType.CREATE_TIME, + 0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch); + builder.append(10L, null, "a".getBytes()); + builder.append(11L, "1".getBytes(), "b".getBytes()); + builder.append(12L, null, "c".getBytes()); + + ByteBuffer filtered = ByteBuffer.allocate(2048); + builder.build().filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, + Integer.MAX_VALUE, BufferSupplier.NO_CACHING); + + filtered.flip(); 
+ MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); + + List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches()); + assertEquals(1, batches.size()); + MutableRecordBatch firstBatch = batches.get(0); + assertEquals(partitionLeaderEpoch, firstBatch.partitionLeaderEpoch()); + } + + @ParameterizedTest + @ArgumentsSource(MagicAtLeastV2.class) + public void testFilterToEmptyBatchRetention(Args args) { + byte magic = args.magic; + for (boolean isTransactional : Arrays.asList(true, false)) { ByteBuffer buffer = ByteBuffer.allocate(2048); + long producerId = 23L; + short producerEpoch = 5; + long baseOffset = 3L; + int baseSequence = 10; + int partitionLeaderEpoch = 293; + int numRecords = 2; + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, args.compression, TimestampType.CREATE_TIME, - 0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch); - builder.append(10L, null, "a".getBytes()); - builder.append(11L, "1".getBytes(), "b".getBytes()); - builder.append(12L, null, "c".getBytes()); + baseOffset, RecordBatch.NO_TIMESTAMP, producerId, producerEpoch, baseSequence, isTransactional, + partitionLeaderEpoch); + builder.append(11L, "2".getBytes(), "b".getBytes()); + builder.append(12L, "3".getBytes(), "c".getBytes()); + builder.close(); + MemoryRecords records = builder.build(); ByteBuffer filtered = ByteBuffer.allocate(2048); - builder.build().filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, - Integer.MAX_VALUE, BufferSupplier.NO_CACHING); + MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0), + new MemoryRecords.RecordFilter() { + @Override + protected BatchRetention checkBatchRetention(RecordBatch batch) { + // retain all batches + return BatchRetention.RETAIN_EMPTY; + } + @Override + protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { + // delete the records + return false; + } + }, filtered, Integer.MAX_VALUE, 
BufferSupplier.NO_CACHING); + + // Verify filter result + assertEquals(numRecords, filterResult.messagesRead()); + assertEquals(records.sizeInBytes(), filterResult.bytesRead()); + assertEquals(baseOffset + 1, filterResult.maxOffset()); + assertEquals(0, filterResult.messagesRetained()); + assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained()); + assertEquals(12, filterResult.maxTimestamp()); + assertEquals(baseOffset + 1, filterResult.shallowOffsetOfMaxTimestamp()); + + // Verify filtered records filtered.flip(); MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches()); assertEquals(1, batches.size()); - MutableRecordBatch firstBatch = batches.get(0); - assertEquals(partitionLeaderEpoch, firstBatch.partitionLeaderEpoch()); + MutableRecordBatch batch = batches.get(0); + assertEquals(0, batch.countOrNull().intValue()); + assertEquals(12L, batch.maxTimestamp()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + assertEquals(baseOffset, batch.baseOffset()); + assertEquals(baseOffset + 1, batch.lastOffset()); + assertEquals(baseSequence, batch.baseSequence()); + assertEquals(baseSequence + 1, batch.lastSequence()); + assertEquals(isTransactional, batch.isTransactional()); } } @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) - public void testFilterToEmptyBatchRetention(Args args) { - byte magic = args.magic; - if (magic >= RecordBatch.MAGIC_VALUE_V2) { - for (boolean isTransactional : Arrays.asList(true, false)) { - ByteBuffer buffer = ByteBuffer.allocate(2048); - long producerId = 23L; - short producerEpoch = 5; - long baseOffset = 3L; - int baseSequence = 10; - int partitionLeaderEpoch = 293; - int numRecords = 2; - - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, args.compression, TimestampType.CREATE_TIME, - baseOffset, RecordBatch.NO_TIMESTAMP, producerId, producerEpoch, 
baseSequence, isTransactional, - partitionLeaderEpoch); - builder.append(11L, "2".getBytes(), "b".getBytes()); - builder.append(12L, "3".getBytes(), "c".getBytes()); - builder.close(); - MemoryRecords records = builder.build(); - - ByteBuffer filtered = ByteBuffer.allocate(2048); - MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0), - new MemoryRecords.RecordFilter() { - @Override - protected BatchRetention checkBatchRetention(RecordBatch batch) { - // retain all batches - return BatchRetention.RETAIN_EMPTY; - } - - @Override - protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { - // delete the records - return false; - } - }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); - - // Verify filter result - assertEquals(numRecords, filterResult.messagesRead()); - assertEquals(records.sizeInBytes(), filterResult.bytesRead()); - assertEquals(baseOffset + 1, filterResult.maxOffset()); - assertEquals(0, filterResult.messagesRetained()); - assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained()); - assertEquals(12, filterResult.maxTimestamp()); - assertEquals(baseOffset + 1, filterResult.shallowOffsetOfMaxTimestamp()); - - // Verify filtered records - filtered.flip(); - MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); - - List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches()); - assertEquals(1, batches.size()); - - MutableRecordBatch batch = batches.get(0); - assertEquals(0, batch.countOrNull().intValue()); - assertEquals(12L, batch.maxTimestamp()); - assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); - assertEquals(baseOffset, batch.baseOffset()); - assertEquals(baseOffset + 1, batch.lastOffset()); - assertEquals(baseSequence, batch.baseSequence()); - assertEquals(baseSequence + 1, batch.lastSequence()); - assertEquals(isTransactional, batch.isTransactional()); - } - } + @ArgumentsSource(MagicAtLeastV2.class) + 
public void testEmptyBatchRetention(Args args) { + ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD); + long producerId = 23L; + short producerEpoch = 5; + long baseOffset = 3L; + int baseSequence = 10; + int partitionLeaderEpoch = 293; + long timestamp = System.currentTimeMillis(); + + DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, producerId, producerEpoch, + baseSequence, baseOffset, baseOffset, partitionLeaderEpoch, TimestampType.CREATE_TIME, + timestamp, false, false); + buffer.flip(); + + ByteBuffer filtered = ByteBuffer.allocate(2048); + MemoryRecords records = MemoryRecords.readableRecords(buffer); + MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0), + new MemoryRecords.RecordFilter() { + @Override + protected BatchRetention checkBatchRetention(RecordBatch batch) { + // retain all batches + return BatchRetention.RETAIN_EMPTY; + } + + @Override + protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { + return false; + } + }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); + + // Verify filter result + assertEquals(0, filterResult.messagesRead()); + assertEquals(records.sizeInBytes(), filterResult.bytesRead()); + assertEquals(baseOffset, filterResult.maxOffset()); + assertEquals(0, filterResult.messagesRetained()); + assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained()); + assertEquals(timestamp, filterResult.maxTimestamp()); + assertEquals(baseOffset, filterResult.shallowOffsetOfMaxTimestamp()); + assertTrue(filterResult.outputBuffer().position() > 0); + + // Verify filtered records + filtered.flip(); + MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); + assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filteredRecords.sizeInBytes()); } @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) - public void testEmptyBatchRetention(Args args) { - if 
(args.magic >= RecordBatch.MAGIC_VALUE_V2) { + @ArgumentsSource(MagicAtLeastV2.class) + public void testEmptyBatchDeletion(Args args) { Review comment: ditto ########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ########## @@ -390,150 +449,96 @@ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); // Verify filter result - assertEquals(0, filterResult.messagesRead()); - assertEquals(records.sizeInBytes(), filterResult.bytesRead()); - assertEquals(baseOffset, filterResult.maxOffset()); - assertEquals(0, filterResult.messagesRetained()); - assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained()); - assertEquals(timestamp, filterResult.maxTimestamp()); - assertEquals(baseOffset, filterResult.shallowOffsetOfMaxTimestamp()); - assertTrue(filterResult.outputBuffer().position() > 0); + assertEquals(0, filterResult.outputBuffer().position()); // Verify filtered records filtered.flip(); MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); - assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filteredRecords.sizeInBytes()); - } - } - - @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) - public void testEmptyBatchDeletion(Args args) { - if (args.magic >= RecordBatch.MAGIC_VALUE_V2) { - for (final BatchRetention deleteRetention : Arrays.asList(BatchRetention.DELETE, BatchRetention.DELETE_EMPTY)) { - ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD); - long producerId = 23L; - short producerEpoch = 5; - long baseOffset = 3L; - int baseSequence = 10; - int partitionLeaderEpoch = 293; - long timestamp = System.currentTimeMillis(); - - DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, producerId, producerEpoch, - baseSequence, baseOffset, baseOffset, partitionLeaderEpoch, TimestampType.CREATE_TIME, - timestamp, false, 
false); - buffer.flip(); - - ByteBuffer filtered = ByteBuffer.allocate(2048); - MemoryRecords records = MemoryRecords.readableRecords(buffer); - MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0), - new MemoryRecords.RecordFilter() { - @Override - protected BatchRetention checkBatchRetention(RecordBatch batch) { - return deleteRetention; - } - - @Override - protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { - return false; - } - }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); - - // Verify filter result - assertEquals(0, filterResult.outputBuffer().position()); - - // Verify filtered records - filtered.flip(); - MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); - assertEquals(0, filteredRecords.sizeInBytes()); - } + assertEquals(0, filteredRecords.sizeInBytes()); } } @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + @ArgumentsSource(MagicAtLeastV2.class) public void testBuildEndTxnMarker(Args args) { - if (args.magic >= RecordBatch.MAGIC_VALUE_V2) { - long producerId = 73; - short producerEpoch = 13; - long initialOffset = 983L; - int coordinatorEpoch = 347; - int partitionLeaderEpoch = 29; - - EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch); - MemoryRecords records = MemoryRecords.withEndTransactionMarker(initialOffset, System.currentTimeMillis(), - partitionLeaderEpoch, producerId, producerEpoch, marker); - // verify that buffer allocation was precise - assertEquals(records.buffer().remaining(), records.buffer().capacity()); - - List<MutableRecordBatch> batches = TestUtils.toList(records.batches()); - assertEquals(1, batches.size()); + long producerId = 73; + short producerEpoch = 13; + long initialOffset = 983L; + int coordinatorEpoch = 347; + int partitionLeaderEpoch = 29; + + EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch); + 
MemoryRecords records = MemoryRecords.withEndTransactionMarker(initialOffset, System.currentTimeMillis(), + partitionLeaderEpoch, producerId, producerEpoch, marker); + // verify that buffer allocation was precise + assertEquals(records.buffer().remaining(), records.buffer().capacity()); + + List<MutableRecordBatch> batches = TestUtils.toList(records.batches()); + assertEquals(1, batches.size()); - RecordBatch batch = batches.get(0); - assertTrue(batch.isControlBatch()); - assertEquals(producerId, batch.producerId()); - assertEquals(producerEpoch, batch.producerEpoch()); - assertEquals(initialOffset, batch.baseOffset()); - assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch()); - assertTrue(batch.isValid()); - - List<Record> createdRecords = TestUtils.toList(batch); - assertEquals(1, createdRecords.size()); - - Record record = createdRecords.get(0); - assertTrue(record.isValid()); - EndTransactionMarker deserializedMarker = EndTransactionMarker.deserialize(record); - assertEquals(ControlRecordType.COMMIT, deserializedMarker.controlType()); - assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch()); - } + RecordBatch batch = batches.get(0); + assertTrue(batch.isControlBatch()); + assertEquals(producerId, batch.producerId()); + assertEquals(producerEpoch, batch.producerEpoch()); + assertEquals(initialOffset, batch.baseOffset()); + assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch()); + assertTrue(batch.isValid()); + + List<Record> createdRecords = TestUtils.toList(batch); + assertEquals(1, createdRecords.size()); + + Record record = createdRecords.get(0); + assertTrue(record.isValid()); + EndTransactionMarker deserializedMarker = EndTransactionMarker.deserialize(record); + assertEquals(ControlRecordType.COMMIT, deserializedMarker.controlType()); + assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch()); } @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + @ArgumentsSource(MagicAtLeastV2.class) 
public void testBuildLeaderChangeMessage(Args args) { Review comment: ditto ########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ########## @@ -261,105 +277,149 @@ public void testChecksum(Args args) { } @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + @ArgumentsSource(MagicAtLeastV2.class) public void testFilterToPreservesPartitionLeaderEpoch(Args args) { byte magic = args.magic; - if (magic >= RecordBatch.MAGIC_VALUE_V2) { - int partitionLeaderEpoch = 67; + int partitionLeaderEpoch = 67; + + ByteBuffer buffer = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, args.compression, TimestampType.CREATE_TIME, + 0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch); + builder.append(10L, null, "a".getBytes()); + builder.append(11L, "1".getBytes(), "b".getBytes()); + builder.append(12L, null, "c".getBytes()); + + ByteBuffer filtered = ByteBuffer.allocate(2048); + builder.build().filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, + Integer.MAX_VALUE, BufferSupplier.NO_CACHING); + + filtered.flip(); + MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); + + List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches()); + assertEquals(1, batches.size()); + MutableRecordBatch firstBatch = batches.get(0); + assertEquals(partitionLeaderEpoch, firstBatch.partitionLeaderEpoch()); + } + + @ParameterizedTest + @ArgumentsSource(MagicAtLeastV2.class) + public void testFilterToEmptyBatchRetention(Args args) { + byte magic = args.magic; + for (boolean isTransactional : Arrays.asList(true, false)) { ByteBuffer buffer = ByteBuffer.allocate(2048); + long producerId = 23L; + short producerEpoch = 5; + long baseOffset = 3L; + int baseSequence = 10; + int partitionLeaderEpoch = 293; + int numRecords = 2; + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, args.compression, 
TimestampType.CREATE_TIME, - 0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch); - builder.append(10L, null, "a".getBytes()); - builder.append(11L, "1".getBytes(), "b".getBytes()); - builder.append(12L, null, "c".getBytes()); + baseOffset, RecordBatch.NO_TIMESTAMP, producerId, producerEpoch, baseSequence, isTransactional, + partitionLeaderEpoch); + builder.append(11L, "2".getBytes(), "b".getBytes()); + builder.append(12L, "3".getBytes(), "c".getBytes()); + builder.close(); + MemoryRecords records = builder.build(); ByteBuffer filtered = ByteBuffer.allocate(2048); - builder.build().filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, - Integer.MAX_VALUE, BufferSupplier.NO_CACHING); + MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0), + new MemoryRecords.RecordFilter() { + @Override + protected BatchRetention checkBatchRetention(RecordBatch batch) { + // retain all batches + return BatchRetention.RETAIN_EMPTY; + } + @Override + protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { + // delete the records + return false; + } + }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); + + // Verify filter result + assertEquals(numRecords, filterResult.messagesRead()); + assertEquals(records.sizeInBytes(), filterResult.bytesRead()); + assertEquals(baseOffset + 1, filterResult.maxOffset()); + assertEquals(0, filterResult.messagesRetained()); + assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained()); + assertEquals(12, filterResult.maxTimestamp()); + assertEquals(baseOffset + 1, filterResult.shallowOffsetOfMaxTimestamp()); + + // Verify filtered records filtered.flip(); MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches()); assertEquals(1, batches.size()); - MutableRecordBatch firstBatch = batches.get(0); - assertEquals(partitionLeaderEpoch, 
firstBatch.partitionLeaderEpoch()); + MutableRecordBatch batch = batches.get(0); + assertEquals(0, batch.countOrNull().intValue()); + assertEquals(12L, batch.maxTimestamp()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + assertEquals(baseOffset, batch.baseOffset()); + assertEquals(baseOffset + 1, batch.lastOffset()); + assertEquals(baseSequence, batch.baseSequence()); + assertEquals(baseSequence + 1, batch.lastSequence()); + assertEquals(isTransactional, batch.isTransactional()); } } @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) - public void testFilterToEmptyBatchRetention(Args args) { - byte magic = args.magic; - if (magic >= RecordBatch.MAGIC_VALUE_V2) { - for (boolean isTransactional : Arrays.asList(true, false)) { - ByteBuffer buffer = ByteBuffer.allocate(2048); - long producerId = 23L; - short producerEpoch = 5; - long baseOffset = 3L; - int baseSequence = 10; - int partitionLeaderEpoch = 293; - int numRecords = 2; - - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, args.compression, TimestampType.CREATE_TIME, - baseOffset, RecordBatch.NO_TIMESTAMP, producerId, producerEpoch, baseSequence, isTransactional, - partitionLeaderEpoch); - builder.append(11L, "2".getBytes(), "b".getBytes()); - builder.append(12L, "3".getBytes(), "c".getBytes()); - builder.close(); - MemoryRecords records = builder.build(); - - ByteBuffer filtered = ByteBuffer.allocate(2048); - MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0), - new MemoryRecords.RecordFilter() { - @Override - protected BatchRetention checkBatchRetention(RecordBatch batch) { - // retain all batches - return BatchRetention.RETAIN_EMPTY; - } - - @Override - protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { - // delete the records - return false; - } - }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); - - // Verify filter result - assertEquals(numRecords, 
filterResult.messagesRead()); - assertEquals(records.sizeInBytes(), filterResult.bytesRead()); - assertEquals(baseOffset + 1, filterResult.maxOffset()); - assertEquals(0, filterResult.messagesRetained()); - assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained()); - assertEquals(12, filterResult.maxTimestamp()); - assertEquals(baseOffset + 1, filterResult.shallowOffsetOfMaxTimestamp()); - - // Verify filtered records - filtered.flip(); - MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); - - List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches()); - assertEquals(1, batches.size()); - - MutableRecordBatch batch = batches.get(0); - assertEquals(0, batch.countOrNull().intValue()); - assertEquals(12L, batch.maxTimestamp()); - assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); - assertEquals(baseOffset, batch.baseOffset()); - assertEquals(baseOffset + 1, batch.lastOffset()); - assertEquals(baseSequence, batch.baseSequence()); - assertEquals(baseSequence + 1, batch.lastSequence()); - assertEquals(isTransactional, batch.isTransactional()); - } - } + @ArgumentsSource(MagicAtLeastV2.class) + public void testEmptyBatchRetention(Args args) { Review comment: This is not related to your PR but this test case seems not to require parameters. 
########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ########## @@ -390,150 +449,96 @@ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); // Verify filter result - assertEquals(0, filterResult.messagesRead()); - assertEquals(records.sizeInBytes(), filterResult.bytesRead()); - assertEquals(baseOffset, filterResult.maxOffset()); - assertEquals(0, filterResult.messagesRetained()); - assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained()); - assertEquals(timestamp, filterResult.maxTimestamp()); - assertEquals(baseOffset, filterResult.shallowOffsetOfMaxTimestamp()); - assertTrue(filterResult.outputBuffer().position() > 0); + assertEquals(0, filterResult.outputBuffer().position()); // Verify filtered records filtered.flip(); MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); - assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filteredRecords.sizeInBytes()); - } - } - - @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) - public void testEmptyBatchDeletion(Args args) { - if (args.magic >= RecordBatch.MAGIC_VALUE_V2) { - for (final BatchRetention deleteRetention : Arrays.asList(BatchRetention.DELETE, BatchRetention.DELETE_EMPTY)) { - ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD); - long producerId = 23L; - short producerEpoch = 5; - long baseOffset = 3L; - int baseSequence = 10; - int partitionLeaderEpoch = 293; - long timestamp = System.currentTimeMillis(); - - DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, producerId, producerEpoch, - baseSequence, baseOffset, baseOffset, partitionLeaderEpoch, TimestampType.CREATE_TIME, - timestamp, false, false); - buffer.flip(); - - ByteBuffer filtered = ByteBuffer.allocate(2048); - MemoryRecords records = MemoryRecords.readableRecords(buffer); - 
MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0), - new MemoryRecords.RecordFilter() { - @Override - protected BatchRetention checkBatchRetention(RecordBatch batch) { - return deleteRetention; - } - - @Override - protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { - return false; - } - }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); - - // Verify filter result - assertEquals(0, filterResult.outputBuffer().position()); - - // Verify filtered records - filtered.flip(); - MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); - assertEquals(0, filteredRecords.sizeInBytes()); - } + assertEquals(0, filteredRecords.sizeInBytes()); } } @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + @ArgumentsSource(MagicAtLeastV2.class) public void testBuildEndTxnMarker(Args args) { - if (args.magic >= RecordBatch.MAGIC_VALUE_V2) { - long producerId = 73; - short producerEpoch = 13; - long initialOffset = 983L; - int coordinatorEpoch = 347; - int partitionLeaderEpoch = 29; - - EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch); - MemoryRecords records = MemoryRecords.withEndTransactionMarker(initialOffset, System.currentTimeMillis(), - partitionLeaderEpoch, producerId, producerEpoch, marker); - // verify that buffer allocation was precise - assertEquals(records.buffer().remaining(), records.buffer().capacity()); - - List<MutableRecordBatch> batches = TestUtils.toList(records.batches()); - assertEquals(1, batches.size()); + long producerId = 73; + short producerEpoch = 13; + long initialOffset = 983L; + int coordinatorEpoch = 347; + int partitionLeaderEpoch = 29; + + EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch); + MemoryRecords records = MemoryRecords.withEndTransactionMarker(initialOffset, System.currentTimeMillis(), + partitionLeaderEpoch, producerId, 
producerEpoch, marker); + // verify that buffer allocation was precise + assertEquals(records.buffer().remaining(), records.buffer().capacity()); + + List<MutableRecordBatch> batches = TestUtils.toList(records.batches()); + assertEquals(1, batches.size()); - RecordBatch batch = batches.get(0); - assertTrue(batch.isControlBatch()); - assertEquals(producerId, batch.producerId()); - assertEquals(producerEpoch, batch.producerEpoch()); - assertEquals(initialOffset, batch.baseOffset()); - assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch()); - assertTrue(batch.isValid()); - - List<Record> createdRecords = TestUtils.toList(batch); - assertEquals(1, createdRecords.size()); - - Record record = createdRecords.get(0); - assertTrue(record.isValid()); - EndTransactionMarker deserializedMarker = EndTransactionMarker.deserialize(record); - assertEquals(ControlRecordType.COMMIT, deserializedMarker.controlType()); - assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch()); - } + RecordBatch batch = batches.get(0); + assertTrue(batch.isControlBatch()); + assertEquals(producerId, batch.producerId()); + assertEquals(producerEpoch, batch.producerEpoch()); + assertEquals(initialOffset, batch.baseOffset()); + assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch()); + assertTrue(batch.isValid()); + + List<Record> createdRecords = TestUtils.toList(batch); + assertEquals(1, createdRecords.size()); + + Record record = createdRecords.get(0); + assertTrue(record.isValid()); + EndTransactionMarker deserializedMarker = EndTransactionMarker.deserialize(record); + assertEquals(ControlRecordType.COMMIT, deserializedMarker.controlType()); + assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch()); } @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + @ArgumentsSource(MagicAtLeastV2.class) public void testBuildLeaderChangeMessage(Args args) { - if (args.magic >= RecordBatch.MAGIC_VALUE_V2) { - - final int leaderId = 5; - final 
int leaderEpoch = 20; - final int voterId = 6; - - LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage() - .setLeaderId(leaderId) - .setVoters(Collections.singletonList( - new Voter().setVoterId(voterId))); - MemoryRecords records = MemoryRecords.withLeaderChangeMessage(System.currentTimeMillis(), - leaderEpoch, leaderChangeMessage); + final int leaderId = 5; + final int leaderEpoch = 20; + final int voterId = 6; + + LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage() + .setLeaderId(leaderId) + .setVoters(Collections.singletonList( + new Voter().setVoterId(voterId))); + MemoryRecords records = MemoryRecords.withLeaderChangeMessage(System.currentTimeMillis(), + leaderEpoch, leaderChangeMessage); + + List<MutableRecordBatch> batches = TestUtils.toList(records.batches()); + assertEquals(1, batches.size()); - List<MutableRecordBatch> batches = TestUtils.toList(records.batches()); - assertEquals(1, batches.size()); + RecordBatch batch = batches.get(0); + assertTrue(batch.isControlBatch()); + assertEquals(0, batch.baseOffset()); + assertEquals(leaderEpoch, batch.partitionLeaderEpoch()); + assertTrue(batch.isValid()); - RecordBatch batch = batches.get(0); - assertTrue(batch.isControlBatch()); - assertEquals(0, batch.baseOffset()); - assertEquals(leaderEpoch, batch.partitionLeaderEpoch()); - assertTrue(batch.isValid()); + List<Record> createdRecords = TestUtils.toList(batch); + assertEquals(1, createdRecords.size()); - List<Record> createdRecords = TestUtils.toList(batch); - assertEquals(1, createdRecords.size()); + Record record = createdRecords.get(0); + assertTrue(record.isValid()); + assertEquals(ControlRecordType.LEADER_CHANGE, ControlRecordType.parse(record.key())); - Record record = createdRecords.get(0); - assertTrue(record.isValid()); - assertEquals(ControlRecordType.LEADER_CHANGE, ControlRecordType.parse(record.key())); + LeaderChangeMessage deserializedMessage = ControlRecordUtils.deserializeLeaderChangeMessage(record); + 
assertEquals(leaderId, deserializedMessage.leaderId()); + assertEquals(1, deserializedMessage.voters().size()); + assertEquals(voterId, deserializedMessage.voters().get(0).voterId()); + } - LeaderChangeMessage deserializedMessage = ControlRecordUtils.deserializeLeaderChangeMessage(record); - assertEquals(leaderId, deserializedMessage.leaderId()); - assertEquals(1, deserializedMessage.voters().size()); - assertEquals(voterId, deserializedMessage.voters().get(0).voterId()); - } + private static Stream<Arguments> testFilterToBatchDiscard() { + return getMemoryRecordsArguments((magic, type) -> !asList(CompressionType.NONE, CompressionType.ZSTD).contains(type) || magic >= MAGIC_VALUE_V2); } @ParameterizedTest - @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + @MethodSource Review comment: ditto ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org