[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2022-01-17 Thread Maksim Zinal (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17477227#comment-17477227 ]

Maksim Zinal edited comment on KAFKA-1194 at 1/17/22, 1:59 PM:
---

Duplicating my comment from KAFKA-2170 here.

I've created a quick patch which seems to fix the issue on my system and on the 
Kafka version I use (2.8.0).

I believe the lock failures are caused by the use of the RandomAccessFile Java 
API to open files in some code paths, while other paths use FileChannel.open() 
instead. When files are opened with RandomAccessFile under Windows, the 
FILE_SHARE_DELETE flag is not set, which leads to "access denied" errors when 
trying to rename or delete the open files. FileChannel.open() sets 
FILE_SHARE_DELETE by default, as I verified on JDK 8 and 11.
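
For illustration, here is a minimal standalone sketch (my own demo, not code 
from the patch; the class name is made up): on Windows the first rename fails 
with an "access denied" style IOException, while the second succeeds; on POSIX 
systems both succeed.

{code:java}
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class ShareDeleteDemo {
    public static void main(String[] args) throws IOException {
        Path src = Files.createTempFile("segment", ".log");
        Path dst = src.resolveSibling(src.getFileName() + ".deleted");

        // RandomAccessFile: on Windows the underlying handle is opened
        // without FILE_SHARE_DELETE, so renaming the still-open file is denied.
        try (RandomAccessFile raf = new RandomAccessFile(src.toFile(), "rw")) {
            Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE);
            System.out.println("rename while open via RandomAccessFile: OK");
            Files.move(dst, src, StandardCopyOption.ATOMIC_MOVE); // undo for the next case
        } catch (IOException e) {
            System.out.println("rename while open via RandomAccessFile failed: " + e);
        }

        // FileChannel.open(): the JDK's Windows implementation requests
        // FILE_SHARE_DELETE by default, so the same rename goes through.
        try (FileChannel ch = FileChannel.open(src, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE);
            System.out.println("rename while open via FileChannel.open: OK");
        }

        Files.deleteIfExists(dst);
    }
}
{code}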

Here's the link to the branch based on tag 2.8.0: 
[https://github.com/zinal/kafka/tree/2.8.0_KAFKA-1194]

Here are the exact changes implemented: 
[https://github.com/zinal/kafka/compare/2.8.0...zinal:2.8.0_KAFKA-1194] (plus 
jcenter and grgit stuff needed to run the build).


was (Author: JIRAUSER283716):
Duplicating my comment from KAFKA-2170 here.

I've created a quick patch which seems to fix the issue on my system and on the 
Kafka version I use (2.8.0).

I believe the lock failures are related to using the RandomAccessFile Java API 
to open files in some code paths, while other paths use FileChannel.open() 
instead. When files are opened with RandomAccessFile under Windows, the 
FILE_SHARE_DELETE flag is not set, which leads to "access denied" errors when 
trying to rename or delete the open files. FileChannel.open() sets 
FILE_SHARE_DELETE by default, as I verified on JDK 8 and 11.

Here's the link to the branch based on tag 2.8.0: 
[https://github.com/zinal/kafka/tree/2.8.0_KAFKA-1194]

Here are the exact changes implemented: 
[https://github.com/zinal/kafka/compare/2.8.0...zinal:2.8.0_KAFKA-1194] (plus 
jcenter and grgit stuff needed to run the build).


[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-26 Thread Kobi Hikri (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698875#comment-16698875 ]

Kobi Hikri edited comment on KAFKA-1194 at 11/26/18 1:46 PM:
-

I've created a new pull request: [https://github.com/apache/kafka/pull/5945]

[~stephane.maa...@gmail.com] - this one contains two commits:
1. Addresses the retention mechanism under Windows (*works with the delete policy*, 
not compact).
2. *Adds support for topic deletion on Windows*.


was (Author: kobi hikri):
I've created a new pull request: [https://github.com/apache/kafka/pull/5945]

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, 
> image-2018-11-26-10-18-59-381.png, kafka-1194-v1.patch, kafka-1194-v2.patch, 
> kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because kafka tries to rename the log file when it 
> is still opened.  So we should close the file first before rename.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I find a forceUnmap function in kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.





[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-20 Thread Kobi Hikri (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16693465#comment-16693465 ]

Kobi Hikri edited comment on KAFKA-1194 at 11/20/18 4:44 PM:
-

Hi all,

In the above pull request, you will find a *partial fix* for the Windows 
retention mechanism.
 Given a certain configuration, it is a workaround that allows using Kafka's 
retention mechanism on Windows.
 Please note - this fix *WILL NOT* work with *log.cleanup.policy = compact*.
 Please share your thoughts. I believe that with this fix we can *close the 
critical bug* (as it has a workaround) and *open a lower-severity bug* for 
making the *log compaction* mechanism usable on Windows.

Please DO NOT MERGE until this is discussed.

[~stephane.maa...@gmail.com] - Could you kindly share your thoughts?



Any comments are welcome,
 Kobi. 


was (Author: kobi hikri):
Hi all,

In the above pull request, you will find a *partial fix* for the Windows 
retention mechanism.
Given a certain configuration, it is a workaround that allows using Kafka's 
retention mechanism on Windows.
Please note - this fix *WILL NOT* work with *log.cleanup.policy = compact*.
Please share your thoughts. I believe that with this fix we can *close the 
critical bug* (as it has a workaround) and *open a lower-severity bug* for 
making the *log compaction* mechanism usable on Windows.

Any comments are welcome,
Kobi. 


[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-15 Thread Kobi Hikri (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688085#comment-16688085 ]

Kobi Hikri edited comment on KAFKA-1194 at 11/15/18 1:48 PM:
-

[~stephane.maa...@gmail.com], here it is:

[https://github.com/apache/kafka/pull/5917]

Thanks in advance,

Kobi.


was (Author: kobi hikri):
[~stephane.maa...@gmail.com], here it is: 
[https://github.com/apache/kafka/pull/5917]

Thanks in advance,
Kobi.



[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-24 Thread M. Manna (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625600#comment-16625600 ]

M. Manna edited comment on KAFKA-1194 at 9/24/18 10:11 AM:
---

The best thing to do is the following:

1) Set log.segment.bytes to something slightly smaller, e.g. 20 MB.
2) Disable the log cleaner (log.cleaner.enable=false). This should ensure that 
the brokers don't accidentally keep Log.scala segments open; once the cleaner 
is disabled and the segments are rolled over, we should be able to delete them 
manually (the sketch after this list shows the matching settings).
3) Allow new log files to roll over.
4) Manually delete the old files using some scripts.

This is still not something I would recommend, but if running on Windows is a 
must (without using containers), you may want to try this out.
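
To make steps 1) and 2) concrete, the broker settings would look roughly like 
this (the 20 MB value is only an example, not a recommendation; both property 
names are standard broker configs):

{code}
# server.properties - illustrative values only
# Step 1: roll segments at ~20 MB so old data leaves the active segment quickly
log.segment.bytes=20971520
# Step 2: stop the cleaner from re-opening rolled-over segments
log.cleaner.enable=false
{code}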

Regards,


was (Author: manme...@gmail.com):
The best thing to do is the following:

1) Set log.segment.bytes to something slightly smaller, e.g. 20 MB.
2) Disable the log cleaner (log.cleaner.enable=false).
3) Allow new log files to roll over.
4) Manually delete the old files using some scripts.

This is still not something I would recommend, but if running on Windows is a 
must (without using containers), you may want to try this out.

Regards,


[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-12 Thread Kobi Hikri (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612028#comment-16612028 ]

Kobi Hikri edited comment on KAFKA-1194 at 9/12/18 4:25 PM:


Thanks for your time [~stephane.maa...@gmail.com].
  
 I am adding some logs to the method and reproducing again.


was (Author: kobi hikri):
Thanks for your time [~stephane.maa...@gmail.com].
  
I am adding some logs to the method and reproducing again.

{code:java}
public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
    try {
        log.debug("Attempting atomic move of {} to {}", source, target);
        Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        log.debug("Atomic move of {} to {} succeeded", source, target);
    } catch (IOException outer) {
        try {
            log.debug("Attempting non-atomic move of {} to {} after atomic move failed due to {}",
                    source, target, outer.getMessage());
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}",
                    source, target, outer.getMessage());
        } catch (IOException inner) {
            log.error("Non-atomic move of {} to {} failed due to {}", source, target, inner.getMessage());
            inner.addSuppressed(outer);
            throw inner;
        }
    }
}
{code}


[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-12 Thread Kobi Hikri (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612028#comment-16612028 ]

Kobi Hikri edited comment on KAFKA-1194 at 9/12/18 12:17 PM:
-

Thanks for your time [~stephane.maa...@gmail.com].
  
I am adding some logs to the method and reproducing again.

{code:java}
public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
    try {
        log.debug("Attempting atomic move of {} to {}", source, target);
        Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        log.debug("Atomic move of {} to {} succeeded", source, target);
    } catch (IOException outer) {
        try {
            log.debug("Attempting non-atomic move of {} to {} after atomic move failed due to {}",
                    source, target, outer.getMessage());
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}",
                    source, target, outer.getMessage());
        } catch (IOException inner) {
            log.error("Non-atomic move of {} to {} failed due to {}", source, target, inner.getMessage());
            inner.addSuppressed(outer);
            throw inner;
        }
    }
}
{code}


was (Author: kobi hikri):
Thanks for your time [~stephane.maa...@gmail.com].
 


[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-12 Thread Kobi Hikri (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611916#comment-16611916 ]

Kobi Hikri edited comment on KAFKA-1194 at 9/12/18 11:27 AM:
-

Hi [~stephane.maa...@gmail.com],

Apologies for the late response.

I built the trunk branch merged with your PR.

Log attached here: [^RetentionExpiredWindows.txt]

A snapshot of the log file directory is also attached. Please notice that the 
latest log files were *correctly marked as deleted*.

TL;DR:
 # The broker *no longer shuts down*.
 # Files marked for deletion are still not deleted; a few are deleted, but not all.
 # Plenty of "ERROR" log lines (as expected :)).



!image-2018-09-12-14-25-52-632.png!


was (Author: kobi hikri):
Hi [~stephane.maa...@gmail.com],

Apologies for the late response.

I built the trunk branch merged with your PR.

Log attached here: [^RetentionExpiredWindows.txt]

TL;DR:

1. The broker *no longer shuts down*.
2. Files marked for deletion are still not deleted; a few are deleted, but not all.
3. Plenty of "ERROR" log lines (as expected :)).


[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-12 Thread Kobi Hikri (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611916#comment-16611916 ]

Kobi Hikri edited comment on KAFKA-1194 at 9/12/18 11:05 AM:
-

Hi [~stephane.maa...@gmail.com],

Apologies for the late response.

I built the trunk branch merged with your PR.

Log attached here: [^RetentionExpiredWindows.txt]

TL;DR:

1. The broker *no longer shuts down*.
2. Files marked for deletion are still not deleted; a few are deleted, but not all.
3. Plenty of "ERROR" log lines (as expected :)).


was (Author: kobi hikri):
Hi [~stephane.maa...@gmail.com],

Apologies for the late response.

I built the trunk branch merged with your PR.

Log attached below.

TL;DR:

1. The broker *no longer shuts down*.
2. Files marked for deletion are still not deleted; a few are deleted, but not all.
3. Plenty of "ERROR" log lines (as expected :)).


[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2017-07-09 Thread M. Manna (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079678#comment-16079678 ]

M. Manna edited comment on KAFKA-1194 at 7/9/17 6:39 PM:
-

I believe I have found a workaround (and possibly a solution). The root cause 
is probably that on Windows the FILE_SHARE_DELETE flag (set via some internal 
low-level API call) is always false (or simply never specified), and this is 
what makes Files.move() fail. Perhaps future JDKs will make this configurable.

Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala; 
the sketch at the end of this comment shows what forceUnmap itself does):

{code:java}
def forceUnmapWrapper() {
  if (Os.isWindows)
    forceUnmap(mmap)
}
{code}

2) In Log.scala, I made the following changes to changeFileSuffixes:


{code:java}
  def changeFileSuffixes(oldSuffix: String, newSuffix: String) {

    def kafkaStorageException(fileType: String, e: IOException) =
      new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)

    logger.warn("KAFKA mod - starting log renameTo op")
    try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
    catch {
      case e: IOException => throw kafkaStorageException("log", e)
    }
    logger.warn("KAFKA mod - starting index renameTo op")
    index.forceUnmapWrapper
    try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
    catch {
      case e: IOException => throw kafkaStorageException("index", e)
    }
    logger.warn("KAFKA mod - starting timeIndex renameTo op")
    timeIndex.forceUnmapWrapper
    try timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
    catch {
      case e: IOException => throw kafkaStorageException("timeindex", e)
    }
  }
{code}

Produces the following output upon startup on my 3 brokers:

{code:java}
[2017-07-09 18:42:16,451] INFO Deleting segment 0 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,460] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000000000.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,470] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000000000.timeindex.deleted (kafka.log.TimeIndex)
[2017-07-09 18:42:16,484] INFO Deleting segment 30240 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,501] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000030240.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,507] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000030240.timeindex.deleted (kafka.log.TimeIndex)
[2017-07-09 18:42:16,517] INFO Deleting segment 47520 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,520] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000047520.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,523] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000047520.timeindex.deleted (kafka.log.TimeIndex)
{code}

My settings are log.retention.minutes=10 and log.retention.check.interval.ms=30; 
the check doesn't always get triggered as expected, but when it does, it now 
cleans.

If someone is kind enough to verify this solution and propose a commit, we can 
try this out for a future release?
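
For background on the forceUnmap referenced in step 1): it releases the index's 
MappedByteBuffer by invoking the buffer's cleaner, so Windows will then allow 
the underlying file to be renamed or deleted. A rough standalone sketch of that 
mechanism on JDK 8 (sun.nio.ch.DirectBuffer and sun.misc.Cleaner are 
JDK-internal APIs; JDK 9+ would use Unsafe.invokeCleaner instead, and the 
helper class name here is made up):

{code:java}
import java.nio.MappedByteBuffer;

public final class MmapUnmapper {
    // Hypothetical helper, not Kafka's actual AbstractIndex.forceUnmap:
    // releases the file mapping before the buffer is garbage-collected.
    public static void unmap(MappedByteBuffer buffer) {
        sun.misc.Cleaner cleaner = ((sun.nio.ch.DirectBuffer) buffer).cleaner();
        if (cleaner != null)
            cleaner.clean();
    }
}
{code}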


was (Author: manme...@gmail.com):
I believe I have found a workaround (and possibly a solution). The root cause 
is probably that on Windows the FILE_SHARE_DELETE flag (set via some internal 
low-level API call) is always false (or simply never specified), and this is 
what makes Files.move() fail. Perhaps future JDKs will make this configurable.

Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}
def forceUnmapWrapper() {
  if (Os.isWindows)
    forceUnmap(mmap)
}
{code}

2) In Log.scala, I made the following changes to changeFileSuffixes:


{code:java}
  def changeFileSuffixes(oldSuffix: String, newSuffix: String) {

    def kafkaStorageException(fileType: String, e: IOException) =
      new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)

    logger.warn("KAFKA mod - starting log renameTo op")
    try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
    catch {
      case e: IOException => throw kafkaStorageException("log", e)
    }
    logger.warn("KAFKA mod - starting index renameTo op")
    index.forceUnmapWrapper
    try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
    catch {
      case e: IOException => throw kafkaStorageException("index", e)
    }
    logger.warn("KAFKA mod - starting timeIndex renameTo op")
    timeIndex.forceUnmapWrapper
    try timeIndex.renameTo(new

[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2017-07-09 Thread M. Manna (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079678#comment-16079678 ]

M. Manna edited comment on KAFKA-1194 at 7/9/17 6:24 PM:
-

I believe I have found a workaround (and possibly a solution). The root cause 
is probably that on Windows the FILE_SHARE_DELETE flag (set via some internal 
low-level API call) is always false (or simply never specified), and this is 
what makes Files.move() fail. Perhaps future JDKs will make this configurable.

Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}
def forceUnmapWrapper() {
  if (Os.isWindows)
    forceUnmap(mmap)
}
{code}

2) In Log.scala, I made the following changes to changeFileSuffixes:


{code:java}
  def changeFileSuffixes(oldSuffix: String, newSuffix: String) {

    def kafkaStorageException(fileType: String, e: IOException) =
      new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)

    logger.warn("KAFKA mod - starting log renameTo op")
    try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
    catch {
      case e: IOException => throw kafkaStorageException("log", e)
    }
    logger.warn("KAFKA mod - starting index renameTo op")
    index.forceUnmapWrapper
    try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
    catch {
      case e: IOException => throw kafkaStorageException("index", e)
    }
    logger.warn("KAFKA mod - starting timeIndex renameTo op")
    timeIndex.forceUnmapWrapper
    try timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
    catch {
      case e: IOException => throw kafkaStorageException("timeindex", e)
    }
  }
{code}

Produces the following output upon startup on my 3 brokers:

{code:java}
[2017-07-09 18:42:16,451] INFO Deleting segment 0 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,460] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000000000.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,470] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000000000.timeindex.deleted (kafka.log.TimeIndex)
[2017-07-09 18:42:16,484] INFO Deleting segment 30240 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,501] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000030240.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,507] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000030240.timeindex.deleted (kafka.log.TimeIndex)
[2017-07-09 18:42:16,517] INFO Deleting segment 47520 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,520] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000047520.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,523] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00000000000000047520.timeindex.deleted (kafka.log.TimeIndex)
{code}

My settings are log.retention.minutes=10 and log.retention.check.interval.ms=30; 
the check doesn't always get triggered as expected, but when it does, it now 
cleans.

If someone is kind enough to verify this solution and propose a commit, we can 
try this out for a future release?


was (Author: manme...@gmail.com):
I believe I have found a workaround (and possibly a solution). The root cause 
is probably that on Windows the FILE_SHARE_DELETE flag (set via some internal 
low-level API call) is always false (or simply never specified), and this is 
what makes Files.move() fail. Perhaps future JDKs will make this configurable.

Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}
def forceUnmapWrapper() {
  forceUnmap(mmap)
}
{code}

2) In Log.scala, I made the following changes to changeFileSuffixes:


{code:java}
  def changeFileSuffixes(oldSuffix: String, newSuffix: String) {

    def kafkaStorageException(fileType: String, e: IOException) =
      new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)

    logger.warn("KAFKA mod - starting log renameTo op")
    try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
    catch {
      case e: IOException => throw kafkaStorageException("log", e)
    }
    logger.warn("KAFKA mod - starting index renameTo op")
    index.forceUnmapWrapper
    try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
    catch {
      case e: IOException => throw kafkaStorageException("index", e)
    }
    logger.warn("KAFKA mod - starting timeIndex renameTo op")
    timeIndex.forceUnmapWrapper
    try timeIndex.renameTo(new