[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602005#comment-16602005 ]

ASF GitHub Bot commented on KAFKA-1194:
---------------------------------------

simplesteph closed pull request #5603: KAFKA-1194: Fixes for Windows
URL: https://github.com/apache/kafka/pull/5603
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the original
diff once a foreign (forked) pull request is merged, the diff is reproduced below
for the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index cebb5fa09d1..38d9519b2d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -438,16 +438,20 @@ private static FileChannel openChannel(File file,
                                            int initFileSize,
                                            boolean preallocate) throws IOException {
         if (mutable) {
-            if (fileAlreadyExists || !preallocate) {
-                return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ,
-                        StandardOpenOption.WRITE);
+            if (fileAlreadyExists) {
+                return FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ);
             } else {
-                RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
-                randomAccessFile.setLength(initFileSize);
-                return randomAccessFile.getChannel();
+                if (preallocate) {
+                    try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw")) {
+                        randomAccessFile.setLength(initFileSize);
+                    }
+                    return FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ);
+                } else {
+                    return FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
+                }
             }
         } else {
-            return FileChannel.open(file.toPath());
+            return FileChannel.open(file.toPath(), StandardOpenOption.READ);
         }
     }
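
For readers skimming the FileRecords change above: the new preallocation path first extends the file to initFileSize with a RandomAccessFile that is closed immediately, and then reopens a plain read/write FileChannel without CREATE, so the preallocated length is kept and no extra file handle stays open. A minimal standalone sketch of that pattern (the file name and size below are made-up example values, not taken from Kafka):

// Sketch only: preallocate a file to a fixed size, then reopen it as a FileChannel.
// Mirrors the new preallocate branch of FileRecords.openChannel shown above;
// the file name and size are hypothetical.
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

public class PreallocateSketch {

    static FileChannel openPreallocated(File file, int initFileSize) throws IOException {
        // Extend the file to its target size; try-with-resources releases the
        // RandomAccessFile handle right away instead of keeping it open.
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
            raf.setLength(initFileSize);
        }
        // Reopen without CREATE/TRUNCATE so the preallocated length is preserved.
        return FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ);
    }

    public static void main(String[] args) throws IOException {
        File segment = new File("00000000000000000000.log"); // hypothetical segment file
        try (FileChannel channel = openPreallocated(segment, 10 * 1024 * 1024)) {
            System.out.println("preallocated size = " + channel.size());
        }
    }
}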
 
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index ec9d55f89ac..aff3d363972 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -202,8 +202,26 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
    * @throws IOException if rename fails
    */
   def renameTo(f: File) {
-    try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
-    finally file = f
+    maybeLock(lock) {
+      val position = if (this.mmap == null) 0 else this.mmap.position()
+      if (OperatingSystem.IS_WINDOWS && this.mmap != null)
+        forceUnmap()
+      try {
+        Utils.atomicMoveWithFallback(file.toPath, f.toPath)
+        if (OperatingSystem.IS_WINDOWS) {
+          val raf = new RandomAccessFile(f, "rw")
+          try {
+            val len = raf.length()
+            this.mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
+            this.mmap.position(position)
+          } finally {
+            CoreUtils.swallow(raf.close(), this)
+          }
+        }
+      } finally {
+        file = f
+      }
+    }
   }
 
   /**
@@ -228,7 +246,10 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
       // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
       // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
       // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
-      safeForceUnmap()
+      CoreUtils.swallow(forceUnmap(), this)
+      // Accessing unmapped mmap crashes JVM by SEGV.
+      // Accessing it after this method called sounds like a bug but for safety, assign null and do not allow later access.
+      mmap = null
     }
     Files.deleteIfExists(file.toPath)
   }
@@ -251,6 +272,10 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
   /** Close the index */
   def close() {
     trimToValidSize()
+    inLock(lock) {
+      if (OperatingSystem.IS_WINDOWS)
+        forceUnmap()
+    }
   }
 
   def closeHandler(): Unit = {
@@ -316,8 +341,10 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
    * Forcefully free the buffer's mmap.
    */
   protected[log] def forceUnmap() {
-    try MappedByteBuffers.unmap(file.getAbsolutePath, mmap)
-    finally mmap = null // Accessing unmapped mmap crashes JVM by SEGV so we null it out to be safe
+    if (mmap != null) {
+      try MappedByteBuffers.unmap(file.getAbsolutePath, mmap)
+      finally mmap = null // Accessing unmapped mmap crashes JVM by SEGV so we null it out to be safe
+    }
   }
 
   /**
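
To make the renameTo change above easier to follow outside the Scala code, here is a rough plain-Java sketch of the same sequence: release the mapping, rename the file, then re-map it and restore the saved position. The unmap step itself relies on Kafka's internal MappedByteBuffers.unmap and is only indicated by a comment; the file names and the position used in main are illustrative.

// Sketch only: rename a file and re-create its memory mapping afterwards,
// as AbstractIndex.renameTo now does on Windows. Unmapping is left as a comment
// because the JDK exposes no public API for it (Kafka uses MappedByteBuffers.unmap).
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public class RemapAfterRenameSketch {

    static MappedByteBuffer renameAndRemap(File from, File to, int position) throws IOException {
        // On Windows the existing mapping must be released first, or the move fails.
        // (In Kafka: forceUnmap() before this point.)
        Files.move(from.toPath(), to.toPath(), StandardCopyOption.ATOMIC_MOVE);
        try (RandomAccessFile raf = new RandomAccessFile(to, "rw")) {
            MappedByteBuffer mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
            mmap.position(position); // restore the write position saved before unmapping
            return mmap;             // the mapping stays valid after the channel is closed
        }
    }

    public static void main(String[] args) throws IOException {
        File from = new File("00000000000000000000.index"); // hypothetical index file
        File to = new File("00000000000000000000.index.renamed");
        try (RandomAccessFile raf = new RandomAccessFile(from, "rw")) {
            raf.setLength(1024);
        }
        MappedByteBuffer mmap = renameAndRemap(from, to, 0);
        System.out.println("remapped, limit = " + mmap.limit());
    }
}
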
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index afe151d69b6..d5df5e9b810 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1751,6 +1751,7 @@ class Log(@volatile var dir: File,
    */
   private def asyncDeleteSegment(segment: LogSegment) {
     segment.changeFileSuffixes("", Log.DeletedFileSuffix)
+
     def deleteSeg() {
       info(s"Deleting segment ${segment.baseOffset}")
       maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 32203acde9a..79ec413eb79 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -230,22 +230,27 @@ class LogManager(logDirs: Seq[File],
   }
 
   /**
-   * Lock all the given directories
-   */
+    * Lock all the given directories
+    */
   private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
-    dirs.flatMap { dir =>
+    val locks = mutable.ArrayBuffer.empty[FileLock]
+    for (dir <- dirs) {
       try {
         val lock = new FileLock(new File(dir, LockFile))
-        if (!lock.tryLock())
-          throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParent +
+        if (!lock.tryLock()) {
+          lock.destroy()
+          locks.foreach(_.destroy())
+          throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath +
             ". A Kafka instance in another process or thread is using this directory.")
-        Some(lock)
+        }
+        locks.append(lock)
       } catch {
         case e: IOException =>
+          locks.foreach(_.destroy())
           logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while locking directory $dir", e)
-          None
       }
     }
+    locks
   }
 
   private def addLogToBeDeleted(log: Log): Unit = {
@@ -830,9 +835,11 @@ class LogManager(logDirs: Seq[File],
         cleaner.abortCleaning(topicPartition)
         cleaner.updateCheckpoints(removedLog.dir.getParentFile)
       }
-      removedLog.renameDir(Log.logDeleteDirName(topicPartition))
+
       checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
       checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
+      removedLog.close()
+      removedLog.renameDir(Log.logDeleteDirName(topicPartition))
       addLogToBeDeleted(removedLog)
       info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
     } else if (offlineLogDirs.nonEmpty) {
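
For context on the lockLogDirs change above: instead of silently dropping a directory whose lock cannot be taken, the new code keeps every lock acquired so far and destroys them all when a later attempt fails. A rough plain-Java equivalent of that rollback-on-failure shape, using raw java.nio FileChannel/FileLock in place of Kafka's own FileLock wrapper (the directory list and ".lock" file name are just illustrative):

// Sketch only: lock a ".lock" file in each directory, and release every lock
// already taken if any later attempt fails, mirroring the rollback added to
// LogManager.lockLogDirs above.
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class LockDirsSketch {

    static List<FileLock> lockAll(List<File> dirs) throws IOException {
        List<FileLock> locks = new ArrayList<>();
        try {
            for (File dir : dirs) {
                FileChannel channel = FileChannel.open(new File(dir, ".lock").toPath(),
                        StandardOpenOption.CREATE, StandardOpenOption.WRITE);
                FileLock lock = channel.tryLock();
                if (lock == null) { // held by another process
                    channel.close();
                    throw new IOException("Failed to acquire lock on file .lock in " + dir.getAbsolutePath());
                }
                locks.add(lock);
            }
            return locks; // caller keeps these locks (and their channels) open while running
        } catch (IOException e) {
            // Roll back: release everything acquired so far before propagating.
            for (FileLock held : locks) {
                try {
                    held.release();
                    held.channel().close();
                } catch (IOException ignored) {
                    // best effort, analogous to CoreUtils.swallow
                }
            }
            throw e;
        }
    }
}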


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The Kafka broker cannot delete the old log files after the configured time
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-1194
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1194
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
>            Environment: Windows
>            Reporter: Tao Qin
>            Priority: Critical
>              Labels: features, patch, windows
>         Attachments: KAFKA-1194.patch, Untitled.jpg, kafka-1194-v1.patch, kafka-1194-v2.patch, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in a Windows environment and set log.retention.hours to 24 hours:
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the Kafka broker still cannot delete the old log files, and we get the following exception:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from  to .deleted for log segment 1516723
>          at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>          at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>          at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>          at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>          at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>          at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>          at scala.collection.immutable.List.foreach(List.scala:76)
>          at kafka.log.Log.deleteOldSegments(Log.scala:418)
>          at kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>          at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>          at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>          at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>          at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>          at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>          at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>          at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>          at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>          at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>          at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>          at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>          at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>          at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while it is still open, so we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The Javadoc describes it as follows:
> "A mapped byte buffer and the file mapping that it represents remain valid until the buffer itself is garbage-collected."
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it can be used to free the MappedByteBuffer.
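
The failure mode described above can be reproduced outside Kafka with a few lines of Java: map a file, then try to rename it while the mapping and the file handle are still live. A hedged repro sketch with made-up file names; the outcome is OS-dependent (the move normally succeeds on Linux, while on Windows it typically fails as long as the file is open and mapped):

// Sketch only: demonstrate why renaming an open, memory-mapped file fails on Windows,
// which is what surfaces as "Failed to change the log file suffix from  to .deleted".
// File names are hypothetical; behaviour differs between operating systems.
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class RenameWhileMappedSketch {

    public static void main(String[] args) throws IOException {
        Path src = Paths.get("segment.index"); // stands in for a log/index segment
        Path dst = Paths.get("segment.index.deleted");
        try (RandomAccessFile raf = new RandomAccessFile(src.toFile(), "rw")) {
            raf.setLength(1024);
            MappedByteBuffer mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            mmap.putInt(0, 42); // the mapping is live at this point
            try {
                // On Windows this move usually throws while the file is open and mapped.
                Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
                System.out.println("rename succeeded");
            } catch (IOException e) {
                System.out.println("rename failed: " + e);
            }
        }
    }
}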



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
