mimaison commented on code in PR #19030: URL: https://github.com/apache/kafka/pull/19030#discussion_r1977276265
########## storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java: ########## @@ -16,44 +16,2351 @@ */ package org.apache.kafka.storage.internals.log; +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InconsistentTopicIdException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import org.apache.kafka.common.errors.RecordBatchTooLargeException; +import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.DescribeProducersResponseData; +import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RecordValidationStats; +import org.apache.kafka.common.record.RecordVersion; import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.PrimitiveRef; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.OffsetAndEpoch; +import org.apache.kafka.server.common.RequestLocal; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig; +import 
org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.storage.log.UnexpectedAppendOffsetException; import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; -public class UnifiedLog { +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET; + +/** + * A log which presents a unified view of local and tiered log segments. + * + * <p>The log consists of tiered and local segments with the tiered portion of the log being optional. There could be an + * overlap between the tiered and local segments. The active segment is always guaranteed to be local. 
If tiered segments + * are present, they always appear at the beginning of the log, followed by an optional region of overlap, followed by the local + * segments including the active segment. + * + * <p>NOTE: this class handles state and behavior specific to tiered segments as well as any behavior combining both tiered + * and local segments. The state and behavior specific to local segments are handled by the encapsulated LocalLog instance. + */ +public class UnifiedLog implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(UnifiedLog.class); - public static final String LOG_FILE_SUFFIX = LogFileUtils.LOG_FILE_SUFFIX; - public static final String INDEX_FILE_SUFFIX = LogFileUtils.INDEX_FILE_SUFFIX; - public static final String TIME_INDEX_FILE_SUFFIX = LogFileUtils.TIME_INDEX_FILE_SUFFIX; - public static final String TXN_INDEX_FILE_SUFFIX = LogFileUtils.TXN_INDEX_FILE_SUFFIX; - public static final String CLEANED_FILE_SUFFIX = LogFileUtils.CLEANED_FILE_SUFFIX; - public static final String SWAP_FILE_SUFFIX = LogFileUtils.SWAP_FILE_SUFFIX; - public static final String DELETE_DIR_SUFFIX = LogFileUtils.DELETE_DIR_SUFFIX; - public static final String STRAY_DIR_SUFFIX = LogFileUtils.STRAY_DIR_SUFFIX; - public static final long UNKNOWN_OFFSET = LocalLog.UNKNOWN_OFFSET; + public static final String LOG_FILE_SUFFIX = LogFileUtils.LOG_FILE_SUFFIX; + public static final String INDEX_FILE_SUFFIX = LogFileUtils.INDEX_FILE_SUFFIX; + public static final String TIME_INDEX_FILE_SUFFIX = LogFileUtils.TIME_INDEX_FILE_SUFFIX; + public static final String TXN_INDEX_FILE_SUFFIX = LogFileUtils.TXN_INDEX_FILE_SUFFIX; + public static final String CLEANED_FILE_SUFFIX = LogFileUtils.CLEANED_FILE_SUFFIX; + public static final String SWAP_FILE_SUFFIX = LogFileUtils.SWAP_FILE_SUFFIX; + public static final String DELETE_DIR_SUFFIX = LogFileUtils.DELETE_DIR_SUFFIX; + public static final String STRAY_DIR_SUFFIX = LogFileUtils.STRAY_DIR_SUFFIX; + public static final long 
UNKNOWN_OFFSET = LocalLog.UNKNOWN_OFFSET; + + // For compatibility, metrics are defined to be under `Log` class + private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(UnifiedLog.class.getPackage().getName(), "Log"); Review Comment: Good catch, yes this has to be `kafka.log` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org