[ 
https://issues.apache.org/jira/browse/KAFKA-20595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18081458#comment-18081458
 ] 

Chia-Ping Tsai commented on KAFKA-20595:
----------------------------------------


{code:java}
    private void loadSegmentFiles() throws IOException {
        // load segments in ascending order because transactional data from one 
segment may depend on the
        // segments that come before it
        File[] files = dir.listFiles();
        if (files == null) files = new File[0];
        List<File> sortedFiles = 
Arrays.stream(files).filter(File::isFile).sorted().toList();
        for (File file : sortedFiles) {
            if (LogFileUtils.isIndexFile(file)) {
                // if it is an index file, make sure it has a corresponding 
.log file
                long offset = LogFileUtils.offsetFromFile(file);
                File logFile = LogFileUtils.logFile(dir, offset);
                if (!logFile.exists()) {
                    logger.warn("Found an orphaned index file {}, with no 
corresponding log file.", file.getAbsolutePath());
                    Files.deleteIfExists(file.toPath());
                }
            } else if (LogFileUtils.isLogFile(file)) {
                // if it's a log file, load the corresponding log segment
                long baseOffset = LogFileUtils.offsetFromFile(file);
                boolean timeIndexFileNewlyCreated = 
!LogFileUtils.timeIndexFile(dir, baseOffset).exists();
                LogSegment segment = LogSegment.open(dir, baseOffset, config, 
time, true, 0, false, "");
                try {
                    segment.sanityCheck(timeIndexFileNewlyCreated);
                } catch (NoSuchFileException nsfe) {
                    if (hadCleanShutdown || segment.baseOffset() < 
recoveryPointCheckpoint) {
                        logger.error("Could not find offset index file 
corresponding to log file {}, recovering segment and rebuilding index 
files...", segment.log().file().getAbsolutePath());
                    }
                    recoverSegment(segment);
                } catch (CorruptIndexException cie) {
                    logger.warn("Found a corrupted index file corresponding to 
log file {} due to {}, recovering segment and rebuilding index files...", 
segment.log().file().getAbsolutePath(), cie.getMessage());
                    recoverSegment(segment);
                }
                segments.add(segment);
            }
        }
    }
{code}


> Parallelize loadSegmentFiles
> ----------------------------
>
>                 Key: KAFKA-20595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20595
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>              Labels: need-kip
>
> This is related to KAFKA-20567. 
> In the happy path where all segment files are intact and index rebuilding is 
> not required, we can parallelize the log segment loading process. If we do 
> encounter any corrupted or orphaned files during the parallel scan, we can 
> collect them into a list and handle the recovery sequentially afterward to 
> ensure correctness



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to