[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612012#comment-16612012 ]

ASF GitHub Bot commented on KAFKA-1194:
---------------------------------------

simplesteph closed pull request #5604: DO-NOT-MERGE KAFKA-1194: Alternative fix
URL: https://github.com/apache/kafka/pull/5604

This is a pull request from a forked repository. As GitHub hides the
original diff once the pull request is closed, it is reproduced below
for the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index ec9d55f89ac..9f1be4ac41a 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -202,8 +202,26 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
    * @throws IOException if rename fails
    */
   def renameTo(f: File) {
-    try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
-    finally file = f
+    if (!OperatingSystem.IS_WINDOWS) {
+      try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
+      finally file = f
+    } else {
+      // we get the file's position
+      val position = if (this.mmap == null) 0 else this.mmap.position()
+      if (OperatingSystem.IS_WINDOWS && this.mmap != null)
+        // this sets mmap = null
+        safeForceUnmap()
+      try {
+        Utils.atomicMoveWithFallback(file.toPath, f.toPath)
+        // we re-initialize mmap
+        val raf = new RandomAccessFile(f, "rw")
+        val len = raf.length()
+        this.mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
+        this.mmap.position(position)
+        CoreUtils.swallow(raf.close(), this)
+      }
+      finally file = f
+    }
   }
 
   /**
@@ -228,7 +246,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
      // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
      // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
      // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
-      safeForceUnmap()
+      if (mmap != null) safeForceUnmap()
     }
     Files.deleteIfExists(file.toPath)
   }
@@ -251,6 +269,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
   /** Close the index */
   def close() {
     trimToValidSize()
+    closeHandler()
   }
 
   def closeHandler(): Unit = {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 32203acde9a..45cacd637f3 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -830,6 +830,7 @@ class LogManager(logDirs: Seq[File],
         cleaner.abortCleaning(topicPartition)
         cleaner.updateCheckpoints(removedLog.dir.getParentFile)
       }
+      removedLog.close()
       removedLog.renameDir(Log.logDeleteDirName(topicPartition))
       checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
       checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
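
For readers skimming the diff: the heart of the renameTo change is that Windows
will not rename a file while a live memory mapping holds it open, so the patch
saves the mmap position, unmaps the index, moves the file, and re-maps it. A
minimal standalone sketch of that sequence in Scala follows; remapOnRename and
its unmap parameter are illustrative names, not helpers from the PR, and the
real code uses Utils.atomicMoveWithFallback rather than a bare Files.move:

import java.io.{File, RandomAccessFile}
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, StandardCopyOption}

object RenameSketch {
  // Unmap -> move -> remap, mirroring the Windows branch of renameTo above.
  // `unmap` stands in for safeForceUnmap(), which releases the OS-level
  // mapping (and with it the Windows lock that blocks the rename).
  def remapOnRename(src: File, dst: File, mmap: MappedByteBuffer,
                    unmap: MappedByteBuffer => Unit): MappedByteBuffer = {
    val position = mmap.position()  // remember where appends left off
    unmap(mmap)                     // after this the old buffer must not be touched
    Files.move(src.toPath, dst.toPath, StandardCopyOption.ATOMIC_MOVE)
    val raf = new RandomAccessFile(dst, "rw")
    try {
      val remapped = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, raf.length())
      remapped.position(position)   // restore the append position
      remapped
    } finally raf.close()           // the mapping stays valid after the channel closes
  }
}

This sequence also explains the null guard added in the second hunk: between
the unmap and the remap, mmap is legitimately null, and unmapping a buffer
twice (or touching one that is already unmapped) can crash the JVM.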


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The kafka broker cannot delete the old log files after the configured time
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-1194
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1194
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
>            Environment: Windows
>            Reporter: Tao Qin
>            Priority: Critical
>              Labels: features, patch, windows
>         Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in a Windows environment, with log.retention.hours set to 24
> hours:
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the Kafka broker still cannot delete the old log files,
> and we get the following exception:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>          at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>          at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>          at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>          at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>          at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>          at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>          at scala.collection.immutable.List.foreach(List.scala:76)
>          at kafka.log.Log.deleteOldSegments(Log.scala:418)
>          at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>          at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>          at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>          at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>          at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>          at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>          at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>          at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>          at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>          at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>          at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>          at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>          at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>          at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>          at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>          at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>          at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>          at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>          at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while
> it is still open, so we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The
> Javadoc describes it as follows:
> A mapped byte buffer and the file mapping that it represents remain valid
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it
> can be used to free the MappedByteBuffer (see the sketch below).
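
A minimal sketch of the unmapping technique mentioned above: on JDK 8 a
MappedByteBuffer exposes an internal cleaner() that can be invoked reflectively
to release the mapping immediately instead of waiting for garbage collection,
which is essentially what Kafka's forceUnmap does. The Scala snippet below
illustrates the technique only; it is not the method from the Kafka source,
the reflective names are JDK 8 internals, and later JDKs moved this capability
behind sun.misc.Unsafe:

import java.nio.MappedByteBuffer

object UnmapSketch {
  // Reflectively run the buffer's internal cleaner so the file mapping
  // (and the Windows file lock that comes with it) is released right away
  // rather than whenever the buffer is garbage-collected. JDK 8 only.
  def forceUnmap(buffer: MappedByteBuffer): Unit = {
    val cleanerMethod = buffer.getClass.getMethod("cleaner")
    cleanerMethod.setAccessible(true)
    val cleaner = cleanerMethod.invoke(buffer)
    val cleanMethod = cleaner.getClass.getMethod("clean")
    cleanMethod.setAccessible(true)
    cleanMethod.invoke(cleaner)
  }
}

After this call, any access to the buffer is undefined and can crash the JVM,
which is why the patch above sets mmap to null after unmapping and checks for
null before unmapping again.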



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
