dengziming commented on code in PR #19762: URL: https://github.com/apache/kafka/pull/19762#discussion_r2113867108
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftLog.java:
##########
@@ -0,0 +1,843 @@
+    @Override
+    public LogFetchInfo read(long startOffset, Isolation readIsolation) {
+        FetchIsolation isolation = switch (readIsolation) {
+            case COMMITTED -> FetchIsolation.HIGH_WATERMARK;
+            case UNCOMMITTED -> FetchIsolation.LOG_END;
+        };
+
+        try {
+            FetchDataInfo fetchInfo = log.read(startOffset, config.maxFetchSizeInBytes(), isolation, true);
+            return new LogFetchInfo(
+                fetchInfo.records,
+                new LogOffsetMetadata(
+                    fetchInfo.fetchOffsetMetadata.messageOffset,
+                    Optional.of(new SegmentPosition(
+                        fetchInfo.fetchOffsetMetadata.segmentBaseOffset,
+                        fetchInfo.fetchOffsetMetadata.relativePositionInSegment))
+                )
+            );
+        } catch (IOException ioe) {
+            throw new UncheckedIOException(ioe);

Review Comment:
   Maybe it's a newcomer's question: is `UncheckedIOException` better than `IOException`?
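   For background on the question above: a common reason for wrapping is that the overridden interface method does not declare `throws IOException`, so the checked exception cannot propagate as-is. A minimal sketch of that pattern, using a hypothetical `RecordReader` interface (not from the PR):

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for an interface whose methods do not declare checked exceptions.
interface RecordReader {
    byte[] read(long offset); // no "throws IOException" in the signature
}

class FileRecordReader implements RecordReader {
    @Override
    public byte[] read(long offset) {
        try {
            return doRead(offset); // the underlying storage call is checked
        } catch (IOException ioe) {
            // Wrapping preserves the cause while satisfying the interface;
            // callers can still unwrap it via getCause() if they need the original IOException.
            throw new UncheckedIOException(ioe);
        }
    }

    private byte[] doRead(long offset) throws IOException {
        throw new IOException("disk error at offset " + offset);
    }
}
```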
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftLog.java:
##########
@@ -0,0 +1,843 @@
+    /**
+     * Delete snapshots that come before a given snapshot ID. This is done by advancing the log start offset to the given
+     * snapshot and cleaning old log segments.
+     * This will only happen if the following invariants all hold true:
+     *
+     * <li>The given snapshot precedes the latest snapshot</li>
+     * <li>The offset of the given snapshot is greater than the log start offset</li>
+     * <li>The log layer can advance the offset to the given snapshot</li>
+     *
+     * This method is thread-safe
+     */
+    @Override
+    public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {
+        try {
+            return deleteBeforeSnapshot(snapshotId, new UnknownReason());
+        } catch (IOException ioe) {
+            throw new UncheckedIOException(ioe);
+        }
+    }
+
+    private boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId, SnapshotDeletionReason reason) throws IOException {
+        boolean deleted = false;
+        TreeMap<OffsetAndEpoch, Optional<FileRawSnapshotReader>> forgottenSnapshots = new TreeMap<>();
+        synchronized (snapshots) {
+            Optional<OffsetAndEpoch> latestSnapshotIdOpt = latestSnapshotId();
+            if (latestSnapshotIdOpt.isPresent()) {
+                OffsetAndEpoch latestSnapshotId = latestSnapshotIdOpt.get();
+                if (snapshots.containsKey(snapshotId) &&
+                        startOffset() < snapshotId.offset() &&
+                        snapshotId.offset() <= latestSnapshotId.offset() &&
+                        log.maybeIncrementLogStartOffset(snapshotId.offset(), LogStartOffsetIncrementReason.SnapshotGenerated)) {
+                    // Delete all segments that have a "last offset" less than the log start offset
+                    int deletedSegments = log.deleteOldSegments();
+                    // Remove older snapshots from the snapshots cache
+                    forgottenSnapshots = forgetSnapshotsBefore(snapshotId);
+                    deleted = deletedSegments != 0 || !forgottenSnapshots.isEmpty();
+                }
+            }
+            removeSnapshots(forgottenSnapshots, reason);

Review Comment:
   Why did we move this inside the synchronized block?
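   To illustrate the trade-off behind this question: work done while holding the lock blocks any other thread contending on `snapshots` (e.g. the polling thread), so the usual alternative is to collect the affected entries under the lock and do the heavier cleanup after releasing it. A simplified sketch of the two placements, using a hypothetical `SnapshotCache` (not the actual `KafkaRaftLog`, where the cleanup may additionally be deferred to a scheduler):

```java
import java.util.Map;
import java.util.TreeMap;

class SnapshotCache {
    private final TreeMap<Long, String> snapshots = new TreeMap<>();

    // Pattern A: delete files while holding the lock. Simpler to reason about,
    // but any thread contending on `snapshots` waits for the cleanup to finish.
    void deleteBeforeHoldingLock(long offset) {
        synchronized (snapshots) {
            Map<Long, String> forgotten = new TreeMap<>(snapshots.headMap(offset));
            snapshots.keySet().removeAll(forgotten.keySet());
            deleteFiles(forgotten); // cleanup under the lock
        }
    }

    // Pattern B: only mutate the in-memory map under the lock, then clean up
    // after releasing it, so other threads are not blocked on the cleanup.
    void deleteBeforeOutsideLock(long offset) {
        Map<Long, String> forgotten;
        synchronized (snapshots) {
            forgotten = new TreeMap<>(snapshots.headMap(offset));
            snapshots.keySet().removeAll(forgotten.keySet());
        }
        deleteFiles(forgotten); // cleanup without holding the lock
    }

    private void deleteFiles(Map<Long, String> forgotten) {
        forgotten.values().forEach(path -> System.out.println("deleting " + path));
    }
}
```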
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftLog.java:
##########
@@ -0,0 +1,843 @@
" + + "The batch containing the requested offset has a base offset of (" + baseOffset + ")" + ); + } + return createNewSnapshotUnchecked(snapshotId); + } + + @Override + public Optional<RawSnapshotWriter> createNewSnapshotUnchecked(OffsetAndEpoch snapshotId) { + boolean containsSnapshotId; + synchronized (snapshots) { + containsSnapshotId = snapshots.containsKey(snapshotId); + } + + if (containsSnapshotId) { + return Optional.empty(); + } else { + return Optional.of( + new NotifyingRawSnapshotWriter( + FileRawSnapshotWriter.create(log.dir().toPath(), snapshotId), + this::onSnapshotFrozen + ) + ); + } + } + + @Override + public Optional<RawSnapshotReader> readSnapshot(OffsetAndEpoch snapshotId) { + synchronized (snapshots) { + Optional<FileRawSnapshotReader> reader = snapshots.get(snapshotId); + if (reader == null) { + return Optional.empty(); + } else if (reader.isPresent()) { + return Optional.of(reader.get()); + } else { + // Snapshot exists but has never been read before + try { + FileRawSnapshotReader fileReader = FileRawSnapshotReader.open(log.dir().toPath(), snapshotId); + snapshots.put(snapshotId, Optional.of(fileReader)); + return Optional.of(fileReader); + } catch (UncheckedIOException e) { + // Snapshot doesn't exist in the data dir; remove + Path path = Snapshots.snapshotPath(log.dir().toPath(), snapshotId); + logger.warn("Couldn't read {}; expected to find snapshot file {}", snapshotId, path); + snapshots.remove(snapshotId); + return Optional.empty(); + } + } + } + } + + @Override + public Optional<RawSnapshotReader> latestSnapshot() { + synchronized (snapshots) { + return latestSnapshotId().flatMap(this::readSnapshot); + } + } + + @Override + public Optional<OffsetAndEpoch> latestSnapshotId() { + synchronized (snapshots) { + try { + OffsetAndEpoch epoch = snapshots.lastKey(); + return epoch == null ? Optional.empty() : Optional.of(epoch); + } catch (NoSuchElementException e) { Review Comment: How about changing this NoSuchElementException catching code to `if snapshots.isEmpty()` ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftLog.java: ########## @@ -0,0 +1,843 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftLog.java:
##########
@@ -0,0 +1,843 @@
" + + "Expected the snapshot's end offset to match the log's end offset (" + logEndOffset + + ") and the log start offset (" + startOffset + ")" + ); + } + }).orElse(0)); + } + + @Override + public OffsetAndEpoch endOffsetForEpoch(int epoch) { + Optional<OffsetAndEpoch> endOffsetEpochOpt = log.endOffsetForEpoch(epoch); + Optional<OffsetAndEpoch> earliestSnapshotIdOpt = earliestSnapshotId(); + if (endOffsetEpochOpt.isPresent()) { + OffsetAndEpoch endOffsetEpoch = endOffsetEpochOpt.get(); + if (earliestSnapshotIdOpt.isPresent()) { + OffsetAndEpoch earliestSnapshotId = earliestSnapshotIdOpt.get(); + if (endOffsetEpoch.offset() == earliestSnapshotId.offset() && endOffsetEpoch.epoch() == epoch) { + // The epoch is smaller than the smallest epoch on the log. Override the diverging + // epoch to the oldest snapshot which should be the snapshot at the log start offset + return new OffsetAndEpoch(earliestSnapshotId.offset(), earliestSnapshotId.epoch()); + } + } + return new OffsetAndEpoch(endOffsetEpoch.offset(), endOffsetEpoch.epoch()); + } else { + return new OffsetAndEpoch(endOffset().offset(), lastFetchedEpoch()); + } + } + + @Override + public LogOffsetMetadata endOffset() { + org.apache.kafka.storage.internals.log.LogOffsetMetadata endOffsetMetadata = log.logEndOffsetMetadata(); + return new LogOffsetMetadata( + endOffsetMetadata.messageOffset, + Optional.of(new SegmentPosition( + endOffsetMetadata.segmentBaseOffset, + endOffsetMetadata.relativePositionInSegment) + ) + ); + } + + @Override + public long startOffset() { + return log.logStartOffset(); + } + + @Override + public void truncateTo(long offset) { + long highWatermarkOffset = highWatermark().offset(); + if (offset < highWatermarkOffset) { + throw new IllegalArgumentException("Attempt to truncate to offset " + offset + + ", which is below the current high watermark " + highWatermarkOffset); + } + log.truncateTo(offset); + } + + @Override + public boolean truncateToLatestSnapshot() { + int latestEpoch = log.latestEpoch().orElse(0); + boolean truncated = false; + TreeMap<OffsetAndEpoch, Optional<FileRawSnapshotReader>> forgottenSnapshots = new TreeMap<>(); + Optional<OffsetAndEpoch> snapshotIdOpt = latestSnapshotId(); + if (snapshotIdOpt.isPresent()) { + OffsetAndEpoch snapshotId = snapshotIdOpt.get(); + if (snapshotId.epoch() > latestEpoch || (snapshotId.epoch() == latestEpoch && snapshotId.offset() > endOffset().offset())) { + // Truncate the log fully if the latest snapshot is greater than the log end offset + log.truncateFullyAndStartAt(snapshotId.offset(), Optional.empty()); + + // Forget snapshots less than the log start offset + synchronized (snapshots) { + truncated = true; + forgottenSnapshots = forgetSnapshotsBefore(snapshotId); + } + } + } + removeSnapshots(forgottenSnapshots, new FullTruncation()); + return truncated; + } + + @Override + public void initializeLeaderEpoch(int epoch) { + log.assignEpochStartOffset(epoch, log.logEndOffset()); + } + + @Override + public void updateHighWatermark(LogOffsetMetadata logOffsetMetadata) { + // This API returns the new high watermark, which may be different from the passed offset + Optional<OffsetMetadata> metadata = logOffsetMetadata.metadata(); + try { + long logHighWatermark; + if (metadata.isPresent() && metadata.get() instanceof SegmentPosition segmentPosition) { + logHighWatermark = log.updateHighWatermark( + new org.apache.kafka.storage.internals.log.LogOffsetMetadata( + logOffsetMetadata.offset(), + segmentPosition.baseOffset(), + segmentPosition.relativePosition() + ) + ); + } 
else { + logHighWatermark = log.updateHighWatermark(logOffsetMetadata.offset()); + } + + // Temporary log message until we fix KAFKA-14825 + if (logHighWatermark != logOffsetMetadata.offset()) { + logger.warn("Log's high watermark ({}) is different from the local replica's high watermark ({})", metadata, logOffsetMetadata); + } + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } + } + + @Override + public LogOffsetMetadata highWatermark() { + try { + org.apache.kafka.storage.internals.log.LogOffsetMetadata hwm = log.fetchOffsetSnapshot().highWatermark; + Optional<OffsetMetadata> segmentPosition = !hwm.messageOffsetOnly() + ? Optional.of(new SegmentPosition(hwm.segmentBaseOffset, hwm.relativePositionInSegment)) + : Optional.empty(); + + return new LogOffsetMetadata(hwm.messageOffset, segmentPosition); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } + } + + @Override + public void flush(boolean forceFlushActiveSegment) { + log.flush(forceFlushActiveSegment); + } + + /** + * Return the topic partition associated with the log. + */ + @Override + public TopicPartition topicPartition() { + return topicPartition; + } + + /** + * Return the topic ID associated with the log. + */ + @Override + public Uuid topicId() { + return log.topicId().get(); + } + + @Override + public Optional<RawSnapshotWriter> createNewSnapshot(OffsetAndEpoch snapshotId) { + long startOffset = startOffset(); + if (snapshotId.offset() < startOffset) { + logger.info("Cannot create a snapshot with an id ({}) less than the log start offset ({})", snapshotId, startOffset); + return Optional.empty(); + } + + long highWatermarkOffset = highWatermark().offset(); + if (snapshotId.offset() > highWatermarkOffset) { + throw new IllegalArgumentException( + "Cannot create a snapshot with an id (" + snapshotId + ") greater than the high-watermark (" + highWatermarkOffset + ")" + ); + } + + ValidOffsetAndEpoch validOffsetAndEpoch = validateOffsetAndEpoch(snapshotId.offset(), snapshotId.epoch()); + if (validOffsetAndEpoch.kind() != ValidOffsetAndEpoch.Kind.VALID) { + throw new IllegalArgumentException( + "Snapshot id (" + snapshotId + ") is not valid according to the log: " + validOffsetAndEpoch + ); + } + + /* + Perform a check that the requested snapshot offset is batch aligned via a log read, which + returns the base offset of the batch that contains the requested offset. A snapshot offset + is one greater than the last offset contained in the snapshot, and cannot go past the high + watermark. + + This check is necessary because Raft replication code assumes the snapshot offset is the + start of a batch. If a follower applies a non-batch aligned snapshot at offset (X) and + fetches from this offset, the returned batch will start at offset (X - M), and the + follower will be unable to append it since (X - M) < (X). + */ + long baseOffset = read(snapshotId.offset(), Isolation.COMMITTED).startOffsetMetadata.offset(); + if (snapshotId.offset() != baseOffset) { + throw new IllegalArgumentException( + "Cannot create snapshot at offset (" + snapshotId.offset() + ") because it is not batch aligned. 
" + + "The batch containing the requested offset has a base offset of (" + baseOffset + ")" + ); + } + return createNewSnapshotUnchecked(snapshotId); + } + + @Override + public Optional<RawSnapshotWriter> createNewSnapshotUnchecked(OffsetAndEpoch snapshotId) { + boolean containsSnapshotId; + synchronized (snapshots) { + containsSnapshotId = snapshots.containsKey(snapshotId); + } + + if (containsSnapshotId) { + return Optional.empty(); + } else { + return Optional.of( + new NotifyingRawSnapshotWriter( + FileRawSnapshotWriter.create(log.dir().toPath(), snapshotId), + this::onSnapshotFrozen + ) + ); + } + } + + @Override + public Optional<RawSnapshotReader> readSnapshot(OffsetAndEpoch snapshotId) { + synchronized (snapshots) { + Optional<FileRawSnapshotReader> reader = snapshots.get(snapshotId); + if (reader == null) { + return Optional.empty(); + } else if (reader.isPresent()) { + return Optional.of(reader.get()); + } else { + // Snapshot exists but has never been read before + try { + FileRawSnapshotReader fileReader = FileRawSnapshotReader.open(log.dir().toPath(), snapshotId); + snapshots.put(snapshotId, Optional.of(fileReader)); + return Optional.of(fileReader); + } catch (UncheckedIOException e) { + // Snapshot doesn't exist in the data dir; remove + Path path = Snapshots.snapshotPath(log.dir().toPath(), snapshotId); + logger.warn("Couldn't read {}; expected to find snapshot file {}", snapshotId, path); + snapshots.remove(snapshotId); + return Optional.empty(); + } + } + } + } + + @Override + public Optional<RawSnapshotReader> latestSnapshot() { + synchronized (snapshots) { + return latestSnapshotId().flatMap(this::readSnapshot); + } + } + + @Override + public Optional<OffsetAndEpoch> latestSnapshotId() { + synchronized (snapshots) { + try { + OffsetAndEpoch epoch = snapshots.lastKey(); + return epoch == null ? Optional.empty() : Optional.of(epoch); + } catch (NoSuchElementException e) { + return Optional.empty(); + } + } + } + + @Override + public Optional<OffsetAndEpoch> earliestSnapshotId() { + synchronized (snapshots) { + try { + OffsetAndEpoch epoch = snapshots.firstKey(); + return epoch == null ? Optional.empty() : Optional.of(epoch); + } catch (NoSuchElementException e) { + return Optional.empty(); + } + } + } + + @Override + public void onSnapshotFrozen(OffsetAndEpoch snapshotId) { + synchronized (snapshots) { + snapshots.put(snapshotId, Optional.empty()); + } + } + + /** + * Delete snapshots that come before a given snapshot ID. This is done by advancing the log start offset to the given + * snapshot and cleaning old log segments. 
+ * This will only happen if the following invariants all hold true: + * + * <li>The given snapshot precedes the latest snapshot</li> + * <li>The offset of the given snapshot is greater than the log start offset</li> + * <li>The log layer can advance the offset to the given snapshot</li> + * + * This method is thread-safe + */ + @Override + public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) { + try { + return deleteBeforeSnapshot(snapshotId, new UnknownReason()); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } + } + + private boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId, SnapshotDeletionReason reason) throws IOException { + boolean deleted = false; + TreeMap<OffsetAndEpoch, Optional<FileRawSnapshotReader>> forgottenSnapshots = new TreeMap<>(); + synchronized (snapshots) { + Optional<OffsetAndEpoch> latestSnapshotIdOpt = latestSnapshotId(); + if (latestSnapshotIdOpt.isPresent()) { + OffsetAndEpoch latestSnapshotId = latestSnapshotIdOpt.get(); + if (snapshots.containsKey(snapshotId) && + startOffset() < snapshotId.offset() && + snapshotId.offset() <= latestSnapshotId.offset() && + log.maybeIncrementLogStartOffset(snapshotId.offset(), LogStartOffsetIncrementReason.SnapshotGenerated)) { + // Delete all segments that have a "last offset" less than the log start offset + int deletedSegments = log.deleteOldSegments(); + // Remove older snapshots from the snapshots cache + forgottenSnapshots = forgetSnapshotsBefore(snapshotId); + deleted = deletedSegments != 0 || !forgottenSnapshots.isEmpty(); + } + } + removeSnapshots(forgottenSnapshots, reason); + return deleted; + } + } + + /** + * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe + */ + private Map<OffsetAndEpoch, Long> loadSnapshotSizes() { + Map<OffsetAndEpoch, Long> snapshotSizes = new HashMap<>(); + for (OffsetAndEpoch key : snapshots.keySet()) { + Optional<RawSnapshotReader> snapshotReader = readSnapshot(key); + snapshotReader.ifPresent(fileRawSnapshotReader -> { + snapshots.put(key, Optional.of((FileRawSnapshotReader) snapshotReader.get())); Review Comment: Why should we put it here?
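The put in question looks redundant: readSnapshot(key) already caches the opened FileRawSnapshotReader in the snapshots map when it opens one (see readSnapshot above). Below is a minimal sketch of loadSnapshotSizes without the extra put, assuming RawSnapshotReader exposes sizeInBytes() and reusing the surrounding class's snapshots field and readSnapshot(...) method; an illustration, not the PR's implementation.

```java
// Sketch only: rely on readSnapshot(key) to cache the opened reader and just
// record each snapshot's size. Assumes RawSnapshotReader.sizeInBytes() and the
// enclosing KafkaRaftLog context (snapshots field, readSnapshot method).
private Map<OffsetAndEpoch, Long> loadSnapshotSizes() {
    Map<OffsetAndEpoch, Long> snapshotSizes = new HashMap<>();
    for (OffsetAndEpoch key : snapshots.keySet()) {
        readSnapshot(key).ifPresent(reader -> snapshotSizes.put(key, reader.sizeInBytes()));
    }
    return snapshotSizes;
}
```

If the extra put is intentional, for example to make the caching explicit at the call site, a short code comment there would answer the reviewer's question.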
" + + "The batch containing the requested offset has a base offset of (" + baseOffset + ")" + ); + } + return createNewSnapshotUnchecked(snapshotId); + } + + @Override + public Optional<RawSnapshotWriter> createNewSnapshotUnchecked(OffsetAndEpoch snapshotId) { + boolean containsSnapshotId; + synchronized (snapshots) { + containsSnapshotId = snapshots.containsKey(snapshotId); + } + + if (containsSnapshotId) { + return Optional.empty(); + } else { + return Optional.of( + new NotifyingRawSnapshotWriter( + FileRawSnapshotWriter.create(log.dir().toPath(), snapshotId), + this::onSnapshotFrozen + ) + ); + } + } + + @Override + public Optional<RawSnapshotReader> readSnapshot(OffsetAndEpoch snapshotId) { + synchronized (snapshots) { + Optional<FileRawSnapshotReader> reader = snapshots.get(snapshotId); + if (reader == null) { + return Optional.empty(); + } else if (reader.isPresent()) { + return Optional.of(reader.get()); + } else { + // Snapshot exists but has never been read before + try { + FileRawSnapshotReader fileReader = FileRawSnapshotReader.open(log.dir().toPath(), snapshotId); + snapshots.put(snapshotId, Optional.of(fileReader)); + return Optional.of(fileReader); + } catch (UncheckedIOException e) { + // Snapshot doesn't exist in the data dir; remove + Path path = Snapshots.snapshotPath(log.dir().toPath(), snapshotId); + logger.warn("Couldn't read {}; expected to find snapshot file {}", snapshotId, path); + snapshots.remove(snapshotId); + return Optional.empty(); + } + } + } + } + + @Override + public Optional<RawSnapshotReader> latestSnapshot() { + synchronized (snapshots) { + return latestSnapshotId().flatMap(this::readSnapshot); + } + } + + @Override + public Optional<OffsetAndEpoch> latestSnapshotId() { + synchronized (snapshots) { + try { + OffsetAndEpoch epoch = snapshots.lastKey(); + return epoch == null ? Optional.empty() : Optional.of(epoch); + } catch (NoSuchElementException e) { + return Optional.empty(); + } + } + } + + @Override + public Optional<OffsetAndEpoch> earliestSnapshotId() { + synchronized (snapshots) { + try { Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org