mimaison commented on code in PR #19030:
URL: https://github.com/apache/kafka/pull/19030#discussion_r1977276265


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -16,44 +16,2351 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeProducersResponseData;
+import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RecordValidationStats;
+import org.apache.kafka.common.record.RecordVersion;
 import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.PrimitiveRef;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.OffsetAndEpoch;
+import org.apache.kafka.server.common.RequestLocal;
 import 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.storage.log.UnexpectedAppendOffsetException;
 import org.apache.kafka.server.util.Scheduler;
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
 import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
-public class UnifiedLog {
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
+
+/**
+ * A log which presents a unified view of local and tiered log segments.
+ *
+ * <p>The log consists of tiered and local segments with the tiered portion of the log being optional. There could be an
+ * overlap between the tiered and local segments. The active segment is always guaranteed to be local. If tiered segments
+ * are present, they always appear at the beginning of the log, followed by an optional region of overlap, followed by the local
+ * segments including the active segment.
+ *
+ * <p>NOTE: this class handles state and behavior specific to tiered segments as well as any behavior combining both tiered
+ * and local segments. The state and behavior specific to local segments are handled by the encapsulated LocalLog instance.
+ */
+public class UnifiedLog implements AutoCloseable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(UnifiedLog.class);
 
-    public static final String LOG_FILE_SUFFIX = LogFileUtils.LOG_FILE_SUFFIX;
-    public static final String INDEX_FILE_SUFFIX = 
LogFileUtils.INDEX_FILE_SUFFIX;
-    public static final String TIME_INDEX_FILE_SUFFIX = 
LogFileUtils.TIME_INDEX_FILE_SUFFIX;
-    public static final String TXN_INDEX_FILE_SUFFIX = 
LogFileUtils.TXN_INDEX_FILE_SUFFIX;
-    public static final String CLEANED_FILE_SUFFIX = 
LogFileUtils.CLEANED_FILE_SUFFIX;
-    public static final String SWAP_FILE_SUFFIX = 
LogFileUtils.SWAP_FILE_SUFFIX;
-    public static final String DELETE_DIR_SUFFIX = 
LogFileUtils.DELETE_DIR_SUFFIX;
-    public static final String STRAY_DIR_SUFFIX = 
LogFileUtils.STRAY_DIR_SUFFIX;
-    public static final long UNKNOWN_OFFSET = LocalLog.UNKNOWN_OFFSET;
+    public static final String LOG_FILE_SUFFIX = LogFileUtils.LOG_FILE_SUFFIX;
+    public static final String INDEX_FILE_SUFFIX = 
LogFileUtils.INDEX_FILE_SUFFIX;
+    public static final String TIME_INDEX_FILE_SUFFIX = 
LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+    public static final String TXN_INDEX_FILE_SUFFIX = 
LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+    public static final String CLEANED_FILE_SUFFIX = 
LogFileUtils.CLEANED_FILE_SUFFIX;
+    public static final String SWAP_FILE_SUFFIX = 
LogFileUtils.SWAP_FILE_SUFFIX;
+    public static final String DELETE_DIR_SUFFIX = 
LogFileUtils.DELETE_DIR_SUFFIX;
+    public static final String STRAY_DIR_SUFFIX = 
LogFileUtils.STRAY_DIR_SUFFIX;
+    public static final long UNKNOWN_OFFSET = LocalLog.UNKNOWN_OFFSET;
+
+    // For compatibility, metrics are defined to be under `Log` class
+    private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup(UnifiedLog.class.getPackage().getName(), "Log");

Review Comment:
   Good catch. Yes, this has to be `kafka.log`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to