ccding commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r657560081



##########
File path: core/src/main/scala/kafka/log/LogLoader.scala
##########
@@ -106,7 +174,17 @@ object LogLoader extends Logging {
       loadSegmentFiles(params)
     })
 
-    completeSwapOperations(swapFiles, params)
+    // Do the actual recovery for toRecoverSwapFiles, as discussed above.

Review comment:
       You are right, as long as we don't need to do sanity checks. Removed this.

##########
File path: core/src/main/scala/kafka/log/LogLoader.scala
##########
@@ -90,11 +90,79 @@ object LogLoader extends Logging {
    *                                           overflow index offset
    */
   def load(params: LoadLogParams): LoadedLogOffsets = {
-    // first do a pass through the files in the log directory and remove any temporary files
+
+    // First pass: through the files in the log directory and remove any temporary files
     // and find any interrupted swap operations
     val swapFiles = removeTempFilesAndCollectSwapFiles(params)
 
-    // Now do a second pass and load all the log and index files.
+    // The remaining valid swap files must come from compaction operation. We can simply rename them
+    // to regular segment files. But, before renaming, we should figure out which segments are
+    // compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset.
+    // If sanity check fails, we cannot do the simple renaming, we must do a full recovery, which
+    // involves rebuilding all the index files and the producer state.
+    // We store segments that require renaming and recovery in this code block, and do the actual
+    // renaming and recovery later.
+    var minSwapFileOffset = Long.MaxValue
+    var maxSwapFileOffset = Long.MinValue
+    val toRenameSwapFiles = mutable.Set[File]()
+    val toRecoverSwapFiles = mutable.Set[File]()
+    swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
+      val baseOffset = offsetFromFile(f)
+      val segment = LogSegment.open(f.getParentFile,
+        baseOffset = baseOffset,
+        params.config,
+        time = params.time,
+        fileSuffix = Log.SwapFileSuffix)
+      try {
+        segment.sanityCheck(false)

Review comment:
       fixed.
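
For readers following the thread, the classification step described in the code comment above could look roughly like the sketch below. It is not the code in this PR: `SwapFileClassifier`, `Plan`, `offsetFromFileName`, and the `sanityCheck` function parameter are hypothetical names, the swap file naming (`<offset>.log.swap`) is an assumption, and tracking min/max over base offsets only is a simplification, since the hunk is cut off before it shows how `maxSwapFileOffset` is actually computed.

```scala
import java.io.File
import scala.collection.mutable
import scala.util.Try

// Hypothetical, dependency-free sketch of the "rename or recover" split.
object SwapFileClassifier {
  val LogFileSuffix = ".log"
  val SwapFileSuffix = ".swap"

  // Assumed naming scheme: "00000000000000000100.log.swap" has base offset 100.
  def offsetFromFileName(name: String): Long =
    name.substring(0, name.indexOf('.')).toLong

  final case class Plan(toRename: Set[File],
                        toRecover: Set[File],
                        minOffset: Long,
                        maxOffset: Long)

  // sanityCheck stands in for a segment sanity check: it is expected to
  // throw when the swap segment cannot be trusted.
  def classify(swapFiles: Seq[File], sanityCheck: File => Unit): Plan = {
    var minOffset = Long.MaxValue
    var maxOffset = Long.MinValue
    val toRename = mutable.Set[File]()
    val toRecover = mutable.Set[File]()

    swapFiles
      // Only swap files that wrap a log segment are considered.
      .filter(_.getName.endsWith(LogFileSuffix + SwapFileSuffix))
      .foreach { f =>
        val baseOffset = offsetFromFileName(f.getName)
        // Simplification: the range is tracked over base offsets only.
        minOffset = math.min(minOffset, baseOffset)
        maxOffset = math.max(maxOffset, baseOffset)
        // Passing the check means a plain rename is enough; failing it
        // means the segment is queued for full recovery.
        if (Try(sanityCheck(f)).isSuccess) toRename += f else toRecover += f
      }

    Plan(toRename.toSet, toRecover.toSet, minOffset, maxOffset)
  }
}
```

Both sets are only filled here; the renaming and recovery themselves would happen in a later step, as the comment describes.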




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

