mimaison commented on code in PR #17587:
URL: https://github.com/apache/kafka/pull/17587#discussion_r1818030649


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##########
@@ -29,25 +35,768 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.require;
 import static 
org.apache.kafka.storage.internals.log.LogFileUtils.CLEANED_FILE_SUFFIX;
 import static 
org.apache.kafka.storage.internals.log.LogFileUtils.DELETED_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.DELETE_DIR_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.FUTURE_DIR_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.STRAY_DIR_SUFFIX;
 import static 
org.apache.kafka.storage.internals.log.LogFileUtils.SWAP_FILE_SUFFIX;
 import static org.apache.kafka.storage.internals.log.LogFileUtils.isLogFile;
 
+/**
+ * An append-only log for storing messages locally. The log is a sequence of 
LogSegments, each with a base offset.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ * NOTE: this class is not thread-safe, and it relies on the thread safety 
provided by the Log class.
+ */
 public class LocalLog {
 
     private static final Logger LOG = LoggerFactory.getLogger(LocalLog.class);
 
+    public static final Pattern DELETE_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.DELETE_DIR_SUFFIX);
+    public static final Pattern FUTURE_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.FUTURE_DIR_SUFFIX);
+    public static final Pattern STRAY_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.STRAY_DIR_SUFFIX);
+    public static final long UNKNOWN_OFFSET = -1L;
+
+    // Last time the log was flushed
+    private final AtomicLong lastFlushedTime;
+    private final String logIdent;
+    private final LogSegments segments;
+    private final Scheduler scheduler;
+    private final Time time;
+    private final TopicPartition topicPartition;
+    private final LogDirFailureChannel logDirFailureChannel;
+    private final Logger logger;
+
+    private volatile LogOffsetMetadata nextOffsetMetadata;
+    // The memory mapped buffer for index files of this log will be closed 
with either delete() or closeHandlers()
+    // After memory mapped buffer is closed, no disk IO operation should be 
performed for this log.
+    private volatile boolean isMemoryMappedBufferClosed = false;
+    // Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
+    private volatile String parentDir;
+    private volatile LogConfig config;
+    private volatile long recoveryPoint;
+    private File dir;
+
+    /**
+     * @param dir The directory in which log segments are created.
+     * @param config The log configuration settings
+     * @param segments The non-empty log segments recovered from disk
+     * @param recoveryPoint The offset at which to begin the next recovery 
i.e. the first offset which has not been flushed to disk
+     * @param nextOffsetMetadata The offset where the next message could be 
appended
+     * @param scheduler The thread pool scheduler used for background actions
+     * @param time The time instance used for checking the clock
+     * @param topicPartition The topic partition associated with this log
+     * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+     */
+    public LocalLog(File dir,
+                    LogConfig config,
+                    LogSegments segments,
+                    long recoveryPoint,
+                    LogOffsetMetadata nextOffsetMetadata,
+                    Scheduler scheduler,
+                    Time time,
+                    TopicPartition topicPartition,
+                    LogDirFailureChannel logDirFailureChannel) {
+        this.dir = dir;
+        this.config = config;
+        this.segments = segments;
+        this.recoveryPoint = recoveryPoint;
+        this.nextOffsetMetadata = nextOffsetMetadata;
+        this.scheduler = scheduler;
+        this.time = time;
+        this.topicPartition = topicPartition;
+        this.logDirFailureChannel = logDirFailureChannel;
+        this.logIdent = "[LocalLog partition=" + topicPartition + ", dir=" + 
dir + "] ";
+        this.logger = new LogContext(logIdent).logger(LocalLog.class);
+        // Last time the log was flushed
+        this.lastFlushedTime = new AtomicLong(time.milliseconds());
+        this.parentDir = dir.getParent();
+    }
+
+    public File dir() {
+        return dir;
+    }
+
+    public Logger logger() {
+        return logger;
+    }
+
+    public LogConfig config() {
+        return config;
+    }
+
+    public LogSegments segments() {
+        return segments;
+    }
+
+    public Scheduler scheduler() {
+        return scheduler;
+    }
+
+    public LogOffsetMetadata nextOffsetMetadata() {
+        return nextOffsetMetadata;
+    }
+
+    public TopicPartition topicPartition() {
+        return topicPartition;
+    }
+
+    public LogDirFailureChannel logDirFailureChannel() {
+        return logDirFailureChannel;
+    }
+
+    public long recoveryPoint() {
+        return recoveryPoint;
+    }
+
+    public Time time() {
+        return time;
+    }
+
+    public String name() {
+        return dir.getName();
+    }
+
+    public String parentDir() {
+        return parentDir;
+    }
+
+    public File parentDirFile() {
+        return new File(parentDir);
+    }
+
+    public boolean isFuture() {
+        return dir.getName().endsWith(LogFileUtils.FUTURE_DIR_SUFFIX);
+    }
+
+    private <T> T maybeHandleIOException(Supplier<String> errorMsgSupplier, 
StorageAction<T, IOException> function) {
+        return maybeHandleIOException(logDirFailureChannel, parentDir, 
errorMsgSupplier, function);
+    }
+
+    /**
+     * Rename the directory of the log
+     * @param name the new dir name
+     * @throws KafkaStorageException if rename fails
+     */
+    public boolean renameDir(String name) {
+        return maybeHandleIOException(
+            () -> "Error while renaming dir for " + topicPartition + " in log 
dir " +  dir.getParent(),
+            () -> {
+                File renamedDir = new File(dir.getParent(), name);
+                Utils.atomicMoveWithFallback(dir.toPath(), 
renamedDir.toPath());
+                if (!renamedDir.equals(dir)) {
+                    dir = renamedDir;
+                    parentDir = renamedDir.getParent();
+                    segments.updateParentDir(renamedDir);
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        );
+    }
+
+    /**
+     * Update the existing configuration to the new provided configuration.
+     * @param newConfig the new configuration to be updated to
+     */
+    public void updateConfig(LogConfig newConfig) {
+        LogConfig oldConfig = config;
+        config = newConfig;
+        RecordVersion oldRecordVersion = oldConfig.recordVersion();
+        RecordVersion newRecordVersion = newConfig.recordVersion();
+        if (newRecordVersion.precedes(oldRecordVersion)) {
+            logger.warn("Record format version has been downgraded from " + 
oldRecordVersion + " to " + newRecordVersion + ".");
+        }
+    }
+
+    public void checkIfMemoryMappedBufferClosed() {
+        if (isMemoryMappedBufferClosed) {
+            throw new KafkaStorageException("The memory mapped buffer for log 
of " + topicPartition + " is already closed");
+        }
+    }
+
+    public void updateRecoveryPoint(long newRecoveryPoint) {
+        recoveryPoint = newRecoveryPoint;
+    }
+
+    /**
+     * Update recoveryPoint to provided offset and mark the log as flushed, if 
the offset is greater
+     * than the existing recoveryPoint.
+     *
+     * @param offset the offset to be updated
+     */
+    public void markFlushed(long offset) {
+        checkIfMemoryMappedBufferClosed();
+        if (offset > recoveryPoint) {
+            updateRecoveryPoint(offset);
+            lastFlushedTime.set(time.milliseconds());
+        }
+    }
+
+    /**
+     * The number of messages appended to the log since the last flush
+     */
+    public long unflushedMessages() {
+        return logEndOffset() - recoveryPoint;
+    }
+
+    /**
+     * Flush local log segments for all offsets up to offset-1.
+     * Does not update the recovery point.
+     *
+     * @param offset The offset to flush up to (non-inclusive)
+     */
+    public void flush(long offset) throws IOException {
+        long currentRecoveryPoint = recoveryPoint;
+        if (currentRecoveryPoint <= offset) {
+            Collection<LogSegment> segmentsToFlush = 
segments.values(currentRecoveryPoint, offset);
+            for (LogSegment logSegment : segmentsToFlush) {
+                logSegment.flush();
+            }
+            // If there are any new segments, we need to flush the parent 
directory for crash consistency.
+            if (segmentsToFlush.stream().anyMatch(s -> s.baseOffset() >= 
currentRecoveryPoint)) {
+                // The directory might be renamed concurrently for topic 
deletion, which may cause NoSuchFileException here.
+                // Since the directory is to be deleted anyways, we just 
swallow NoSuchFileException and let it go.
+                Utils.flushDirIfExists(dir.toPath());
+            }
+        }
+    }
+
+    /**
+     * The time this log is last known to have been fully flushed to disk
+     */
+    public long lastFlushTime() {
+        return lastFlushedTime.get();
+    }
+
+    /**
+     * The offset metadata of the next message that will be appended to the log
+     */
+    public LogOffsetMetadata logEndOffsetMetadata() {
+        return nextOffsetMetadata;
+    }
+
+    /**
+     * The offset of the next message that will be appended to the log
+     */
+    public long logEndOffset() {
+        return nextOffsetMetadata.messageOffset;
+    }
+
+    /**
+     * Update end offset of the log, and update the recoveryPoint.
+     *
+     * @param endOffset the new end offset of the log
+     */
+    public void updateLogEndOffset(long endOffset) {
+        nextOffsetMetadata = new LogOffsetMetadata(endOffset, 
segments.activeSegment().baseOffset(), segments.activeSegment().size());
+        if (recoveryPoint > endOffset) {
+            updateRecoveryPoint(endOffset);
+        }
+    }
+
+    /**
+     * Close file handlers used by log but don't write to disk.
+     * This is called if the log directory is offline.
+     */
+    public void closeHandlers() {
+        segments.closeHandlers();
+        isMemoryMappedBufferClosed = true;
+    }
+
+    /**
+     * Closes the segments of the log.
+     */
+    public void close() {
+        maybeHandleIOException(
+            () -> "Error while renaming dir for " + topicPartition + " in dir 
" + dir.getParent(),
+            () -> {
+                checkIfMemoryMappedBufferClosed();
+                segments.close();
+                return null;
+            }
+        );
+    }
+
+    /**
+     * Completely delete this log directory with no delay.
+     */
+    public void deleteEmptyDir() {
+        maybeHandleIOException(
+            () -> "Error while deleting dir for " + topicPartition + " in dir 
" + dir.getParent(),
+            () -> {
+                if (!segments.isEmpty()) {
+                    throw new IllegalStateException("Can not delete directory 
when " + segments.numberOfSegments() + " segments are still present");
+                }
+                if (!isMemoryMappedBufferClosed) {
+                    throw new IllegalStateException("Can not delete directory 
when memory mapped buffer for log of " + topicPartition + " is still open.");
+                }
+                Utils.delete(dir);
+                return null;
+            }
+        );
+    }
+
+    /**
+     * Completely delete all segments with no delay.
+     * @return the deleted segments
+     */
+    public List<LogSegment> deleteAllSegments() {
+        return maybeHandleIOException(
+            () -> "Error while deleting all segments for $topicPartition in 
dir ${dir.getParent}",
+            () -> {
+                List<LogSegment> deletableSegments = new 
ArrayList<>(segments.values());
+                removeAndDeleteSegments(segments.values(), false, new 
LogDeletion(logger));
+                isMemoryMappedBufferClosed = true;
+                return deletableSegments;
+            }
+        );
+    }
+
+    /**
+     * This method deletes the given log segments by doing the following for 
each of them:
+     * <ul>
+     *  <li>It removes the segment from the segment map so that it will no 
longer be used for reads.
+     *  <li>It renames the index and log files by appending .deleted to the 
respective file name
+     *  <li>It can either schedule an asynchronous delete operation to occur 
in the future or perform the deletion synchronously
+     * </ul>
+     * Asynchronous deletion allows reads to happen concurrently without 
synchronization and without the possibility of
+     * physically deleting a file while it is being read.
+     * This method does not convert IOException to KafkaStorageException, the 
immediate caller
+     * is expected to catch and handle IOException.
+     *
+     * @param segmentsToDelete The log segments to schedule for deletion
+     * @param asyncDelete Whether the segment files should be deleted 
asynchronously
+     * @param reason The reason for the segment deletion
+     */
+    public void removeAndDeleteSegments(Collection<LogSegment> 
segmentsToDelete,
+                                         boolean asyncDelete,
+                                         SegmentDeletionReason reason) throws 
IOException {
+        if (!segmentsToDelete.isEmpty()) {
+            // Most callers hold an iterator into the `segments` collection 
and `removeAndDeleteSegment` mutates it by
+            // removing the deleted segment, we should force materialization 
of the iterator here, so that results of the
+            // iteration remain valid and deterministic. We should also pass 
only the materialized view of the
+            // iterator to the logic that actually deletes the segments.
+            List<LogSegment> toDelete = new ArrayList<>(segmentsToDelete);
+            reason.logReason(toDelete);
+            toDelete.forEach(segment -> segments.remove(segment.baseOffset()));
+            deleteSegmentFiles(toDelete, asyncDelete, dir, topicPartition, 
config, scheduler, logDirFailureChannel, logIdent);
+        }
+    }
+
+    /**
+     * This method deletes the given segment and creates a new segment with 
the given new base offset. It ensures an
+     * active segment exists in the log at all times during this process.
+     * <br/>
+     * Asynchronous deletion allows reads to happen concurrently without 
synchronization and without the possibility of
+     * physically deleting a file while it is being read.
+     * <br/>
+     * This method does not convert IOException to KafkaStorageException, the 
immediate caller
+     * is expected to catch and handle IOException.
+     *
+     * @param newOffset The base offset of the new segment
+     * @param segmentToDelete The old active segment to schedule for deletion
+     * @param asyncDelete Whether the segment files should be deleted 
asynchronously
+     * @param reason The reason for the segment deletion
+     */
+    public LogSegment createAndDeleteSegment(long newOffset,
+                                              LogSegment segmentToDelete,
+                                              boolean asyncDelete,
+                                              SegmentDeletionReason reason) 
throws IOException {
+        if (newOffset == segmentToDelete.baseOffset()) {
+            segmentToDelete.changeFileSuffixes("", 
LogFileUtils.DELETED_FILE_SUFFIX);
+        }
+        LogSegment newSegment = LogSegment.open(dir,
+                newOffset,
+                config,
+                time,
+                config.initFileSize(),
+                config.preallocate);
+        segments.add(newSegment);
+
+        reason.logReason(singletonList(segmentToDelete));
+        if (newOffset != segmentToDelete.baseOffset()) {
+            segments.remove(segmentToDelete.baseOffset());
+        }
+        deleteSegmentFiles(singletonList(segmentToDelete), asyncDelete, dir, 
topicPartition, config, scheduler, logDirFailureChannel, logIdent);
+        return newSegment;
+    }
+
+    /**
+     * Given a message offset, find its corresponding offset metadata in the 
log.
+     * If the message offset is out of range, throw an 
OffsetOutOfRangeException
+     */
+    public LogOffsetMetadata convertToOffsetMetadataOrThrow(long offset) 
throws IOException {
+        FetchDataInfo fetchDataInfo = read(offset, 1, false, 
nextOffsetMetadata, false);
+        return fetchDataInfo.fetchOffsetMetadata;
+    }
+
+    /**
+     * Read messages from the log.
+     *
+     * @param startOffset The offset to begin reading at
+     * @param maxLength The maximum number of bytes to read
+     * @param minOneMessage If this is true, the first message will be 
returned even if it exceeds `maxLength` (if one exists)
+     * @param maxOffsetMetadata The metadata of the maximum offset to be 
fetched
+     * @param includeAbortedTxns If true, aborted transactions are included
+     * @throws OffsetOutOfRangeException If startOffset is beyond the log end 
offset
+     * @return The fetch data information including fetch starting offset 
metadata and messages read.
+     */
+    public FetchDataInfo read(long startOffset,
+                       int maxLength,
+                       boolean minOneMessage,
+                       LogOffsetMetadata maxOffsetMetadata,
+                       boolean includeAbortedTxns) throws IOException {
+        return maybeHandleIOException(
+                () -> "Exception while reading from " + topicPartition + " in 
dir " + dir.getParent(),
+                () -> {
+                    logger.trace("Reading maximum $maxLength bytes at offset " 
+ startOffset + " from log with " +
+                            "total length " + segments.sizeInBytes() + " 
bytes");
+
+                    LogOffsetMetadata endOffsetMetadata = nextOffsetMetadata;
+                    long endOffset = endOffsetMetadata.messageOffset;
+                    Optional<LogSegment> segmentOpt = 
segments.floorSegment(startOffset);
+                    // return error on attempt to read beyond the log end 
offset
+                    if (startOffset > endOffset || segmentOpt.isEmpty()) {
+                        throw new OffsetOutOfRangeException("Received request 
for offset " + startOffset + " for partition " + topicPartition + ", " +
+                                "but we only have log segments upto " + 
endOffset + ".");
+                    }
+                    if (startOffset == maxOffsetMetadata.messageOffset) {
+                        return emptyFetchDataInfo(maxOffsetMetadata, 
includeAbortedTxns);
+                    } else if (startOffset > maxOffsetMetadata.messageOffset) {
+                        return 
emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns);
+                    } else {
+                        // Do the read on the segment with a base offset less 
than the target offset
+                        // but if that segment doesn't contain any messages 
with an offset greater than that
+                        // continue to read from successive segments until we 
get some messages or we reach the end of the log
+                        FetchDataInfo fetchDataInfo = null;
+                        while (fetchDataInfo == null && 
segmentOpt.isPresent()) {
+                            LogSegment segment = segmentOpt.get();
+                            long baseOffset = segment.baseOffset();
+
+                            // 1. If `maxOffsetMetadata#segmentBaseOffset < 
segment#baseOffset`, then return maxPosition as empty.
+                            // 2. Use the max-offset position if it is on this 
segment; otherwise, the segment size is the limit.
+                            // 3. When maxOffsetMetadata is 
message-offset-only, then we don't know the relativePositionInSegment so
+                            //    return maxPosition as empty to avoid reading 
beyond the max-offset
+                            Optional<Long> maxPositionOpt;
+                            if (segment.baseOffset() < 
maxOffsetMetadata.segmentBaseOffset)
+                                maxPositionOpt = Optional.of((long) 
segment.size());
+                            else if (segment.baseOffset() == 
maxOffsetMetadata.segmentBaseOffset && !maxOffsetMetadata.messageOffsetOnly())
+                                maxPositionOpt = Optional.of((long) 
maxOffsetMetadata.relativePositionInSegment);
+                            else
+                                maxPositionOpt = Optional.empty();
+
+                            fetchDataInfo = segment.read(startOffset, 
maxLength, maxPositionOpt, minOneMessage);
+                            if (fetchDataInfo != null) {
+                                if (includeAbortedTxns) {
+                                    fetchDataInfo = 
addAbortedTransactions(startOffset, segment, fetchDataInfo);
+                                }
+                            } else {
+                                segmentOpt = 
segments.higherSegment(baseOffset);
+                            }
+                        }
+
+                        if (fetchDataInfo != null) {
+                            return fetchDataInfo;
+                        } else {
+                            // okay we are beyond the end of the last segment 
with no data fetched although the start offset is in range,
+                            // this can happen when all messages with offset 
larger than start offsets have been deleted.
+                            // In this case, we will return the empty set with 
log end offset metadata
+                            return new FetchDataInfo(nextOffsetMetadata, 
MemoryRecords.EMPTY);
+                        }
+                    }
+                }
+        );
+    }
+
+    public void append(long lastOffset, long largestTimestamp, long 
shallowOffsetOfMaxTimestamp, MemoryRecords records) throws IOException {
+        segments.activeSegment().append(lastOffset, largestTimestamp, 
shallowOffsetOfMaxTimestamp, records);
+        updateLogEndOffset(lastOffset + 1);
+    }
+
+    FetchDataInfo addAbortedTransactions(long startOffset, LogSegment segment, 
FetchDataInfo fetchInfo) throws IOException {
+        int fetchSize = fetchInfo.records.sizeInBytes();
+        OffsetPosition startOffsetPosition = new OffsetPosition(
+                fetchInfo.fetchOffsetMetadata.messageOffset,
+                fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+        long upperBoundOffset = 
segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+                .orElse(segments.higherSegment(segment.baseOffset())
+                        .map(LogSegment::baseOffset).orElse(logEndOffset()));
+
+        List<FetchResponseData.AbortedTransaction> abortedTransactions = new 
ArrayList<>();
+        Consumer<List<AbortedTxn>> accumulator = abortedTxns -> {
+            for (AbortedTxn abortedTxn : abortedTxns)
+                abortedTransactions.add(abortedTxn.asAbortedTransaction());
+        };
+        collectAbortedTransactions(startOffset, upperBoundOffset, segment, 
accumulator);
+        return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
+                fetchInfo.records,
+                fetchInfo.firstEntryIncomplete,
+                Optional.of(abortedTransactions));
+    }
+
+    private void collectAbortedTransactions(long startOffset, long 
upperBoundOffset,
+                                            LogSegment startingSegment,
+                                            Consumer<List<AbortedTxn>> 
accumulator) {
+        Iterator<LogSegment> higherSegments = 
segments.higherSegments(startingSegment.baseOffset()).iterator();
+        Optional<LogSegment> segmentEntryOpt = Optional.of(startingSegment);
+        while (segmentEntryOpt.isPresent()) {
+            LogSegment segment = segmentEntryOpt.get();
+            TxnIndexSearchResult searchResult = 
segment.collectAbortedTxns(startOffset, upperBoundOffset);
+            accumulator.accept(searchResult.abortedTransactions);
+            if (searchResult.isComplete) return;
+            segmentEntryOpt = nextItem(higherSegments);
+        }
+    }
+
+    public List<AbortedTxn> collectAbortedTransactions(long logStartOffset, 
long baseOffset, long upperBoundOffset) {
+        Optional<LogSegment> segmentEntry = segments.floorSegment(baseOffset);
+        List<AbortedTxn> allAbortedTxns = new ArrayList<>();
+        segmentEntry.ifPresent(logSegment -> 
collectAbortedTransactions(logStartOffset, upperBoundOffset, logSegment, 
allAbortedTxns::addAll));
+        return allAbortedTxns;
+    }
+
+    /**
+     * Roll the log over to a new active segment starting with the current 
logEndOffset.
+     * This will trim the index to the exact size of the number of entries it 
currently contains.
+     *
+     * @param expectedNextOffset The expected next offset after the segment is 
rolled
+     *
+     * @return The newly rolled segment
+     */
+    public LogSegment roll(Optional<Long> expectedNextOffset) {
+        return maybeHandleIOException(
+            () -> "Error while rolling log segment for " + topicPartition + " 
in dir " + dir.getParent(),
+            () -> {
+                long start = time.hiResClockMs();
+                checkIfMemoryMappedBufferClosed();
+                long newOffset = Math.max(expectedNextOffset.orElse(0L), 
logEndOffset());
+                File logFile = LogFileUtils.logFile(dir, newOffset, "");
+                LogSegment activeSegment = segments.activeSegment();
+                if (segments.contains(newOffset)) {
+                    // segment with the same base offset already exists and 
loaded
+                    if (activeSegment.baseOffset() == newOffset && 
activeSegment.size() == 0) {
+                        // We have seen this happen (see KAFKA-6388) after 
shouldRoll() returns true for an
+                        // active segment of size zero because of one of the 
indexes is "full" (due to _maxEntries == 0).
+                        logger.warn("Trying to roll a new log segment with 
start offset " + newOffset +
+                                "=max(provided offset = " + expectedNextOffset 
+ ", LEO = " + logEndOffset() + ") while it already " +
+                                "exists and is active with size 0. Size of 
time index: " + activeSegment.timeIndex().entries() +
+                                ", size of offset index: " + 
activeSegment.offsetIndex().entries() + ".");
+                        LogSegment newSegment = 
createAndDeleteSegment(newOffset, activeSegment, true, new LogRoll(logger));
+                        updateLogEndOffset(nextOffsetMetadata.messageOffset);
+                        logger.info("Rolled new log segment at offset " + 
newOffset + " in " + (time.hiResClockMs() - start) + " ms.");
+                        return newSegment;
+                    } else {
+                        throw new KafkaException("Trying to roll a new log 
segment for topic partition " + topicPartition + " with start offset " + 
newOffset +
+                                " =max(provided offset = " + 
expectedNextOffset + ", LEO = " + logEndOffset() + ") while it already exists. 
Existing " +
+                                "segment is " + segments.get(newOffset) + ".");
+                    }
+                } else if (!segments.isEmpty() && newOffset < 
activeSegment.baseOffset()) {
+                    throw new KafkaException(
+                            "Trying to roll a new log segment for topic 
partition " + topicPartition + " with " +
+                            "start offset " + newOffset + " =max(provided 
offset = " + expectedNextOffset + ", LEO = " + logEndOffset() + ") lower than 
start offset of the active segment " + activeSegment);
+                } else {
+                    File offsetIdxFile = LogFileUtils.offsetIndexFile(dir, 
newOffset);
+                    File timeIdxFile = LogFileUtils.timeIndexFile(dir, 
newOffset);
+                    File txnIdxFile = LogFileUtils.transactionIndexFile(dir, 
newOffset);
+                    for (File file : Arrays.asList(logFile, offsetIdxFile, 
timeIdxFile, txnIdxFile)) {
+                        if (file.exists()) {
+                            logger.warn("Newly rolled segment file " + 
file.getAbsolutePath() + " already exists; deleting it first");
+                            Files.delete(file.toPath());
+                        }
+                    }
+                    if (segments.lastSegment().isPresent()) {
+                        segments.lastSegment().get().onBecomeInactiveSegment();
+                    }
+                }
+                LogSegment newSegment = LogSegment.open(dir,
+                        newOffset,
+                        config,
+                        time,
+                        config.initFileSize(),
+                        config.preallocate);
+                segments.add(newSegment);
+
+                // We need to update the segment base offset and append 
position data of the metadata when log rolls.
+                // The next offset should not change.
+                updateLogEndOffset(nextOffsetMetadata.messageOffset);
+                logger.info("Rolled new log segment at offset " + newOffset + 
" in " + (time.hiResClockMs() - start) + " ms.");
+                return newSegment;
+            }
+        );
+    }
+
+    /**
+     *  Delete all data in the local log and start at the new offset.
+     *
+     *  @param newOffset The new offset to start the log with
+     *  @return the list of segments that were scheduled for deletion
+     */
+    public List<LogSegment> truncateFullyAndStartAt(long newOffset) {
+        return maybeHandleIOException(
+            () -> "Error while truncating the entire log for " + 
topicPartition + " in dir " + dir.getParent(),
+            () -> {
+                logger.debug("Truncate and start at offset " + newOffset);
+                checkIfMemoryMappedBufferClosed();
+                List<LogSegment> segmentsToDelete = new 
ArrayList<>(segments.values());
+
+                if (!segmentsToDelete.isEmpty()) {
+                    removeAndDeleteSegments(segmentsToDelete.subList(0, 
segmentsToDelete.size() - 1), true, new LogTruncation(logger));
+                    // Use createAndDeleteSegment() to create new segment 
first and then delete the old last segment to prevent missing
+                    // active segment during the deletion process
+                    createAndDeleteSegment(newOffset, 
segmentsToDelete.get(segmentsToDelete.size() - 1), true, new 
LogTruncation(logger));
+                }
+                updateLogEndOffset(newOffset);
+                return segmentsToDelete;
+            }
+        );
+    }
+
+    /**
+     * Truncate this log so that it ends with the greatest offset < 
targetOffset.
+     *
+     * @param targetOffset The offset to truncate to, an upper bound on all 
offsets in the log after truncation is complete.
+     * @return the list of segments that were scheduled for deletion
+     */
+    public Collection<LogSegment> truncateTo(long targetOffset) throws 
IOException {
+        Collection<LogSegment> deletableSegments = segments.filter(segment -> 
segment.baseOffset() > targetOffset);
+        removeAndDeleteSegments(deletableSegments, true, new 
LogTruncation(logger));
+        segments.activeSegment().truncateTo(targetOffset);
+        updateLogEndOffset(targetOffset);
+        return deletableSegments;
+    }
+
+    /**
+     * Builds the name a log directory is renamed to when it is marked for async deletion,
+     * in the form "topic-partitionId.uniqueId-delete". A very long topic name is shortened
+     * so that the whole directory name does not exceed 255 characters.
+     */
+    public static String logDeleteDirName(TopicPartition topicPartition) {
+        final String suffix = DELETE_DIR_SUFFIX;
+        return logDirNameWithSuffixCappedLength(topicPartition, suffix);
+    }
+
+    /**
+     * Builds the name a log directory is renamed to when it is deleted as a stray partition,
+     * in the form "topic-partitionId.uniqueId-stray". A very long topic name is shortened
+     * so that the whole directory name does not exceed 255 characters.
+     */
+    public static String logStrayDirName(TopicPartition topicPartition) {
+        final String suffix = STRAY_DIR_SUFFIX;
+        return logDirNameWithSuffixCappedLength(topicPartition, suffix);
+    }
+
+    /**
+     * Builds a "future" directory name for the given topic partition, in the form
+     * "topic-partition.uniqueId-future". Unlike the delete/stray variants, no length cap is applied.
+     */
+    public static String logFutureDirName(TopicPartition topicPartition) {
+        final String suffix = FUTURE_DIR_SUFFIX;
+        return logDirNameWithSuffix(topicPartition, suffix);
+    }
+
+    /**
+     * Builds a directory name of the form "${topic}-${partitionId}.${uniqueId}${suffix}", where
+     * uniqueId is a random UUID with its dashes stripped. Only the topic part is ever shortened,
+     * and only as much as needed to keep the total length within 255 characters.
+     */
+    private static String logDirNameWithSuffixCappedLength(TopicPartition topicPartition, String suffix) {
+        String topic = topicPartition.topic();
+        String tail = "-" + topicPartition.partition() + "." + UUID.randomUUID().toString().replace("-", "") + suffix;
+        int maxTopicLength = 255 - tail.length();
+        String cappedTopic = topic.length() > maxTopicLength ? topic.substring(0, maxTopicLength) : topic;
+        return cappedTopic + tail;
+    }
+
+    /**
+     * Builds "${topic}-${partitionId}.${uniqueId}${suffix}" with no length cap, where uniqueId is a
+     * random UUID with its dashes stripped.
+     */
+    private static String logDirNameWithSuffix(TopicPartition topicPartition, String suffix) {
+        String randomPart = UUID.randomUUID().toString().replace("-", "");
+        return new StringBuilder(logDirName(topicPartition))
+                .append('.')
+                .append(randomPart)
+                .append(suffix)
+                .toString();
+    }
+
+    /**
+     * Return a directory name for the given topic partition. The name will be 
in the following
+     * format: topic-partition where topic, partition are variables.
+     */
+    public static String logDirName(TopicPartition topicPartition) {
+        return topicPartition.topic() + "-" + topicPartition.partition();
+    }
+
+    /**
+     * Creates, but does not throw, the exception used to report a directory whose name is not a
+     * recognized topic-partition directory name.
+     *
+     * @param dir the offending directory
+     * @return a KafkaException describing the unexpected directory
+     * @throws IOException if the canonical path of {@code dir} cannot be resolved
+     */
+    private static KafkaException exception(File dir) throws IOException {
+        String canonicalPath = dir.getCanonicalPath();
+        String message = "Found directory " + canonicalPath + ", '" + dir.getName() + "' is not in the form of "
+                + "topic-partition or topic-partition.uniqueId-delete (if marked for deletion).\n"
+                + "Kafka's log directories (and children) should only contain Kafka topic data.";
+        return new KafkaException(message);
+    }
+
+    /**
+     * Parse the topic and partition out of the directory name of a log
+     */
+    public static TopicPartition parseTopicPartitionName(File dir) throws 
IOException {
+        if (dir == null) {
+            throw new KafkaException("dir should not be null");
+        }
+        String dirName = dir.getName();
+        if (dirName.isEmpty() || !dirName.contains("-")) {

Review Comment:
   I assumed the `isEmpty()` check is an optimization that can skip the `contains()` call, so I decided to keep it just in case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to