mimaison commented on code in PR #17587:
URL: https://github.com/apache/kafka/pull/17587#discussion_r1818052197


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##########
@@ -29,25 +35,768 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.require;
 import static 
org.apache.kafka.storage.internals.log.LogFileUtils.CLEANED_FILE_SUFFIX;
 import static 
org.apache.kafka.storage.internals.log.LogFileUtils.DELETED_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.DELETE_DIR_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.FUTURE_DIR_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.STRAY_DIR_SUFFIX;
 import static 
org.apache.kafka.storage.internals.log.LogFileUtils.SWAP_FILE_SUFFIX;
 import static org.apache.kafka.storage.internals.log.LogFileUtils.isLogFile;
 
+/**
+ * An append-only log for storing messages locally. The log is a sequence of 
LogSegments, each with a base offset.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ * NOTE: this class is not thread-safe, and it relies on the thread safety 
provided by the Log class.
+ */
 public class LocalLog {
 
     private static final Logger LOG = LoggerFactory.getLogger(LocalLog.class);
 
+    public static final Pattern DELETE_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.DELETE_DIR_SUFFIX);
+    public static final Pattern FUTURE_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.FUTURE_DIR_SUFFIX);
+    public static final Pattern STRAY_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.STRAY_DIR_SUFFIX);
+    public static final long UNKNOWN_OFFSET = -1L;
+
+    // Last time the log was flushed
+    private final AtomicLong lastFlushedTime;
+    private final String logIdent;
+    private final LogSegments segments;
+    private final Scheduler scheduler;
+    private final Time time;
+    private final TopicPartition topicPartition;
+    private final LogDirFailureChannel logDirFailureChannel;
+    private final Logger logger;
+
+    private volatile LogOffsetMetadata nextOffsetMetadata;
+    // The memory mapped buffer for index files of this log will be closed 
with either delete() or closeHandlers()
+    // After memory mapped buffer is closed, no disk IO operation should be 
performed for this log.
+    private volatile boolean isMemoryMappedBufferClosed = false;
+    // Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
+    private volatile String parentDir;
+    private volatile LogConfig config;
+    private volatile long recoveryPoint;
+    private File dir;
+
+    /**
+     * @param dir The directory in which log segments are created.
+     * @param config The log configuration settings
+     * @param segments The non-empty log segments recovered from disk
+     * @param recoveryPoint The offset at which to begin the next recovery 
i.e. the first offset which has not been flushed to disk
+     * @param nextOffsetMetadata The offset where the next message could be 
appended
+     * @param scheduler The thread pool scheduler used for background actions
+     * @param time The time instance used for checking the clock
+     * @param topicPartition The topic partition associated with this log
+     * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+     */
+    public LocalLog(File dir,
+                    LogConfig config,
+                    LogSegments segments,
+                    long recoveryPoint,
+                    LogOffsetMetadata nextOffsetMetadata,
+                    Scheduler scheduler,
+                    Time time,
+                    TopicPartition topicPartition,
+                    LogDirFailureChannel logDirFailureChannel) {
+        this.dir = dir;
+        this.config = config;
+        this.segments = segments;
+        this.recoveryPoint = recoveryPoint;
+        this.nextOffsetMetadata = nextOffsetMetadata;
+        this.scheduler = scheduler;
+        this.time = time;
+        this.topicPartition = topicPartition;
+        this.logDirFailureChannel = logDirFailureChannel;
+        this.logIdent = "[LocalLog partition=" + topicPartition + ", dir=" + 
dir + "] ";
+        this.logger = new LogContext(logIdent).logger(LocalLog.class);
+        // Last time the log was flushed
+        this.lastFlushedTime = new AtomicLong(time.milliseconds());
+        this.parentDir = dir.getParent();
+    }
+
+    public File dir() {
+        return dir;
+    }
+
+    public Logger logger() {
+        return logger;
+    }
+
+    public LogConfig config() {
+        return config;
+    }
+
+    public LogSegments segments() {
+        return segments;
+    }
+
+    public Scheduler scheduler() {
+        return scheduler;
+    }
+
+    public LogOffsetMetadata nextOffsetMetadata() {
+        return nextOffsetMetadata;
+    }
+
+    public TopicPartition topicPartition() {
+        return topicPartition;
+    }
+
+    public LogDirFailureChannel logDirFailureChannel() {
+        return logDirFailureChannel;
+    }
+
+    public long recoveryPoint() {
+        return recoveryPoint;
+    }
+
+    public Time time() {
+        return time;
+    }
+
+    public String name() {
+        return dir.getName();
+    }
+
+    public String parentDir() {
+        return parentDir;
+    }
+
+    public File parentDirFile() {
+        return new File(parentDir);
+    }
+
+    public boolean isFuture() {
+        return dir.getName().endsWith(LogFileUtils.FUTURE_DIR_SUFFIX);
+    }
+
+    private <T> T maybeHandleIOException(Supplier<String> errorMsgSupplier, 
StorageAction<T, IOException> function) {
+        return maybeHandleIOException(logDirFailureChannel, parentDir, 
errorMsgSupplier, function);
+    }
+
+    /**
+     * Rename the directory of the log
+     * @param name the new dir name
+     * @throws KafkaStorageException if rename fails
+     */
+    public boolean renameDir(String name) {
+        return maybeHandleIOException(
+            () -> "Error while renaming dir for " + topicPartition + " in log 
dir " +  dir.getParent(),
+            () -> {
+                File renamedDir = new File(dir.getParent(), name);
+                Utils.atomicMoveWithFallback(dir.toPath(), 
renamedDir.toPath());
+                if (!renamedDir.equals(dir)) {
+                    dir = renamedDir;
+                    parentDir = renamedDir.getParent();
+                    segments.updateParentDir(renamedDir);
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        );
+    }
+
+    /**
+     * Update the existing configuration to the new provided configuration.
+     * @param newConfig the new configuration to be updated to
+     */
+    public void updateConfig(LogConfig newConfig) {
+        LogConfig oldConfig = config;
+        config = newConfig;
+        RecordVersion oldRecordVersion = oldConfig.recordVersion();
+        RecordVersion newRecordVersion = newConfig.recordVersion();
+        if (newRecordVersion.precedes(oldRecordVersion)) {
+            logger.warn("Record format version has been downgraded from " + 
oldRecordVersion + " to " + newRecordVersion + ".");
+        }
+    }
+
+    public void checkIfMemoryMappedBufferClosed() {
+        if (isMemoryMappedBufferClosed) {
+            throw new KafkaStorageException("The memory mapped buffer for log 
of " + topicPartition + " is already closed");
+        }
+    }
+
+    public void updateRecoveryPoint(long newRecoveryPoint) {
+        recoveryPoint = newRecoveryPoint;
+    }
+
+    /**
+     * Update recoveryPoint to provided offset and mark the log as flushed, if 
the offset is greater
+     * than the existing recoveryPoint.
+     *
+     * @param offset the offset to be updated
+     */
+    public void markFlushed(long offset) {
+        checkIfMemoryMappedBufferClosed();
+        if (offset > recoveryPoint) {
+            updateRecoveryPoint(offset);
+            lastFlushedTime.set(time.milliseconds());
+        }
+    }
+
+    /**
+     * The number of messages appended to the log since the last flush
+     */
+    public long unflushedMessages() {
+        return logEndOffset() - recoveryPoint;
+    }
+
+    /**
+     * Flush local log segments for all offsets up to offset-1.
+     * Does not update the recovery point.
+     *
+     * @param offset The offset to flush up to (non-inclusive)
+     */
+    public void flush(long offset) throws IOException {
+        long currentRecoveryPoint = recoveryPoint;
+        if (currentRecoveryPoint <= offset) {
+            Collection<LogSegment> segmentsToFlush = 
segments.values(currentRecoveryPoint, offset);
+            for (LogSegment logSegment : segmentsToFlush) {
+                logSegment.flush();
+            }
+            // If there are any new segments, we need to flush the parent 
directory for crash consistency.
+            if (segmentsToFlush.stream().anyMatch(s -> s.baseOffset() >= 
currentRecoveryPoint)) {
+                // The directory might be renamed concurrently for topic 
deletion, which may cause NoSuchFileException here.
+                // Since the directory is to be deleted anyways, we just 
swallow NoSuchFileException and let it go.
+                Utils.flushDirIfExists(dir.toPath());
+            }
+        }
+    }
+
+    /**
+     * The time this log is last known to have been fully flushed to disk
+     */
+    public long lastFlushTime() {
+        return lastFlushedTime.get();
+    }
+
+    /**
+     * The offset metadata of the next message that will be appended to the log
+     */
+    public LogOffsetMetadata logEndOffsetMetadata() {
+        return nextOffsetMetadata;
+    }
+
+    /**
+     * The offset of the next message that will be appended to the log
+     */
+    public long logEndOffset() {
+        return nextOffsetMetadata.messageOffset;
+    }
+
+    /**
+     * Update end offset of the log, and update the recoveryPoint.
+     *
+     * @param endOffset the new end offset of the log
+     */
+    public void updateLogEndOffset(long endOffset) {
+        nextOffsetMetadata = new LogOffsetMetadata(endOffset, 
segments.activeSegment().baseOffset(), segments.activeSegment().size());
+        if (recoveryPoint > endOffset) {
+            updateRecoveryPoint(endOffset);
+        }
+    }
+
+    /**
+     * Close file handlers used by log but don't write to disk.
+     * This is called if the log directory is offline.
+     */
+    public void closeHandlers() {
+        segments.closeHandlers();
+        isMemoryMappedBufferClosed = true;
+    }
+
+    /**
+     * Closes the segments of the log.
+     */
+    public void close() {
+        maybeHandleIOException(
+            () -> "Error while renaming dir for " + topicPartition + " in dir 
" + dir.getParent(),
+            () -> {
+                checkIfMemoryMappedBufferClosed();
+                segments.close();
+                return null;
+            }
+        );
+    }
+
+    /**
+     * Completely delete this log directory with no delay.
+     */
+    public void deleteEmptyDir() {
+        maybeHandleIOException(
+            () -> "Error while deleting dir for " + topicPartition + " in dir 
" + dir.getParent(),
+            () -> {
+                if (!segments.isEmpty()) {
+                    throw new IllegalStateException("Can not delete directory 
when " + segments.numberOfSegments() + " segments are still present");
+                }
+                if (!isMemoryMappedBufferClosed) {
+                    throw new IllegalStateException("Can not delete directory 
when memory mapped buffer for log of " + topicPartition + " is still open.");
+                }
+                Utils.delete(dir);
+                return null;
+            }
+        );
+    }
+
+    /**
+     * Completely delete all segments with no delay.
+     * @return the deleted segments
+     */
+    public List<LogSegment> deleteAllSegments() {
+        return maybeHandleIOException(
+            () -> "Error while deleting all segments for $topicPartition in 
dir ${dir.getParent}",
+            () -> {
+                List<LogSegment> deletableSegments = new 
ArrayList<>(segments.values());
+                removeAndDeleteSegments(segments.values(), false, new 
LogDeletion(logger));

Review Comment:
   It's true that `LogDeletion` and `LogRoll` are each used in only one place,
so we could replace them with lambdas.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to