junrao commented on a change in pull request #9364:
URL: https://github.com/apache/kafka/pull/9364#discussion_r510527544
##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -298,26 +300,38 @@ class LogManager(logDirs: Seq[File],
   /**
    * Recover and load all logs in the given data directories
    */
-  private def loadLogs(): Unit = {
+  private[log] def loadLogs(): Unit = {
     info(s"Loading logs from log dirs $liveLogDirs")
     val startMs = time.hiResClockMs()
     val threadPools = ArrayBuffer.empty[ExecutorService]
     val offlineDirs = mutable.Set.empty[(String, IOException)]
-    val jobs = mutable.Map.empty[File, Seq[Future[_]]]
+    val jobs = ArrayBuffer.empty[Seq[Future[_]]]
     var numTotalLogs = 0

     for (dir <- liveLogDirs) {
       val logDirAbsolutePath = dir.getAbsolutePath
+      var hadCleanShutdown: Boolean = false
       try {
         val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
         threadPools.append(pool)

         val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
         if (cleanShutdownFile.exists) {
           info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
+          // Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
+          // so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471
+          try {
+            cleanShutdownFile.delete()
+          } catch {
+            case e: IOException =>

Review comment:
       This is an existing issue, but cleanShutdownFile.delete() doesn't seem to throw IOException (see the sketch at the end of this message).

##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -1257,7 +1328,7 @@ class LogTest {
     log.close()

     // After reloading log, producer state should not be regenerated
-    val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
+    val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L, lastShutdownClean = false)

Review comment:
       Hmm, it seems that this test expects a clean shutdown (see the consolidated createLog sketch below).

##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -2131,12 +2202,12 @@ class LogTest {
       assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries,
         log.activeSegment.timeIndex.entries)
     }

-    log = createLog(logDir, logConfig, recoveryPoint = lastOffset)
+    log = createLog(logDir, logConfig, recoveryPoint = lastOffset, lastShutdownClean = false)

Review comment:
       It seems that this also expects a clean shutdown.

##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -4445,11 +4504,9 @@ class LogTest {
     (log, segmentWithOverflow)
   }

-  private def recoverAndCheck(config: LogConfig,
-                              expectedKeys: Iterable[Long],
-                              expectDeletedFiles: Boolean = true): Log = {
-    LogTest.recoverAndCheck(logDir, config, expectedKeys, brokerTopicStats, mockTime, mockTime.scheduler,
-      expectDeletedFiles)
+  private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long], expectDeletedFiles: Boolean = true) = {

Review comment:
       This is an existing problem. Since no callers explicitly set expectDeletedFiles, could we just remove this param? (A simplified signature is sketched below.)

##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -3623,7 +3690,7 @@ class LogTest {
     log.close()

     // reopen the log and recover from the beginning
-    val recoveredLog = createLog(logDir, LogConfig())
+    val recoveredLog = createLog(logDir, LogConfig(), lastShutdownClean = false)

Review comment:
       It seems the createLog() call on line 3976 inside testRecoverOnlyLastSegment() needs to have lastShutdownClean = false.
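A minimal sketch of the point in the first comment, assuming the surrounding LogManager context (dir, plus the warn helper from Kafka's Logging trait):

    import java.io.{File, IOException}
    import java.nio.file.Files

    val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)

    // java.io.File#delete never throws IOException (only SecurityException);
    // failure is reported via the boolean return value, so the catch block
    // in the diff above is unreachable:
    if (!cleanShutdownFile.delete())
      warn(s"Failed to delete clean shutdown file ${cleanShutdownFile.getAbsolutePath}")

    // java.nio.file.Files.deleteIfExists, by contrast, does throw
    // IOException, which would make a try/catch like the one above meaningful:
    try {
      Files.deleteIfExists(cleanShutdownFile.toPath)
    } catch {
      case e: IOException =>
        warn(s"Failed to delete clean shutdown file ${cleanShutdownFile.getAbsolutePath}", e)
    }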
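Pulling the createLog comments together, a sketch of the suggested end state, assuming the lastShutdownClean parameter (defaulting to true) that this PR adds to the createLog test helper:

    // Tests that simulate a clean shutdown should keep the default:
    val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
    log = createLog(logDir, logConfig, recoveryPoint = lastOffset)

    // testRecoverOnlyLastSegment reopens the log to recover from the
    // beginning, i.e. after a hard shutdown, so it should opt out:
    val recoveredLog = createLog(logDir, LogConfig(), lastShutdownClean = false)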
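On removing the unused expectDeletedFiles parameter, a sketch of the simplified helper, assuming the companion-object LogTest.recoverAndCheck keeps its own expectDeletedFiles default of true:

    private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long]): Log =
      LogTest.recoverAndCheck(logDir, config, expectedKeys, brokerTopicStats, mockTime, mockTime.scheduler)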