hachikuji commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r681961452
########## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ########## @@ -80,6 +80,7 @@ public void write(int b) { private int numRecords = 0; private float actualCompressionRatio = 1; private long maxTimestamp = RecordBatch.NO_TIMESTAMP; + private long deleteHorizonMs; Review comment: Can we rename `firstTimestamp` to `baseTimestamp` here as well? ########## File path: core/src/main/scala/kafka/log/LogCleaner.scala ########## @@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int, * @param sourceRecords The dirty log segment * @param dest The cleaned log segment * @param map The key=>offset mapping - * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment + * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment + * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration * @param maxLogMessageSize The maximum message size of the corresponding topic * @param stats Collector for cleaning statistics + * @param currentTime The time at which the clean was initiated Review comment: Can we document the return type? ########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ########## @@ -483,6 +486,49 @@ public void testBuildEndTxnMarker() { assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch()); } + /** + * This test is used to see if the base timestamp of the batch has been successfully + * converted to a delete horizon for the tombstones / transaction markers of the batch. + * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon. 
+ */ + @ParameterizedTest + @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + public void testBaseTimestampToDeleteHorizonConversion(Args args) { + int partitionLeaderEpoch = 998; + if (args.magic >= RecordBatch.MAGIC_VALUE_V2) { + ByteBuffer buffer = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME, + 0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch); + builder.append(10L, "1".getBytes(), null); + + ByteBuffer filtered = ByteBuffer.allocate(2048); + final long deleteHorizon = Integer.MAX_VALUE / 2; + final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) { Review comment: I think this test could be a little simpler. Rather than going through `filterTo`, we can just use `MemoryRecordsBuilder` directly setting the delete horizon. Maybe it is useful to have both. Perhaps we could add a more direct test in `MemoryRecordsBuilderTest` or something like that? 
########## File path: core/src/main/scala/kafka/log/LogCleaner.scala ########## @@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int, * @param sourceRecords The dirty log segment * @param dest The cleaned log segment * @param map The key=>offset mapping - * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment + * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment + * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration * @param maxLogMessageSize The maximum message size of the corresponding topic * @param stats Collector for cleaning statistics + * @param currentTime The time at which the clean was initiated */ private[log] def cleanInto(topicPartition: TopicPartition, sourceRecords: FileRecords, dest: LogSegment, map: OffsetMap, - retainDeletesAndTxnMarkers: Boolean, + retainLegacyDeletesAndTxnMarkers: Boolean, + deleteRetentionMs: Long, maxLogMessageSize: Int, transactionMetadata: CleanedTransactionMetadata, lastRecordsOfActiveProducers: Map[Long, LastRecord], - stats: CleanerStats): Unit = { - val logCleanerFilter: RecordFilter = new RecordFilter { + stats: CleanerStats, + currentTime: Long): Long = { + var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP + + val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) { var discardBatchRecords: Boolean = _ - override def checkBatchRetention(batch: RecordBatch): BatchRetention = { + override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = { // we piggy-back on the tombstone retention logic to delay deletion of transaction markers. // note that we will never delete a marker until all the records from that transaction are removed. 
- discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers) + val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata) + + if (batch.isControlBatch) { + if (batch.magic() < RecordBatch.MAGIC_VALUE_V2) { Review comment: This check doesn't make sense since control records only exist for v2. ########## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ########## @@ -171,38 +177,40 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR // allow for the possibility that a previous version corrupted the log by writing a compressed record batch // with a magic value not matching the magic of the records (magic < 2). This will be fixed as we // recopy the messages to the destination buffer. - byte batchMagic = batch.magic(); - boolean writeOriginalBatch = true; List<Record> retainedRecords = new ArrayList<>(); - try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) { - while (iterator.hasNext()) { - Record record = iterator.next(); - filterResult.messagesRead += 1; - - if (filter.shouldRetainRecord(batch, record)) { - // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite - // the corrupted batch with correct data. 
- if (!record.hasMagic(batchMagic)) - writeOriginalBatch = false; - - if (record.offset() > maxOffset) - maxOffset = record.offset(); - - retainedRecords.add(record); - } else { - writeOriginalBatch = false; - } - } - } + final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter, + batchMagic, true, retainedRecords); Review comment: nit: fix alignment ########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ########## @@ -483,6 +486,49 @@ public void testBuildEndTxnMarker() { assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch()); } + /** + * This test is used to see if the base timestamp of the batch has been successfully + * converted to a delete horizon for the tombstones / transaction markers of the batch. + * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon. + */ + @ParameterizedTest + @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + public void testBaseTimestampToDeleteHorizonConversion(Args args) { + int partitionLeaderEpoch = 998; + if (args.magic >= RecordBatch.MAGIC_VALUE_V2) { + ByteBuffer buffer = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME, + 0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch); + builder.append(10L, "1".getBytes(), null); Review comment: Maybe we can add a few more records here to make the test more interesting ########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ########## @@ -483,6 +486,49 @@ public void testBuildEndTxnMarker() { assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch()); } + /** + * This test is used to see if the base timestamp of the batch has been successfully + * converted to a delete horizon for the tombstones / transaction markers of the batch. 
+ * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon. + */ + @ParameterizedTest + @ArgumentsSource(MemoryRecordsArgumentsProvider.class) Review comment: nit: since we are filtering magic < 2 below, maybe we could add another provider ########## File path: core/src/main/scala/kafka/log/Log.scala ########## @@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File, val producerStateManager: ProducerStateManager, logDirFailureChannel: LogDirFailureChannel, @volatile private var _topicId: Option[Uuid], - val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { + val keepPartitionMetadataFile: Boolean, + @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup { Review comment: Can you help me understand why we need to track this here? ########## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ########## @@ -125,6 +127,7 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, this.baseSequence = baseSequence; this.isTransactional = isTransactional; this.isControlBatch = isControlBatch; + this.deleteHorizonMs = deleteHorizonMs; Review comment: Should we validate that no delete horizon has been set if the magic is less than 2? 
########## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ########## @@ -239,9 +247,68 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR return filterResult; } + private static BatchFilterResult filterBatch(RecordBatch batch, + BufferSupplier decompressionBufferSupplier, + FilterResult filterResult, + RecordFilter filter, + byte batchMagic, + boolean writeOriginalBatch, + List<Record> retainedRecords) { + long maxOffset = -1; + boolean containsTombstones = false; + try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) { + while (iterator.hasNext()) { + Record record = iterator.next(); + filterResult.messagesRead += 1; + + if (filter.shouldRetainRecord(batch, record)) { + // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite + // the corrupted batch with correct data. + if (!record.hasMagic(batchMagic)) + writeOriginalBatch = false; + + if (record.offset() > maxOffset) + maxOffset = record.offset(); + + retainedRecords.add(record); + + if (!record.hasValue()) { + containsTombstones = true; + } + } else { + writeOriginalBatch = false; + } + } + return new BatchFilterResult(writeOriginalBatch, containsTombstones, maxOffset); + } + } + + private static class BatchFilterResult { + private final boolean writeOriginalBatch; + private final boolean containsTombstones; + private final long maxOffset; + public BatchFilterResult(final boolean writeOriginalBatch, + final boolean containsTombstones, + final long maxOffset) { + this.writeOriginalBatch = writeOriginalBatch; + this.containsTombstones = containsTombstones; + this.maxOffset = maxOffset; + } + public boolean shouldWriteOriginalBatch() { Review comment: nit: since this is a private class anyway, maybe we can leave off these accessors ########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ########## @@ -483,6 +486,49 @@ 
public void testBuildEndTxnMarker() { assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch()); } + /** + * This test is used to see if the base timestamp of the batch has been successfully + * converted to a delete horizon for the tombstones / transaction markers of the batch. + * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon. + */ + @ParameterizedTest + @ArgumentsSource(MemoryRecordsArgumentsProvider.class) + public void testBaseTimestampToDeleteHorizonConversion(Args args) { + int partitionLeaderEpoch = 998; + if (args.magic >= RecordBatch.MAGIC_VALUE_V2) { + ByteBuffer buffer = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME, + 0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch); + builder.append(10L, "1".getBytes(), null); + + ByteBuffer filtered = ByteBuffer.allocate(2048); + final long deleteHorizon = Integer.MAX_VALUE / 2; + final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) { + @Override + protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { + return true; + } + + @Override + protected BatchRetentionResult checkBatchRetention(RecordBatch batch) { + return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, true); + } + }; + builder.build().filterTo(new TopicPartition("random", 0), recordFilter, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); + filtered.flip(); + MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); + + List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches()); + assertEquals(1, batches.size()); + assertEquals(deleteHorizon, batches.get(0).deleteHorizonMs().getAsLong()); Review comment: nit: how about using: ```java assertEquals(OptionalLong.of(deleteHorizon), batches.get(0).deleteHorizonMs()); ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org. For queries about this service, please contact Infrastructure at: users@infra.apache.org