junrao commented on code in PR #12136:
URL: https://github.com/apache/kafka/pull/12136#discussion_r879690436


##########
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##########
@@ -134,10 +154,29 @@ class LogLoaderTest {
       }
     }
 
+    def initializeLogManagerForSimulatingErrorTest(hasError: Boolean = false,
+                                                 errorType: ErrorTypes.Errors 
= null,
+                                                 logDirFailureChannel: 
LogDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+                                                ): (LogManager, Executable) = {
+      simulateError.hasError = hasError

Review Comment:
   We are changing simulateError after initializing LogManager. Perhaps it's 
clearer to just do that for the first test and avoid passing in hasError and 
errorType to initializeLogManagerForSimulatingErrorTest()?



##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -376,7 +381,9 @@ class LogManager(logDirs: Seq[File],
                 s"($currentNumLoaded/${logsToLoad.length} loaded in 
$logDirAbsolutePath)")
             } catch {
               case e: IOException =>
-                offlineDirs.add((logDirAbsolutePath, e))
+                handleIOException(logDirAbsolutePath, e)
+              case e: KafkaStorageException if 
e.getCause.isInstanceOf[IOException] =>
+                // KafkaStorageException might be thrown, ex: during writing 
LeaderEpochFileCache

Review Comment:
   When converting IOException to KafkaStorageException, we already add an 
error log. So, there is probably no need to log it below again. Could we also 
add a comment why we can just let go of KafkaStorageException?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to