hachikuji commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r699714060
##########
File path:
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +162,13 @@ public void ensureValid() {
}
/**
- * Get the timestamp of the first record in this batch. It is always the
create time of the record even if the
- * timestamp type of the batch is log append time.
- *
- * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the
batch is empty
+ * Gets the base timestamp of the batch which is used to calculate the
timestamp deltas.
Review comment:
nit: the base timestamp is used to calculate the record timestamps from
the deltas
##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -500,14 +500,15 @@ private[log] class Cleaner(val id: Int,
case None => 0L
case Some(seg) => seg.lastModified -
cleanable.log.config.deleteRetentionMs
}
-
- doClean(cleanable, deleteHorizonMs)
+ doClean(cleanable, time.milliseconds(), legacyDeleteHorizonMs =
deleteHorizonMs)
}
- private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long):
(Long, CleanerStats) = {
+ private[log] def doClean(cleanable: LogToClean, currentTime: Long,
legacyDeleteHorizonMs: Long = -1L): (Long, CleanerStats) = {
Review comment:
Do we need `legacyDeleteHorizonMs` as a parameter? As far as I can tell,
there are no cases in the tests which override it. Maybe we could just compute
it here instead of in `clean`?
##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -622,26 +628,38 @@ private[log] class Cleaner(val id: Int,
* @param sourceRecords The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
- * @param retainDeletesAndTxnMarkers Should tombstones and markers be
retained while cleaning this segment
+ * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than
version 2) and markers be retained while cleaning this segment
+ * @param deleteRetentionMs Defines how long a tombstone should be kept as
defined by log configuration
* @param maxLogMessageSize The maximum message size of the corresponding
topic
* @param stats Collector for cleaning statistics
+ * @param currentTime The time at which the clean was initiated
+ *
+ * @return the latestDeleteHorizon that is found from the FilterResult of
the cleaning
*/
private[log] def cleanInto(topicPartition: TopicPartition,
sourceRecords: FileRecords,
dest: LogSegment,
map: OffsetMap,
- retainDeletesAndTxnMarkers: Boolean,
+ retainLegacyDeletesAndTxnMarkers: Boolean,
+ deleteRetentionMs: Long,
maxLogMessageSize: Int,
transactionMetadata: CleanedTransactionMetadata,
lastRecordsOfActiveProducers: Map[Long,
LastRecord],
- stats: CleanerStats): Unit = {
- val logCleanerFilter: RecordFilter = new RecordFilter {
+ stats: CleanerStats,
+ currentTime: Long): Long = {
+ val logCleanerFilter: RecordFilter = new RecordFilter(currentTime,
deleteRetentionMs) {
var discardBatchRecords: Boolean = _
- override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+ override def checkBatchRetention(batch: RecordBatch):
RecordFilter.BatchRetentionResult = {
// we piggy-back on the tombstone retention logic to delay deletion of
transaction markers.
// note that we will never delete a marker until all the records from
that transaction are removed.
- discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata,
retainTxnMarkers = retainDeletesAndTxnMarkers)
+ val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+ if (batch.isControlBatch) {
+ discardBatchRecords = canDiscardBatch &&
batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <=
currentTime
Review comment:
nit: misaligned
##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -658,23 +676,28 @@ private[log] class Cleaner(val id: Int,
}
}
- if (batch.hasProducerId && isBatchLastRecordOfProducer)
- BatchRetention.RETAIN_EMPTY
- else if (discardBatchRecords)
- BatchRetention.DELETE
- else
- BatchRetention.DELETE_EMPTY
+ val batchRetention: BatchRetention =
+ if (batch.hasProducerId && isBatchLastRecordOfProducer)
+ BatchRetention.RETAIN_EMPTY
+ else if (discardBatchRecords)
+ BatchRetention.DELETE
+ else
+ BatchRetention.DELETE_EMPTY
+ new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch)
}
override def shouldRetainRecord(batch: RecordBatch, record: Record):
Boolean = {
+ var isRecordRetained: Boolean = true
Review comment:
Why do we need this `var`?
##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -776,7 +801,13 @@ private[log] class Cleaner(val id: Int,
* 2) The message doesn't has value but it can't be deleted now.
*/
val latestOffsetForKey = record.offset() >= foundOffset
- val isRetainedValue = record.hasValue || retainDeletes
+ val supportDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2
+ val shouldRetainDeletes =
Review comment:
nit: maybe turn this into a `def` since we don't even use the computed
value unless `record.hasValue` is false.
##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs:
Seq[File],
val cleanableLogs = dirtyLogs.filter { ltc =>
(ltc.needCompactionNow && ltc.cleanableBytes > 0) ||
ltc.cleanableRatio > ltc.log.config.minCleanableRatio
}
+
if(cleanableLogs.isEmpty) {
- None
+ val logsWithTombstonesExpired = dirtyLogs.filter {
+ case ltc =>
+ // in this case, we are probably in a low throughput situation
+ // therefore, we should take advantage of this fact and remove
tombstones if we can
+ // under the condition that the log's latest delete horizon is
less than the current time
+ // tracked
+ ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP &&
ltc.log.latestDeleteHorizon <= time.milliseconds()
Review comment:
When the broker is initialized, `log.latestDeleteHorizon` will be
`NO_TIMESTAMP`. We need at least one run to trigger before we can initialize
the value. Is there another condition we can rely on in order to ensure that
the cleaning still occurs?
##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs:
Seq[File],
val cleanableLogs = dirtyLogs.filter { ltc =>
(ltc.needCompactionNow && ltc.cleanableBytes > 0) ||
ltc.cleanableRatio > ltc.log.config.minCleanableRatio
}
+
if(cleanableLogs.isEmpty) {
- None
+ val logsWithTombstonesExpired = dirtyLogs.filter {
+ case ltc =>
Review comment:
nit: no need for `case`. Usually we write this as
```scala
dirtyLogs.filter { ltc =>
...
}
```
##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -522,13 +523,13 @@ private[log] class Cleaner(val id: Int,
val cleanableHorizonMs = log.logSegments(0,
cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
// group the segments and clean the groups
- info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior
to %s)...".format(log.name, new Date(cleanableHorizonMs), new
Date(deleteHorizonMs)))
+ info("Cleaning log %s (cleaning prior to %s, discarding legacy tombstones
prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new
Date(legacyDeleteHorizonMs)))
Review comment:
It might not be very clear what a "legacy tombstone" means. Would it be
fair to call this an upper bound on the deletion horizon?
##########
File path:
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +162,13 @@ public void ensureValid() {
}
/**
- * Get the timestamp of the first record in this batch. It is always the
create time of the record even if the
- * timestamp type of the batch is log append time.
- *
- * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the
batch is empty
+ * Gets the base timestamp of the batch which is used to calculate the
timestamp deltas.
+ *
+ * @return The base timestamp or
+ * {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
Review comment:
I think we can leave off the comment about the batch being empty since
we're not using this for the first timestamp anymore.
##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -544,17 +545,19 @@ private[log] class Cleaner(val id: Int,
* @param log The log being cleaned
* @param segments The group of segments being cleaned
* @param map The offset map to use for cleaning segments
- * @param deleteHorizonMs The time to retain delete tombstones
+ * @param currentTime The current time in milliseconds
* @param stats Collector for cleaning statistics
* @param transactionMetadata State of ongoing transactions which is carried
between the cleaning
* of the grouped segments
+ * @param legacyDeleteHorizonMs The delete horizon used for tombstones whose
version is less than 2
*/
private[log] def cleanSegments(log: UnifiedLog,
segments: Seq[LogSegment],
map: OffsetMap,
- deleteHorizonMs: Long,
+ currentTime: Long,
stats: CleanerStats,
- transactionMetadata:
CleanedTransactionMetadata): Unit = {
+ transactionMetadata:
CleanedTransactionMetadata,
+ legacyDeleteHorizonMs: Long = -1L): Unit = {
Review comment:
Can we make this a required parameter? We try to avoid optional
parameters because it is easy to miss them.
##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -574,14 +577,17 @@ private[log] class Cleaner(val id: Int,
val abortedTransactions = log.collectAbortedTransactions(startOffset,
upperBoundOffset)
transactionMetadata.addAbortedTransactions(abortedTransactions)
- val retainDeletesAndTxnMarkers = currentSegment.lastModified >
deleteHorizonMs
+ val retainLegacyDeletesAndTxnMarkers = currentSegment.lastModified >
legacyDeleteHorizonMs
info(s"Cleaning $currentSegment in log ${log.name} into
${cleaned.baseOffset} " +
- s"with deletion horizon $deleteHorizonMs, " +
- s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"}
deletes.")
+ s"with legacy deletion horizon $legacyDeleteHorizonMs, " +
+ s"${if(retainLegacyDeletesAndTxnMarkers) "retaining" else
"discarding"} deletes.")
Review comment:
This log message becomes confusing after this change. How about
something like this?
```
s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
s"with an upper bound deletion horizon $legacyDeleteHorizonMs computed
from " +
s"the segment last modified time of ${currentSegment.lastModified}"
```
##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs:
Seq[File],
val cleanableLogs = dirtyLogs.filter { ltc =>
(ltc.needCompactionNow && ltc.cleanableBytes > 0) ||
ltc.cleanableRatio > ltc.log.config.minCleanableRatio
}
+
if(cleanableLogs.isEmpty) {
- None
+ val logsWithTombstonesExpired = dirtyLogs.filter {
+ case ltc =>
+ // in this case, we are probably in a low throughput situation
+ // therefore, we should take advantage of this fact and remove
tombstones if we can
+ // under the condition that the log's latest delete horizon is
less than the current time
+ // tracked
+ ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP &&
ltc.log.latestDeleteHorizon <= time.milliseconds()
+ }
+ if (!logsWithTombstonesExpired.isEmpty) {
Review comment:
nit: use `nonEmpty`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org