junrao commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r708747148
##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -622,26 +628,35 @@ private[log] class Cleaner(val id: Int,
* @param sourceRecords The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
- * @param retainDeletesAndTxnMarkers Should tombstones and markers be
retained while cleaning this segment
+ * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than
version 2) and markers be retained while cleaning this segment
+ * @param deleteRetentionMs Defines how long a tombstone should be kept as
defined by log configuration
* @param maxLogMessageSize The maximum message size of the corresponding
topic
* @param stats Collector for cleaning statistics
+ * @param currentTime The time at which the clean was initiated
*/
private[log] def cleanInto(topicPartition: TopicPartition,
sourceRecords: FileRecords,
dest: LogSegment,
map: OffsetMap,
- retainDeletesAndTxnMarkers: Boolean,
+ retainLegacyDeletesAndTxnMarkers: Boolean,
+ deleteRetentionMs: Long,
maxLogMessageSize: Int,
transactionMetadata: CleanedTransactionMetadata,
lastRecordsOfActiveProducers: Map[Long,
LastRecord],
- stats: CleanerStats): Unit = {
- val logCleanerFilter: RecordFilter = new RecordFilter {
+ stats: CleanerStats,
+ currentTime: Long): Unit = {
+ val logCleanerFilter: RecordFilter = new RecordFilter(currentTime,
deleteRetentionMs) {
var discardBatchRecords: Boolean = _
- override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+ override def checkBatchRetention(batch: RecordBatch):
RecordFilter.BatchRetentionResult = {
// we piggy-back on the tombstone retention logic to delay deletion of
transaction markers.
// note that we will never delete a marker until all the records from
that transaction are removed.
- discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata,
retainTxnMarkers = retainDeletesAndTxnMarkers)
+ val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+ if (batch.isControlBatch)
+ discardBatchRecords = canDiscardBatch &&
batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <=
currentTime
+ else
+ discardBatchRecords = canDiscardBatch
Review comment:
This is an existing issue. The following comment on line 1136 seems out
of place, since the code that does that check is inside
isBatchLastRecordOfProducer() below.
```
// We may retain a record from an aborted transaction if it is
the last entry
// written by a given producerId.
```
##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -493,19 +496,19 @@ private[log] class Cleaner(val id: Int,
* @return The first offset not cleaned and the statistics for this round of
cleaning
*/
private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
+ doClean(cleanable, time.milliseconds())
+ }
+
+ private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long,
CleanerStats) = {
+ info("Beginning cleaning of log %s".format(cleanable.log.name))
+
// figure out the timestamp below which it is safe to remove delete
tombstones
// this position is defined to be a configurable time beneath the last
modified time of the last clean segment
- val deleteHorizonMs =
+ val legacyDeleteHorizonMs =
Review comment:
Perhaps mention in the comment above that this is only used for the old
message format?
##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -1060,7 +1082,7 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
/**
* Helper class for a log, its topic/partition, the first cleanable position,
the first uncleanable dirty position,
- * and whether it needs compaction immediately.
+ * the reason why it is being cleaned, and whether it needs compaction
immediately.
Review comment:
We no longer pass in the reason.
##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -163,17 +163,18 @@ private[log] class LogCleanerManager(val logDirs:
Seq[File],
* Choose the log to clean next and add it to the in-progress set. We
recompute this
* each time from the full set of logs to allow logs to be dynamically
added to the pool of logs
* the log manager maintains.
+ * Returns a tuple of an Option of the log selected to be cleaned and the
reason it was selected.
Review comment:
We no longer return the reason.
##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -658,20 +673,22 @@ private[log] class Cleaner(val id: Int,
}
}
- if (batch.hasProducerId && isBatchLastRecordOfProducer)
- BatchRetention.RETAIN_EMPTY
- else if (discardBatchRecords)
- BatchRetention.DELETE
- else
- BatchRetention.DELETE_EMPTY
+ val batchRetention: BatchRetention =
+ if (batch.hasProducerId && isBatchLastRecordOfProducer)
+ BatchRetention.RETAIN_EMPTY
+ else if (discardBatchRecords)
+ BatchRetention.DELETE
+ else
+ BatchRetention.DELETE_EMPTY
+ new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch)
Review comment:
It seems that containsMarkerForEmptyTxn should be set to
canDiscardBatch only if this batch is a control batch?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]