junrao commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r708747148



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -622,26 +628,35 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be 
retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than 
version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as 
defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding 
topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
-                             retainDeletesAndTxnMarkers: Boolean,
+                             retainLegacyDeletesAndTxnMarkers: Boolean,
+                             deleteRetentionMs: Long,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                              lastRecordsOfActiveProducers: Map[Long, 
LastRecord],
-                             stats: CleanerStats): Unit = {
-    val logCleanerFilter: RecordFilter = new RecordFilter {
+                             stats: CleanerStats,
+                             currentTime: Long): Unit = {
+    val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, 
deleteRetentionMs) {
       var discardBatchRecords: Boolean = _
 
-      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+      override def checkBatchRetention(batch: RecordBatch): 
RecordFilter.BatchRetentionResult = {
         // we piggy-back on the tombstone retention logic to delay deletion of 
transaction markers.
         // note that we will never delete a marker until all the records from 
that transaction are removed.
-        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, 
retainTxnMarkers = retainDeletesAndTxnMarkers)
+        val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+        if (batch.isControlBatch)
+          discardBatchRecords = canDiscardBatch && 
batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= 
currentTime
+        else
+          discardBatchRecords = canDiscardBatch

Review comment:
       This is an existing issue. The following comment on line 1136 seems out 
of place, since the code that does that check is inside 
isBatchLastRecordOfProducer() below.
   
   ```
               // We may retain a record from an aborted transaction if it is 
the last entry
               // written by a given producerId.
   
   ```

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -493,19 +496,19 @@ private[log] class Cleaner(val id: Int,
    * @return The first offset not cleaned and the statistics for this round of 
cleaning
    */
   private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
+    doClean(cleanable, time.milliseconds())
+  }
+
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, 
CleanerStats) = {
+    info("Beginning cleaning of log %s".format(cleanable.log.name))
+
     // figure out the timestamp below which it is safe to remove delete 
tombstones
     // this position is defined to be a configurable time beneath the last 
modified time of the last clean segment
-    val deleteHorizonMs =
+    val legacyDeleteHorizonMs =

Review comment:
       Perhaps mention in the comment above that this is only used for the old 
message format?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -1060,7 +1082,7 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
 
 /**
   * Helper class for a log, its topic/partition, the first cleanable position, 
the first uncleanable dirty position,
-  * and whether it needs compaction immediately.
+  * the reason why it is being cleaned, and whether it needs compaction 
immediately.

Review comment:
       We no longer pass in the reason.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -163,17 +163,18 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
     * Choose the log to clean next and add it to the in-progress set. We 
recompute this
     * each time from the full set of logs to allow logs to be dynamically 
added to the pool of logs
     * the log manager maintains.
+    * Returns a tuple of an Option of the log selected to be cleaned and the 
reason it was selected.

Review comment:
       We no longer return the reason.

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -658,20 +673,22 @@ private[log] class Cleaner(val id: Int,
           }
         }
 
-        if (batch.hasProducerId && isBatchLastRecordOfProducer)
-          BatchRetention.RETAIN_EMPTY
-        else if (discardBatchRecords)
-          BatchRetention.DELETE
-        else
-          BatchRetention.DELETE_EMPTY
+        val batchRetention: BatchRetention =
+          if (batch.hasProducerId && isBatchLastRecordOfProducer)
+            BatchRetention.RETAIN_EMPTY
+          else if (discardBatchRecords)
+            BatchRetention.DELETE
+          else
+            BatchRetention.DELETE_EMPTY
+        new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch)

Review comment:
       It seems that containsMarkerForEmptyTxn should only be set to 
canDiscardBatch if this batch is a control batch?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to