[ https://issues.apache.org/jira/browse/KAFKA-20595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18081458#comment-18081458 ]
Chia-Ping Tsai commented on KAFKA-20595:
----------------------------------------
{code:java}
private void loadSegmentFiles() throws IOException {
    // load segments in ascending order because transactional data from one segment may depend on the
    // segments that come before it
    File[] files = dir.listFiles();
    if (files == null) files = new File[0];
    List<File> sortedFiles = Arrays.stream(files).filter(File::isFile).sorted().toList();
    for (File file : sortedFiles) {
        if (LogFileUtils.isIndexFile(file)) {
            // if it is an index file, make sure it has a corresponding .log file
            long offset = LogFileUtils.offsetFromFile(file);
            File logFile = LogFileUtils.logFile(dir, offset);
            if (!logFile.exists()) {
                logger.warn("Found an orphaned index file {}, with no corresponding log file.",
                    file.getAbsolutePath());
                Files.deleteIfExists(file.toPath());
            }
        } else if (LogFileUtils.isLogFile(file)) {
            // if it's a log file, load the corresponding log segment
            long baseOffset = LogFileUtils.offsetFromFile(file);
            boolean timeIndexFileNewlyCreated = !LogFileUtils.timeIndexFile(dir, baseOffset).exists();
            LogSegment segment = LogSegment.open(dir, baseOffset, config, time, true, 0, false, "");
            try {
                segment.sanityCheck(timeIndexFileNewlyCreated);
            } catch (NoSuchFileException nsfe) {
                if (hadCleanShutdown || segment.baseOffset() < recoveryPointCheckpoint) {
                    logger.error("Could not find offset index file corresponding to log file {}, recovering segment and rebuilding index files...",
                        segment.log().file().getAbsolutePath());
                }
                recoverSegment(segment);
            } catch (CorruptIndexException cie) {
                logger.warn("Found a corrupted index file corresponding to log file {} due to {}, recovering segment and rebuilding index files...",
                    segment.log().file().getAbsolutePath(), cie.getMessage());
                recoverSegment(segment);
            }
            segments.add(segment);
        }
    }
}
{code}
> Parallelize loadSegmentFiles
> ----------------------------
>
> Key: KAFKA-20595
> URL: https://issues.apache.org/jira/browse/KAFKA-20595
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Major
> Labels: need-kip
>
> This is related to KAFKA-20567.
> In the happy path, where all segment files are intact and no index needs
> rebuilding, we can parallelize log segment loading. If the parallel scan
> does find corrupted or orphaned files, we can collect them into a list and
> handle their recovery sequentially afterward to ensure correctness.
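
The two-phase idea in the description can be sketched roughly as follows. This is an illustration only, not Kafka code and not a proposed patch: the class ParallelSegmentLoadSketch, the LoadResult record and the load method are hypothetical names, segments are represented by their base offsets alone, and the sanityCheck predicate stands in for LogSegment.sanityCheck (returning false where the real method would throw NoSuchFileException or CorruptIndexException). It assumes the sanity checks are independent of each other and safe to run concurrently, which would need to be verified against the real implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.LongPredicate;

// Hypothetical sketch: segments are stood in for by their base offsets.
class ParallelSegmentLoadSketch {

    // Every segment in ascending offset order, plus the ones that needed recovery.
    record LoadResult(List<Long> loaded, List<Long> recovered) {}

    // sanityCheck is an assumed stand-in for LogSegment.sanityCheck; it returns
    // false where the real method would throw.
    static LoadResult load(List<Long> baseOffsets, LongPredicate sanityCheck, int threads) {
        List<Long> sorted = baseOffsets.stream().sorted().toList();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Phase 1: submit the sanity checks so they can run in parallel.
            List<Future<Boolean>> checks = new ArrayList<>();
            for (long offset : sorted) {
                checks.add(pool.submit(() -> sanityCheck.test(offset)));
            }
            // Phase 2: consume the results on the calling thread in ascending
            // offset order, so recovery stays sequential and ordered.
            List<Long> loaded = new ArrayList<>();
            List<Long> recovered = new ArrayList<>();
            for (int i = 0; i < sorted.size(); i++) {
                long offset = sorted.get(i);
                if (!checks.get(i).get()) {
                    recovered.add(offset); // stand-in for recoverSegment(segment)
                }
                loaded.add(offset); // stand-in for segments.add(segment)
            }
            return new LoadResult(loaded, recovered);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while loading segments", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("sanity check failed unexpectedly", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Segment 100 fails its check; the others are healthy.
        LoadResult result = load(List.of(200L, 0L, 100L), offset -> offset != 100L, 4);
        System.out.println("loaded=" + result.loaded() + " recovered=" + result.recovered());
    }
}
```

In this sketch only the checks run on the pool. Recovery and the insertion into the segment list happen on the calling thread in ascending offset order, which keeps the ordering constraint noted in the comment at the top of loadSegmentFiles.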