[ https://issues.apache.org/jira/browse/KAFKA-6697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517907#comment-16517907 ]

ASF GitHub Bot commented on KAFKA-6697:
---------------------------------------

ijuma closed pull request #4752: KAFKA-6697; JBOD configured broker should not die if log directory is invalid
URL: https://github.com/apache/kafka/pull/4752
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index c0ac3b81907..3bb5ee62c68 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -144,10 +144,8 @@ class LogManager(logDirs: Seq[File],
    * </ol>
    */
   private def createAndValidateLogDirs(dirs: Seq[File], initialOfflineDirs: Seq[File]): ConcurrentLinkedQueue[File] = {
-    if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
-      throw new KafkaException("Duplicate log directory found: " + dirs.mkString(", "))
-
     val liveLogDirs = new ConcurrentLinkedQueue[File]()
+    val canonicalPaths = mutable.HashSet.empty[String]
 
     for (dir <- dirs) {
       try {
@@ -155,13 +153,21 @@ class LogManager(logDirs: Seq[File],
           throw new IOException(s"Failed to load ${dir.getAbsolutePath} during 
broker startup")
 
         if (!dir.exists) {
-          info("Log directory '" + dir.getAbsolutePath + "' not found, 
creating it.")
+          info(s"Log directory ${dir.getAbsolutePath} not found, creating it.")
           val created = dir.mkdirs()
           if (!created)
-            throw new IOException("Failed to create data directory " + 
dir.getAbsolutePath)
+            throw new IOException(s"Failed to create data directory 
${dir.getAbsolutePath}")
         }
         if (!dir.isDirectory || !dir.canRead)
-          throw new IOException(dir.getAbsolutePath + " is not a readable log directory.")
+          throw new IOException(s"${dir.getAbsolutePath} is not a readable log 
directory.")
+
+        // getCanonicalPath() throws IOException if a file system query fails or if the path is invalid (e.g. contains
+        // the Nul character). Since there's no easy way to distinguish between the two cases, we treat them the same
+        // and mark the log directory as offline.
+        if (!canonicalPaths.add(dir.getCanonicalPath))
+          throw new KafkaException(s"Duplicate log directory found: 
${dirs.mkString(", ")}")
+
+
         liveLogDirs.add(dir)
       } catch {
         case e: IOException =>
@@ -169,7 +175,7 @@ class LogManager(logDirs: Seq[File],
       }
     }
     if (liveLogDirs.isEmpty) {
-      fatal(s"Shutdown broker because none of the specified log dirs from " + 
dirs.mkString(", ") + " can be created or validated")
+      fatal(s"Shutdown broker because none of the specified log dirs from 
${dirs.mkString(", ")} can be created or validated")
       Exit.halt(1)
     }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index d9efc236780..3fc6c1cecd8 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -67,6 +67,29 @@ class LogManagerTest {
   @Test
   def testCreateLog() {
     val log = logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig)
+    assertEquals(1, logManager.liveLogDirs.size)
+
+    val logFile = new File(logDir, name + "-0")
+    assertTrue(logFile.exists)
+    log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
+  }
+
+  /**
+   * Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
+   * The LogManager is configured with one invalid log directory which should be marked as offline.
+   */
+  @Test
+  def testCreateLogWithInvalidLogDir() {
+    // Configure the log dir with the Nul character as the path, which causes dir.getCanonicalPath() to throw an
+    // IOException. This simulates the scenario where the disk is not properly mounted (which is hard to achieve in
+    // a unit test)
+    val dirs = Seq(logDir, new File("\u0000"))
+
+    logManager.shutdown()
+    logManager = createLogManager(dirs)
+    logManager.startup()
+
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig, isNew = true)
     val logFile = new File(logDir, name + "-0")
     assertTrue(logFile.exists)
     log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
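
A note on the test above: it relies on java.io.File treating a path that
contains the Nul character as invalid, so dir.getCanonicalPath() throws an
IOException. A standalone sketch of that behaviour is below (not part of the
patch; the object name is made up for illustration):

import java.io.{File, IOException}

// Probe only: a path containing the Nul character is invalid, so
// getCanonicalPath throws IOException. The patch catches this exception per
// directory and marks only that directory offline.
object CanonicalPathProbe extends App {
  val invalidDir = new File("\u0000")
  try {
    val canonical = invalidDir.getCanonicalPath
    println(s"unexpected: invalid path resolved to $canonical")
  } catch {
    case e: IOException =>
      println(s"getCanonicalPath threw as expected: ${e.getMessage}")
  }
}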


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> JBOD configured broker should not die if log directory is invalid
> -----------------------------------------------------------------
>
>                 Key: KAFKA-6697
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6697
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>             Fix For: 2.1.0
>
>
> Currently a JBOD-configured broker will still die on startup if 
> dir.getCanonicalPath() throws IOException. We should mark such a log directory 
> as offline, and the broker should keep running as long as at least one disk is 
> good.
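
A minimal sketch of the behaviour described above (simplified; not the actual
LogManager code, and the names are made up for illustration): each configured
log dir is canonicalized inside its own try/catch, so a dir whose path cannot
be resolved is reported offline instead of aborting startup, as long as at
least one dir remains usable.

import java.io.{File, IOException}
import scala.collection.mutable

object LogDirValidationSketch extends App {
  // Split the configured dirs into usable ("live") and unusable ("offline") ones.
  def splitLiveAndOffline(dirs: Seq[File]): (Seq[File], Seq[File]) = {
    val live = mutable.ArrayBuffer.empty[File]
    val offline = mutable.ArrayBuffer.empty[File]
    for (dir <- dirs) {
      try {
        val canonical = dir.getCanonicalPath // throws IOException for invalid paths
        println(s"Log dir ok: $canonical")
        live += dir
      } catch {
        case e: IOException =>
          println(s"Marking log dir offline: ${e.getMessage}")
          offline += dir
      }
    }
    // The real broker halts only when none of the dirs can be validated.
    require(live.nonEmpty, "none of the configured log dirs is usable")
    (live.toSeq, offline.toSeq)
  }

  // The second dir contains the Nul character and is therefore invalid.
  val (liveDirs, offlineDirs) =
    splitLiveAndOffline(Seq(new File("/tmp/kafka-logs"), new File("\u0000")))
  println(s"live=$liveDirs offline=$offlineDirs")
}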



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
