junrao commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r659914342
########## File path: core/src/main/scala/kafka/log/LogLoader.scala ##########
@@ -90,11 +90,63 @@ object LogLoader extends Logging {
    *         overflow index offset
    */
   def load(params: LoadLogParams): LoadedLogOffsets = {
-    // first do a pass through the files in the log directory and remove any temporary files
+
+    // First pass: through the files in the log directory and remove any temporary files
     // and find any interrupted swap operations
     val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-    // Now do a second pass and load all the log and index files.
+    // The remaining valid swap files must come from compaction or segment split operation. We can
+    // simply rename them to regular segment files. But, before renaming, we should figure out which
+    // segments are compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset.
+    // We store segments that require renaming in this code block, and do the actual renaming later.
+    var minSwapFileOffset = Long.MaxValue
+    var maxSwapFileOffset = Long.MinValue
+    val toRenameSwapFiles = mutable.Set[File]()
+    swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
+      val baseOffset = offsetFromFile(f)
+      val segment = LogSegment.open(f.getParentFile,
+        baseOffset = baseOffset,
+        params.config,
+        time = params.time,
+        fileSuffix = Log.SwapFileSuffix)
+      toRenameSwapFiles += f
+      info(s"${params.logIdentifier}Found log file ${f.getPath} from interrupted swap operation, which is recoverable from ${Log.SwapFileSuffix} files by renaming.")
+      minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset)
+      maxSwapFileOffset = Math.max(segment.offsetIndex.lastOffset, maxSwapFileOffset)

Review comment:
   > I could be wrong, but I think if it is compaction, the last record will never be removed. The reason is that compaction always removes earlier records of each key, and the last record will never be an earlier one.
   >
   > Split should be similar.

   It's true that we generally don't remove the last record during compaction. However, during a round of cleaning, we clean segments in groups, and each group generates a single .clean file. Groups are formed so that the offset gap within a group stays within 2 billion and the resulting .clean file won't exceed 2GB in size. If multiple groups are formed, a group that isn't the last one may not preserve the last record.

   > How can we get the next segment before finishing the recovery process?

   We could potentially scan all .log files and sort them in offset order.

########## File path: core/src/main/scala/kafka/log/LogLoader.scala ##########
@@ -90,11 +90,58 @@ object LogLoader extends Logging {
    *         overflow index offset
    */
   def load(params: LoadLogParams): LoadedLogOffsets = {
-    // first do a pass through the files in the log directory and remove any temporary files
+
+    // First pass: through the files in the log directory and remove any temporary files
     // and find any interrupted swap operations
     val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-    // Now do a second pass and load all the log and index files.
+    // The remaining valid swap files must come from compaction or segment split operation. We can
+    // simply rename them to regular segment files. But, before renaming, we should figure out which
+    // segments are compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset.

Review comment:
   "which segments are compacted": .swap files are also generated from splitting.
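A rough sketch of the ".log file scan" idea from the first comment above (not code from the PR; `sortedLogFiles` is a hypothetical name, and `Log.isLogFile`/`Log.offsetFromFile` are assumed to behave like the identically named helpers used in the diff):

```scala
import java.io.File
import kafka.log.Log

// List the partition directory, keep only .log files, and sort them by base offset so
// the segment that follows a given .swap segment can be located before recovery has
// rebuilt the in-memory segment map.
def sortedLogFiles(dir: File): Seq[File] =
  Option(dir.listFiles).getOrElse(Array.empty[File])
    .filter(f => f.isFile && Log.isLogFile(f))
    .sortBy(f => Log.offsetFromFile(f))
    .toSeq
```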
########## File path: core/src/main/scala/kafka/log/LogLoader.scala ##########
@@ -192,20 +230,10 @@
         debug(s"${params.logIdentifier}Deleting stray temporary file ${file.getAbsolutePath}")
         Files.deleteIfExists(file.toPath)
       } else if (filename.endsWith(CleanedFileSuffix)) {
-        minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset)
-        cleanFiles += file
+        minCleanedFileOffset = Math.min(offsetFromFile(file), minCleanedFileOffset)
+        cleanedFiles += file
       } else if (filename.endsWith(SwapFileSuffix)) {
-        // we crashed in the middle of a swap operation, to recover:
-        // if a log, delete the index files, complete the swap operation later
-        // if an index just delete the index files, they will be rebuilt
-        val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
-        info(s"${params.logIdentifier}Found file ${file.getAbsolutePath} from interrupted swap operation.")
-        if (Log.isIndexFile(baseFile)) {
-          deleteIndicesIfExist(baseFile)
-        } else if (Log.isLogFile(baseFile)) {
-          deleteIndicesIfExist(baseFile)
-          swapFiles += file
-        }
+        swapFiles += file

Review comment:
   It's possible that during renaming, we have only renamed the .log file to .swap, but not the corresponding index files. Should we find those .clean files with the same offset and rename them to .swap?
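One way that suggestion could look, as a rough sketch only (not code from the PR; `promoteLeftoverCleanedIndices` is a hypothetical name, and the `Log`/`CoreUtils` helpers and suffix constants are assumed to work as they are used in the diffs above):

```scala
import java.io.File
import kafka.log.Log
import kafka.utils.CoreUtils

// For a .swap log file at baseOffset, find any leftover index files at the same base
// offset that still carry the .cleaned suffix and give them the .swap suffix too, so
// the whole segment is completed consistently during recovery.
def promoteLeftoverCleanedIndices(dir: File, baseOffset: Long): Unit = {
  val prefix = Log.filenamePrefixFromOffset(baseOffset)
  Option(dir.listFiles).getOrElse(Array.empty[File])
    .filter(f => f.isFile && f.getName.startsWith(prefix) && f.getName.endsWith(Log.CleanedFileSuffix))
    .foreach { f =>
      val renamed = new File(CoreUtils.replaceSuffix(f.getPath, Log.CleanedFileSuffix, Log.SwapFileSuffix))
      f.renameTo(renamed) // ignoring the boolean result for brevity
    }
}
```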
########## File path: core/src/main/scala/kafka/log/LogLoader.scala ##########
@@ -90,11 +90,58 @@ object LogLoader extends Logging {
    *         overflow index offset
    */
   def load(params: LoadLogParams): LoadedLogOffsets = {
-    // first do a pass through the files in the log directory and remove any temporary files
+
+    // First pass: through the files in the log directory and remove any temporary files
     // and find any interrupted swap operations
     val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-    // Now do a second pass and load all the log and index files.
+    // The remaining valid swap files must come from compaction or segment split operation. We can
+    // simply rename them to regular segment files. But, before renaming, we should figure out which
+    // segments are compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset.
+    // We store segments that require renaming in this code block, and do the actual renaming later.
+    var minSwapFileOffset = Long.MaxValue
+    var maxSwapFileOffset = Long.MinValue
+    swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
+      val baseOffset = offsetFromFile(f)
+      val segment = LogSegment.open(f.getParentFile,
+        baseOffset = baseOffset,
+        params.config,
+        time = params.time,
+        fileSuffix = Log.SwapFileSuffix)
+      info(s"${params.logIdentifier}Found log file ${f.getPath} from interrupted swap operation, which is recoverable from ${Log.SwapFileSuffix} files by renaming.")
+      minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset)
+      maxSwapFileOffset = Math.max(segment.offsetIndex.lastOffset, maxSwapFileOffset)
+    }
+
+    // Second pass: delete segments that are between minSwapFileOffset and maxSwapFileOffset. As
+    // discussed above, these segments were compacted or split but haven't been renamed to .delete
+    // before shutting down the broker.
+    for (file <- params.dir.listFiles if file.isFile) {
+      try {
+        if (!file.getName.endsWith(SwapFileSuffix)) {
+          val offset = offsetFromFile(file)
+          if (offset >= minSwapFileOffset && offset <= maxSwapFileOffset) {

Review comment:
   If we use segment.nextOffset() to calculate maxSwapFileOffset, it's exclusive.

########## File path: core/src/main/scala/kafka/log/LogLoader.scala ##########
@@ -90,11 +90,63 @@ object LogLoader extends Logging {
    *         overflow index offset
    */
   def load(params: LoadLogParams): LoadedLogOffsets = {
-    // first do a pass through the files in the log directory and remove any temporary files
+
+    // First pass: through the files in the log directory and remove any temporary files
     // and find any interrupted swap operations
     val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-    // Now do a second pass and load all the log and index files.
+    // The remaining valid swap files must come from compaction or segment split operation. We can
+    // simply rename them to regular segment files. But, before renaming, we should figure out which
+    // segments are compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset.
+    // We store segments that require renaming in this code block, and do the actual renaming later.
+    var minSwapFileOffset = Long.MaxValue
+    var maxSwapFileOffset = Long.MinValue
+    val toRenameSwapFiles = mutable.Set[File]()
+    swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
+      val baseOffset = offsetFromFile(f)
+      val segment = LogSegment.open(f.getParentFile,
+        baseOffset = baseOffset,
+        params.config,
+        time = params.time,
+        fileSuffix = Log.SwapFileSuffix)
+      toRenameSwapFiles += f
+      info(s"${params.logIdentifier}Found log file ${f.getPath} from interrupted swap operation, which is recoverable from ${Log.SwapFileSuffix} files by renaming.")
+      minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset)
+      maxSwapFileOffset = Math.max(segment.offsetIndex.lastOffset, maxSwapFileOffset)

Review comment:
   segment.offsetIndex.lastOffset doesn't give the exact last offset in a segment since the index is sparse. We need to use segment.nextOffset().
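Putting the two comments above together, the bound computation and the deletion check could end up looking roughly like the sketch below (not the PR's final code; `swapOffsetRange` and `coveredBySwap` are hypothetical names, and the segment's next offset is assumed to be exposed as `readNextOffset`, which is what the comment refers to as `nextOffset()`):

```scala
import kafka.log.LogSegment

// Take the upper bound from the segment's next offset (the offset after its last record)
// rather than from the sparse offset index, and treat that bound as exclusive when
// deciding whether a plain segment file is covered by the .swap segments.
def swapOffsetRange(swapSegments: Seq[LogSegment]): (Long, Long) = {
  val min = swapSegments.map(_.baseOffset).foldLeft(Long.MaxValue)((a, b) => Math.min(a, b))
  // readNextOffset is the offset *after* the last record, so this bound is exclusive
  val maxExclusive = swapSegments.map(_.readNextOffset).foldLeft(Long.MinValue)((a, b) => Math.max(a, b))
  (min, maxExclusive)
}

def coveredBySwap(offset: Long, range: (Long, Long)): Boolean =
  offset >= range._1 && offset < range._2 // '<' rather than '<=' because the upper bound is exclusive
```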
########## File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ##########
@@ -1472,11 +1489,30 @@ class LogCleanerTest {
     time.scheduler.clear()
     cleanedKeys = LogTestUtils.keysInLog(log)
 
-    // 3) Simulate recovery after swap file is created and old segments files are renamed
+    // 4) Simulate recovery after swap file is created and old segments files are renamed
     // to .deleted. Clean operation is resumed during recovery.
     log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
     log = recoverAndCheck(config, cleanedKeys)
+    // add some more messages and clean the log again
+    while (log.numberOfSegments < 10) {
+      log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
+      messageCount += 1
+    }
+    for (k <- 1 until messageCount by 2)
+      offsetMap.put(key(k), Long.MaxValue)
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
+      new CleanedTransactionMetadata)
+    // clear scheduler so that async deletes don't run
+    time.scheduler.clear()
+    cleanedKeys = LogTestUtils.keysInLog(log)
+
+    // 5) Simulate recovery after a subset of swap files are renamed to regular files and old segments files are renamed
+    // to .deleted. Clean operation is resumed during recovery.
+    log.logSegments.head.timeIndex.file.renameTo(new File(CoreUtils.replaceSuffix(log.logSegments.head.timeIndex.file.getPath, "", Log.SwapFileSuffix)))
+    // .changeFileSuffixes("", Log.SwapFileSuffix)

Review comment:
   Is this still needed?
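For reference, the partial rename simulated in step 5 of the test above could also be expressed with a small helper, as a rough sketch only (not part of the PR; `renameSomeToSwap` is a hypothetical name, and `CoreUtils.replaceSuffix`/`Log.SwapFileSuffix` are used the same way as in the test diff):

```scala
import java.io.File
import kafka.log.Log
import kafka.utils.CoreUtils

// Simulate a crash partway through the swap rename by appending the .swap suffix to
// only some of a segment's files, the way the test renames just the time index.
// Passing only a subset of the segment's files leaves a partial swap on disk.
def renameSomeToSwap(files: Seq[File]): Unit =
  files.foreach { f =>
    val swapped = new File(CoreUtils.replaceSuffix(f.getPath, "", Log.SwapFileSuffix))
    f.renameTo(swapped) // ignoring the boolean result for brevity
  }
```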