mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682012434



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
-                             retainDeletesAndTxnMarkers: Boolean,
+                             retainLegacyDeletesAndTxnMarkers: Boolean,
+                             deleteRetentionMs: Long,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                             lastRecordsOfActiveProducers: Map[Long, LastRecord],
-                             stats: CleanerStats): Unit = {
-    val logCleanerFilter: RecordFilter = new RecordFilter {
+                             stats: CleanerStats,
+                             currentTime: Long): Long = {
+    var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP
+
+    val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
       var discardBatchRecords: Boolean = _
 
-      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+      override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = {
         // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
         // note that we will never delete a marker until all the records from that transaction are removed.
-        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
+        val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+        if (batch.isControlBatch) {
+          if (batch.magic() < RecordBatch.MAGIC_VALUE_V2) {

Review comment:
       ah right, I didn't catch this. Seems like we don't need this block then; we can just move this into the check for whether it's a control batch:
   ```
   discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime
   ```
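
       For context, a rough Scala sketch of how the start of `checkBatchRetention` could read with that simplification, reusing the names from the diff above (`shouldDiscardBatch`, `transactionMetadata`, `currentTime`); the non-control-batch branch is my assumption of what stays, not the final committed code:
   ```scala
   // Sketch only: reuses names from the diff above; not the committed change.
   override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = {
     // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
     // note that we will never delete a marker until all the records from that transaction are removed.
     val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)

     discardBatchRecords =
       if (batch.isControlBatch)
         // a marker batch is only discardable once its delete horizon (if set) has passed
         canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime
       else
         // assumption: regular batches keep the existing discard decision
         canDiscardBatch

     // ... rest of the retention decision (delete horizon bookkeeping, BatchRetentionResult) unchanged
   }
   ```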



