junrao commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r659914342



##########
File path: core/src/main/scala/kafka/log/LogLoader.scala
##########
@@ -90,11 +90,63 @@ object LogLoader extends Logging {
    *                                           overflow index offset
    */
   def load(params: LoadLogParams): LoadedLogOffsets = {
-    // first do a pass through the files in the log directory and remove any temporary files
+
+    // First pass: through the files in the log directory and remove any temporary files
     // and find any interrupted swap operations
     val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-    // Now do a second pass and load all the log and index files.
+    // The remaining valid swap files must come from compaction or segment split operation. We can
+    // simply rename them to regular segment files. But, before renaming, we should figure out which
+    // segments are compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset.
+    // We store segments that require renaming in this code block, and do the actual renaming later.
+    var minSwapFileOffset = Long.MaxValue
+    var maxSwapFileOffset = Long.MinValue
+    val toRenameSwapFiles = mutable.Set[File]()
+    swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
+      val baseOffset = offsetFromFile(f)
+      val segment = LogSegment.open(f.getParentFile,
+        baseOffset = baseOffset,
+        params.config,
+        time = params.time,
+        fileSuffix = Log.SwapFileSuffix)
+      toRenameSwapFiles += f
+      info(s"${params.logIdentifier}Found log file ${f.getPath} from interrupted swap operation, which is recoverable from ${Log.SwapFileSuffix} files by renaming.")
+      minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset)
+      maxSwapFileOffset = Math.max(segment.offsetIndex.lastOffset, maxSwapFileOffset)

Review comment:
   > I could be wrong, but I think if it is compaction, the last record
   > will never be removed. The reason is that compaction always removes earlier
   > records of each key, and the last record will never be an earlier one.
   >
   > Split should be similar.
   
   It's true that compaction generally doesn't remove the last record of the log.
   However, within a round of cleaning, segments are cleaned in groups, and each
   group produces a single .clean file. Groups are formed so that the offset gap
   within a group stays under 2 billion and the resulting .clean file doesn't
   exceed 2GB in size. If multiple groups are formed, a group other than the last
   one may not preserve its own last record, because a later group can hold a
   newer record for the same key.
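   
To make the grouping argument concrete, here is a minimal sketch in Scala. It is a simplified model and not the actual cleaner code: `Record`, `Segment`, `CleanerGroupingSketch`, and the `maxBytes` / `maxOffsetSpan` parameters are all hypothetical names chosen for illustration.

```scala
// Hypothetical, simplified model of cleaning in groups; not Kafka's LogCleaner.
final case class Record(offset: Long, key: String)

final case class Segment(baseOffset: Long, records: Seq[Record], sizeBytes: Long) {
  def lastOffset: Long = records.last.offset
}

object CleanerGroupingSketch {
  // Greedily pack consecutive segments into groups. A new group starts when adding
  // the next segment would exceed the size cap or the offset-span cap.
  def group(segments: Seq[Segment], maxBytes: Long, maxOffsetSpan: Long): Seq[Seq[Segment]] =
    segments.foldLeft(Vector.empty[Vector[Segment]]) { (groups, seg) =>
      groups.lastOption match {
        case Some(g) if g.map(_.sizeBytes).sum + seg.sizeBytes <= maxBytes &&
            seg.lastOffset - g.head.baseOffset <= maxOffsetSpan =>
          groups.init :+ (g :+ seg)
        case _ =>
          groups :+ Vector(seg)
      }
    }

  // Offset of the newest record for each key across all segments being cleaned.
  def latestOffsetPerKey(segments: Seq[Segment]): Map[String, Long] =
    segments.flatMap(_.records).groupBy(_.key).map { case (k, rs) => k -> rs.map(_.offset).max }

  // Records of one group that survive: only those that are the newest for their key.
  def cleanGroup(g: Seq[Segment], latest: Map[String, Long]): Seq[Record] =
    g.flatMap(_.records).filter(r => latest(r.key) == r.offset)
}
```

In this model, if the final record of the first group has a key that is written again in a later group, `cleanGroup` drops it, so the last offset in the first group's output is lower than the last offset of the segments that group replaces.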
   
   > How can we get the next segment before finishing the recovery process?
   
   We could potentially scan all .log files and sort them in offset order.
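
A rough sketch of that scan, assuming segment files are named with the zero-padded base offset followed by `.log`. The object and method names here (`LogFileScanSketch`, `sortedBaseOffsets`, `nextBaseOffset`, `scanDir`) are hypothetical and not existing Kafka helpers.

```scala
import java.io.File
import scala.util.Try

// Hypothetical helper; a sketch of "scan all .log files and sort them in offset order".
object LogFileScanSketch {
  private val LogSuffix = ".log"

  // Parses the base offset from a name such as "00000000000000000042.log".
  // Names with any other suffix (for example ".log.swap" or ".index") yield None.
  def baseOffsetOf(fileName: String): Option[Long] =
    if (fileName.endsWith(LogSuffix)) Try(fileName.stripSuffix(LogSuffix).toLong).toOption
    else None

  // Base offsets of all .log files, in ascending order.
  def sortedBaseOffsets(fileNames: Seq[String]): Seq[Long] =
    fileNames.flatMap(n => baseOffsetOf(n).toList).sorted

  // Base offset of the first segment that starts after `offset`, if any.
  def nextBaseOffset(sorted: Seq[Long], offset: Long): Option[Long] =
    sorted.find(_ > offset)

  // Same scan against a real directory; File.list() returns null if `dir` is not a directory.
  def scanDir(dir: File): Seq[Long] =
    sortedBaseOffsets(Option(dir.list()).map(_.toSeq).getOrElse(Seq.empty))
}
```

With the sorted offsets in hand, the next segment after a given swap file can be looked up with `nextBaseOffset` before any segment has been opened or recovered.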




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


