jsancio commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662550101
##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1048,9 +1058,14 @@ object KafkaConfig {
.defineInternal(InitialBrokerRegistrationTimeoutMsProp, INT,
Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM,
InitialBrokerRegistrationTimeoutMsDoc)
.defineInternal(BrokerHeartbeatIntervalMsProp, INT,
Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, BrokerHeartbeatIntervalMsDoc)
.defineInternal(BrokerSessionTimeoutMsProp, INT,
Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc)
- .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH,
MetadataLogDirDoc)
.defineInternal(ControllerListenerNamesProp, STRING, null, null, HIGH,
ControllerListenerNamesDoc)
.defineInternal(SaslMechanismControllerProtocolProp, STRING,
SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH,
SaslMechanismControllerProtocolDoc)
+ .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH,
MetadataLogDirDoc)
+ .defineInternal(MetadataLogSegmentBytesProp, INT,
Defaults.LogSegmentBytes, atLeast(Records.LOG_OVERHEAD), HIGH,
MetadataLogSegmentBytesDoc)
+ .defineInternal(MetadataLogSegmentMillisProp, LONG,
Defaults.LogRollHours * 60 * 60 * 1000L, null, HIGH,
MetadataLogSegmentMillisDoc)
+ .defineInternal(MetadataMaxRetentionBytesProp, LONG,
Defaults.LogRetentionBytes, null, HIGH, MetadataMaxRetentionBytesDoc)
+ .defineInternal(MetadataMaxRetentionMillisProp, LONG, null, null, HIGH,
MetadataMaxRetentionMillisDoc)
Review comment:
Are we going to keep the properties added in this PR as internal after
3.0? If not, let's just make them public now.
Do we need a default value for the retention milliseconds property?
##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2651,7 +2651,7 @@ object Log extends Logging {
* @tparam T the type of object held within the iterator
* @return Some(iterator.next) if a next element exists, None otherwise.
*/
- private def nextOption[T](iterator: Iterator[T]): Option[T] = {
+ def nextOption[T](iterator: Iterator[T]): Option[T] = {
Review comment:
I don't think you need to make this public. Scala's `Iterator` has
`nextOption`.
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
}
}
- override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch):
Boolean = {
- val (deleted, forgottenSnapshots) = snapshots synchronized {
- latestSnapshotId().asScala match {
- case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
- startOffset < logStartSnapshotId.offset &&
- logStartSnapshotId.offset <= snapshotId.offset &&
- log.maybeIncrementLogStartOffset(logStartSnapshotId.offset,
SnapshotGenerated)) =>
+ /**
+ * Delete a snapshot, advance the log start offset, and clean old log
segments. This will only happen if the
+ * following invariants all hold true:
+ *
+ * <li>This is not the latest snapshot (i.e., another snapshot proceeds this
one)</li>
+ * <li>The offset of the next snapshot is greater than the log start
offset</li>
+ * <li>The log can be advanced to the offset of the next snapshot</li>
+ *
+ * This method is not thread safe and assumes a lock on the snapshots
collection is held
+ */
+ private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId:
OffsetAndEpoch): Boolean = {
Review comment:
Why change and replace the implementation for `deleteBeforeSnapshot`?
For example, why not always delete every snapshot that is less than
`nextSnapshotId` when the `if` statement predicate is true?
For example, `log.deleteOldSegments()` deletes every segment that is less
than the log start offset. Why not also delete every snapshot that is less than
the log start offset, which is the same as `nextSnapshotId`?
This deletes both segments and snapshots. The old name of
`deleteBeforeSnapshot` seems more accurate.
##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -817,11 +863,19 @@ object KafkaMetadataLogTest {
}
}
+ val DefaultMetadataLogConfig = new MetadataLogConfig(
Review comment:
This is a `case class`, so `new` is neither needed nor recommended.
##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2153,14 +2154,36 @@ private boolean maybeCompleteShutdown(long
currentTimeMs) {
return false;
}
- private void maybeDeleteBeforeSnapshot() {
- log.latestSnapshotId().ifPresent(snapshotId -> {
- quorum.highWatermark().ifPresent(highWatermark -> {
- if (highWatermark.offset >= snapshotId.offset) {
- log.deleteBeforeSnapshot(snapshotId);
+ /**
+ * A simple timer based log cleaner
+ */
+ private static class RaftMetadataLogCleaner {
+ private final Logger logger;
+ private final Timer timer;
+ private final long delayMs;
+ private final Runnable cleaner;
+
+ RaftMetadataLogCleaner(Logger logger, Time time, long delayMs,
Runnable cleaner) {
+ this.logger = logger;
+ this.timer = time.timer(delayMs);
+ this.delayMs = delayMs;
+ this.cleaner = cleaner;
+ }
+
+ public boolean maybeClean(long currentTimeMs) {
+ timer.update(currentTimeMs);
+ if (timer.isExpired()) {
Review comment:
Do we need this timer because `log.maybeClean` is expensive even when
there are no snapshots to clean?
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
}
}
- override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch):
Boolean = {
- val (deleted, forgottenSnapshots) = snapshots synchronized {
- latestSnapshotId().asScala match {
- case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
- startOffset < logStartSnapshotId.offset &&
- logStartSnapshotId.offset <= snapshotId.offset &&
- log.maybeIncrementLogStartOffset(logStartSnapshotId.offset,
SnapshotGenerated)) =>
+ /**
+ * Delete a snapshot, advance the log start offset, and clean old log
segments. This will only happen if the
+ * following invariants all hold true:
+ *
+ * <li>This is not the latest snapshot (i.e., another snapshot proceeds this
one)</li>
+ * <li>The offset of the next snapshot is greater than the log start
offset</li>
+ * <li>The log can be advanced to the offset of the next snapshot</li>
+ *
+ * This method is not thread safe and assumes a lock on the snapshots
collection is held
+ */
+ private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch,
nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+ nextSnapshotIdOpt.exists { nextSnapshotId =>
+ if (snapshots.contains(snapshotId) &&
+ snapshots.contains(nextSnapshotId) &&
+ startOffset < nextSnapshotId.offset &&
+ snapshotId.offset < nextSnapshotId.offset &&
+ log.maybeIncrementLogStartOffset(nextSnapshotId.offset,
SnapshotGenerated)) {
+ log.deleteOldSegments()
+ val forgotten = mutable.TreeMap.empty[OffsetAndEpoch,
Option[FileRawSnapshotReader]]
+ snapshots.remove(snapshotId) match {
+ case Some(removedSnapshot) => forgotten.put(snapshotId,
removedSnapshot)
+ case None => throw new IllegalStateException(s"Could not remove
snapshot $snapshotId from our cache.")
+ }
+ removeSnapshots(forgotten)
+ true
+ } else {
+ false
+ }
+ }
+ }
- // Delete all segments that have a "last offset" less than the log
start offset
- log.deleteOldSegments()
+ /**
+ * Force all known snapshots to have an open reader so we can know their
sizes. This method is not thread-safe
+ */
+ private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+ snapshots.keys.toSeq.flatMap {
+ snapshotId => readSnapshot(snapshotId).asScala.map { reader =>
(snapshotId, reader.sizeInBytes())}
+ }
+ }
- // Forget snapshots less than the log start offset
- (true, forgetSnapshotsBefore(logStartSnapshotId))
- case _ =>
- (false, mutable.TreeMap.empty[OffsetAndEpoch,
Option[FileRawSnapshotReader]])
+ /**
+ * Return the max timestamp of the first batch in a snapshot, if the
snapshot exists and has records
+ */
+ private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long]
= {
Review comment:
Sounds good. Can we move this implementation to `SnapshotReader`? It
will match what we are doing here
https://github.com/apache/kafka/pull/10946/files#diff-83295dbf4af9755c79987b390f72f53dda47af9f82eb8305755d90da18d5b9f2R85
##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2089,7 +2090,7 @@ private long pollUnattachedAsObserver(UnattachedState
state, long currentTimeMs)
}
private long pollCurrentState(long currentTimeMs) {
- maybeDeleteBeforeSnapshot();
+ snapshotCleaner.maybeClean(currentTimeMs);
Review comment:
We need to communicate the cleaner's timeout to the poll method so that it
knows how long to wait on the message queue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]