mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r709503140
##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -622,26 +628,35 @@ private[log] class Cleaner(val id: Int,
* @param sourceRecords The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
- * @param retainDeletesAndTxnMarkers Should tombstones and markers be
retained while cleaning this segment
+ * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than
version 2) and markers be retained while cleaning this segment
+ * @param deleteRetentionMs Defines how long a tombstone should be kept as
defined by log configuration
* @param maxLogMessageSize The maximum message size of the corresponding
topic
* @param stats Collector for cleaning statistics
+ * @param currentTime The time at which the clean was initiated
*/
private[log] def cleanInto(topicPartition: TopicPartition,
sourceRecords: FileRecords,
dest: LogSegment,
map: OffsetMap,
- retainDeletesAndTxnMarkers: Boolean,
+ retainLegacyDeletesAndTxnMarkers: Boolean,
+ deleteRetentionMs: Long,
maxLogMessageSize: Int,
transactionMetadata: CleanedTransactionMetadata,
lastRecordsOfActiveProducers: Map[Long,
LastRecord],
- stats: CleanerStats): Unit = {
- val logCleanerFilter: RecordFilter = new RecordFilter {
+ stats: CleanerStats,
+ currentTime: Long): Unit = {
+ val logCleanerFilter: RecordFilter = new RecordFilter(currentTime,
deleteRetentionMs) {
var discardBatchRecords: Boolean = _
- override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+ override def checkBatchRetention(batch: RecordBatch):
RecordFilter.BatchRetentionResult = {
// we piggy-back on the tombstone retention logic to delay deletion of
transaction markers.
// note that we will never delete a marker until all the records from
that transaction are removed.
- discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata,
retainTxnMarkers = retainDeletesAndTxnMarkers)
+ val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+ if (batch.isControlBatch)
+ discardBatchRecords = canDiscardBatch &&
batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <=
currentTime
+ else
+ discardBatchRecords = canDiscardBatch
Review comment:
Makes sense. I've removed that comment on line 1136, since the case is
mentioned in `isBatchLastRecordOfProducer`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]