hachikuji commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r681961452



##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
##########
@@ -80,6 +80,7 @@ public void write(int b) {
     private int numRecords = 0;
     private float actualCompressionRatio = 1;
     private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
+    private long deleteHorizonMs;

Review comment:
       Can we rename `firstTimestamp` to `baseTimestamp` here as well?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated

Review comment:
       Can we document the return type?

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -483,6 +486,49 @@ public void testBuildEndTxnMarker() {
         assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }
 
+    /**
+     * This test is used to see if the base timestamp of the batch has been successfully
+     * converted to a delete horizon for the tombstones / transaction markers of the batch.
+     * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testBaseTimestampToDeleteHorizonConversion(Args args) {
+        int partitionLeaderEpoch = 998;
+        if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
+            ByteBuffer buffer = ByteBuffer.allocate(2048);
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME,
+                    0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+            builder.append(10L, "1".getBytes(), null);
+
+            ByteBuffer filtered = ByteBuffer.allocate(2048);
+            final long deleteHorizon = Integer.MAX_VALUE / 2;
+            final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) {

Review comment:
       I think this test could be a little simpler. Rather than going through `filterTo`, we can just use `MemoryRecordsBuilder` directly, setting the delete horizon. Maybe it is useful to have both. Perhaps we could add a more direct test in `MemoryRecordsBuilderTest` or something like that?
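       For reference, such a direct test could follow the hedged sketch below. It assumes the existing test imports, and the trailing `deleteHorizonMs` constructor argument is an assumption based on this PR, so the exact overload may differ:
   ```java
   // Hedged sketch: build a v2 batch with an explicit delete horizon and read it back
   // directly, without going through filterTo. The deleteHorizonMs parameter position
   // is hypothetical; adjust to the actual constructor added in this PR.
   long deleteHorizonMs = 100L;
   ByteBuffer buffer = ByteBuffer.allocate(2048);
   MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V2,
           CompressionType.NONE, TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP,
           RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
           false, false, 0, buffer.capacity(), deleteHorizonMs); // hypothetical deleteHorizonMs argument
   builder.append(50L, "key".getBytes(), null); // tombstone, so the delete horizon is meaningful
   MemoryRecords records = builder.build();
   assertEquals(OptionalLong.of(deleteHorizonMs), records.batches().iterator().next().deleteHorizonMs());
   ```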

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
-                             retainDeletesAndTxnMarkers: Boolean,
+                             retainLegacyDeletesAndTxnMarkers: Boolean,
+                             deleteRetentionMs: Long,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                              lastRecordsOfActiveProducers: Map[Long, LastRecord],
-                             stats: CleanerStats): Unit = {
-    val logCleanerFilter: RecordFilter = new RecordFilter {
+                             stats: CleanerStats,
+                             currentTime: Long): Long = {
+    var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP
+
+    val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
       var discardBatchRecords: Boolean = _
 
-      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+      override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = {
         // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
         // note that we will never delete a marker until all the records from that transaction are removed.
-        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
+        val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+        if (batch.isControlBatch) {
+          if (batch.magic() < RecordBatch.MAGIC_VALUE_V2) {

Review comment:
       This check doesn't make sense since control records only exist for v2.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -171,38 +177,40 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             // allow for the possibility that a previous version corrupted the log by writing a compressed record batch
             // with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
             // recopy the messages to the destination buffer.
-
             byte batchMagic = batch.magic();
-            boolean writeOriginalBatch = true;
             List<Record> retainedRecords = new ArrayList<>();
 
-            try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
-                while (iterator.hasNext()) {
-                    Record record = iterator.next();
-                    filterResult.messagesRead += 1;
-
-                    if (filter.shouldRetainRecord(batch, record)) {
-                        // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-                        // the corrupted batch with correct data.
-                        if (!record.hasMagic(batchMagic))
-                            writeOriginalBatch = false;
-
-                        if (record.offset() > maxOffset)
-                            maxOffset = record.offset();
-
-                        retainedRecords.add(record);
-                    } else {
-                        writeOriginalBatch = false;
-                    }
-                }
-            }
+            final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
+                                                                          batchMagic, true, retainedRecords);

Review comment:
       nit: fix alignment

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -483,6 +486,49 @@ public void testBuildEndTxnMarker() {
         assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }
 
+    /**
+     * This test is used to see if the base timestamp of the batch has been successfully
+     * converted to a delete horizon for the tombstones / transaction markers of the batch.
+     * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testBaseTimestampToDeleteHorizonConversion(Args args) {
+        int partitionLeaderEpoch = 998;
+        if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
+            ByteBuffer buffer = ByteBuffer.allocate(2048);
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME,
+                    0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+            builder.append(10L, "1".getBytes(), null);

Review comment:
       Maybe we can add a few more records here to make the test more interesting
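       For example, a couple of extra appends along these lines (keys/values chosen arbitrarily for illustration) would give the batch a regular record and a second tombstone to exercise:
   ```java
   // Hypothetical extra records for the test: one record with a value and one more tombstone.
   builder.append(11L, "2".getBytes(), "v2".getBytes());
   builder.append(12L, "3".getBytes(), null); // tombstone (null value)
   ```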

##########
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -483,6 +486,49 @@ public void testBuildEndTxnMarker() {
         assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }
 
+    /**
+     * This test is used to see if the base timestamp of the batch has been successfully
+     * converted to a delete horizon for the tombstones / transaction markers of the batch.
+     * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)

Review comment:
       nit: since we are filtering magic < 2 below, maybe we could add another provider
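       A hedged sketch of such a provider is below; the `V2MemoryRecordsArgumentsProvider` name, the no-arg construction of the existing provider, and the way `Args` exposes `magic` are assumptions for illustration (JUnit imports taken from the existing test file):
   ```java
   // Hypothetical provider that reuses MemoryRecordsArgumentsProvider but keeps only v2+ arguments.
   public static class V2MemoryRecordsArgumentsProvider implements ArgumentsProvider {
       @Override
       public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {
           return new MemoryRecordsArgumentsProvider().provideArguments(context)
               .filter(arguments -> ((Args) arguments.get()[0]).magic >= RecordBatch.MAGIC_VALUE_V2);
       }
   }
   ```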

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
           @volatile private var _topicId: Option[Uuid],
-          val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+          val keepPartitionMetadataFile: Boolean,
+          @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) extends Logging with KafkaMetricsGroup {

Review comment:
       Can you help me understand why we need to track this here?

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
##########
@@ -125,6 +127,7 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
         this.baseSequence = baseSequence;
         this.isTransactional = isTransactional;
         this.isControlBatch = isControlBatch;
+        this.deleteHorizonMs = deleteHorizonMs;

Review comment:
       Should we validate that no delete horizon has been set if the magic is less than 2?
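       For illustration, a minimal guard in the constructor could look like the sketch below; treating `RecordBatch.NO_TIMESTAMP` as the "unset" sentinel is an assumption:
   ```java
   // Hedged sketch: the delete horizon only exists on the wire for magic v2 and above,
   // so reject any explicit value on older magic versions. NO_TIMESTAMP as the sentinel is assumed.
   if (magic < RecordBatch.MAGIC_VALUE_V2 && deleteHorizonMs != RecordBatch.NO_TIMESTAMP)
       throw new IllegalArgumentException("Delete horizon timestamp is not supported for magic " + magic);
   ```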

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -239,9 +247,68 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
         return filterResult;
     }
 
+    private static BatchFilterResult filterBatch(RecordBatch batch,
+                                                 BufferSupplier decompressionBufferSupplier,
+                                                 FilterResult filterResult,
+                                                 RecordFilter filter,
+                                                 byte batchMagic,
+                                                 boolean writeOriginalBatch,
+                                                 List<Record> retainedRecords) {
+        long maxOffset = -1;
+        boolean containsTombstones = false;
+        try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
+            while (iterator.hasNext()) {
+                Record record = iterator.next();
+                filterResult.messagesRead += 1;
+
+                if (filter.shouldRetainRecord(batch, record)) {
+                    // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
+                    // the corrupted batch with correct data.
+                    if (!record.hasMagic(batchMagic))
+                        writeOriginalBatch = false;
+
+                    if (record.offset() > maxOffset)
+                        maxOffset = record.offset();
+
+                    retainedRecords.add(record);
+
+                    if (!record.hasValue()) {
+                        containsTombstones = true;
+                    }
+                } else {
+                    writeOriginalBatch = false;
+                }
+            }
+            return new BatchFilterResult(writeOriginalBatch, containsTombstones, maxOffset);
+        }
+    }
+
+    private static class BatchFilterResult {
+        private final boolean writeOriginalBatch;
+        private final boolean containsTombstones;
+        private final long maxOffset;
+        public BatchFilterResult(final boolean writeOriginalBatch,
+                                 final boolean containsTombstones,
+                                 final long maxOffset) {
+            this.writeOriginalBatch = writeOriginalBatch;
+            this.containsTombstones = containsTombstones;
+            this.maxOffset = maxOffset;
+        }
+        public boolean shouldWriteOriginalBatch() {

Review comment:
       nit: since this is a private class anyway, maybe we can leave off these accessors
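       Something like the sketch below, with the fields read directly from `filterTo` (field names taken from the diff; the simplification itself is just a suggestion):
   ```java
   // Hedged sketch: package the per-batch iteration results as plain final fields,
   // read directly by the enclosing MemoryRecords.filterTo, with no getters.
   private static class BatchFilterResult {
       final boolean writeOriginalBatch;
       final boolean containsTombstones;
       final long maxOffset;

       BatchFilterResult(boolean writeOriginalBatch, boolean containsTombstones, long maxOffset) {
           this.writeOriginalBatch = writeOriginalBatch;
           this.containsTombstones = containsTombstones;
           this.maxOffset = maxOffset;
       }
   }
   ```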

##########
File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##########
@@ -483,6 +486,49 @@ public void testBuildEndTxnMarker() {
         assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
     }
 
+    /**
+     * This test is used to see if the base timestamp of the batch has been successfully
+     * converted to a delete horizon for the tombstones / transaction markers of the batch.
+     * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testBaseTimestampToDeleteHorizonConversion(Args args) {
+        int partitionLeaderEpoch = 998;
+        if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
+            ByteBuffer buffer = ByteBuffer.allocate(2048);
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME,
+                    0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+            builder.append(10L, "1".getBytes(), null);
+
+            ByteBuffer filtered = ByteBuffer.allocate(2048);
+            final long deleteHorizon = Integer.MAX_VALUE / 2;
+            final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) {
+                @Override
+                protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+                    return true;
+                }
+
+                @Override
+                protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
+                    return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, true);
+                }
+            };
+            builder.build().filterTo(new TopicPartition("random", 0), recordFilter, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+            filtered.flip();
+            MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+
+            List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
+            assertEquals(1, batches.size());
+            assertEquals(deleteHorizon, batches.get(0).deleteHorizonMs().getAsLong());

Review comment:
       nit: how about using:
   ```java
   assertEquals(OptionalLong.of(deleteHorizon), batches.get(0).deleteHorizonMs());
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

