[jira] [Commented] (KAFKA-5574) kafka-consumer-perf-test.sh report header has one less column in show-detailed-stats mode
[ https://issues.apache.org/jira/browse/KAFKA-5574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079892#comment-16079892 ]

ASF GitHub Bot commented on KAFKA-5574:
---------------------------------------

GitHub user cnZach opened a pull request:

    https://github.com/apache/kafka/pull/3512

    KAFKA-5574: add thread.id header in show-detailed-stats report

    The kafka-consumer-perf-test.sh report is missing one header column:

    time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
    2017-07-09 21:40:40:369, 0, 0.1492, 2.6176, 5000, 87719.2982
    2017-07-09 21:40:40:386, 0, 0.2983, 149.0479, 1, 500.
    2017-07-09 21:40:40:387, 0, 0.4473, 149.0812, 15000, 500.

    There is one more column between "time" and "data.consumed.in.MB"; it should be thread.id.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cnZach/kafka trunk

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3512.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3512

----
commit 7e2d8a99fe4318abf63c0fdc34f9c69824948127
Author: Yuexin Zhang
Date:   2017-07-10T05:36:40Z

    KAFKA-5574; add thread.id header in show-detailed-stats report

> kafka-consumer-perf-test.sh report header has one less column in
> show-detailed-stats mode
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-5574
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5574
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>    Affects Versions: 0.9.0.0, 0.10.0.0
>            Reporter: Yuexin Zhang
>
> time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
> 2017-07-09 21:40:40:369, 0, 0.1492, 2.6176, 5000, 87719.2982
> 2017-07-09 21:40:40:386, 0, 0.2983, 149.0479, 1, 500.
> 2017-07-09 21:40:40:387, 0, 0.4473, 149.0812, 15000, 500.
> there's one more column between "time" and "data.consumed.in.MB", it's
> currently set to 0:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L158
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L175
> is it a thread id? what is this id used for?

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
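The fix discussed in this thread can be sketched as follows. This is an illustrative Java approximation (the actual tool, ConsumerPerformance.scala, is Scala, and the class and method names below are hypothetical): once a thread.id column is added to the header, the header and each detailed-stats row carry the same number of comma-separated columns.

```java
import java.util.Locale;

// Hypothetical sketch of the header fix: detailed-stats rows print an id
// between "time" and "data.consumed.in.MB", so the header needs a matching
// "thread.id" column. Names here are illustrative, not the real tool's code.
public class DetailedStatsHeader {
    static String header() {
        return String.join(", ", "time", "thread.id", "data.consumed.in.MB",
                "MB.sec", "data.consumed.in.nMsg", "nMsg.sec");
    }

    static String row(String time, int threadId, double mb, double mbSec,
                      long nMsg, double nMsgSec) {
        return String.format(Locale.ROOT, "%s, %d, %.4f, %.4f, %d, %.4f",
                time, threadId, mb, mbSec, nMsg, nMsgSec);
    }

    public static void main(String[] args) {
        String h = header();
        String r = row("2017-07-09 21:40:40:369", 0, 0.1492, 2.6176, 5000, 87719.2982);
        // Both lines now split into the same number of columns
        System.out.println(h.split(", ").length + " " + r.split(", ").length);
    }
}
```

With the old five-column header, the same check would report 5 against 6, which is exactly the mismatch shown in the report excerpt above.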
[jira] [Updated] (KAFKA-5574) kafka-consumer-perf-test.sh report header has one less column in show-detailed-stats mode
[ https://issues.apache.org/jira/browse/KAFKA-5574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuexin Zhang updated KAFKA-5574:
--------------------------------
    Description:
time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2017-07-09 21:40:40:369, 0, 0.1492, 2.6176, 5000, 87719.2982
2017-07-09 21:40:40:386, 0, 0.2983, 149.0479, 1, 500.
2017-07-09 21:40:40:387, 0, 0.4473, 149.0812, 15000, 500.

there's one more column between "time" and "data.consumed.in.MB", it's currently set to 0:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L158
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L175

is it a thread id? what is this id used for?

  was:
time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2017-07-09 21:40:40:369, 0, 0.1492, 2.6176, 5000, 87719.2982
2017-07-09 21:40:40:386, 0, 0.2983, 149.0479, 1, 500.
2017-07-09 21:40:40:387, 0, 0.4473, 149.0812, 15000, 500.

there's one more column between "time" and "data.consumed.in.MB", it's currently set to 0:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L158

is it a thread id? what is this id used for?

> kafka-consumer-perf-test.sh report header has one less column in
> show-detailed-stats mode
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-5574
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5574
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>    Affects Versions: 0.9.0.0, 0.10.0.0
>            Reporter: Yuexin Zhang
>
> time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
> 2017-07-09 21:40:40:369, 0, 0.1492, 2.6176, 5000, 87719.2982
> 2017-07-09 21:40:40:386, 0, 0.2983, 149.0479, 1, 500.
> 2017-07-09 21:40:40:387, 0, 0.4473, 149.0812, 15000, 500.
> there's one more column between "time" and "data.consumed.in.MB", it's
> currently set to 0:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L158
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L175
> is it a thread id? what is this id used for?
[jira] [Updated] (KAFKA-5574) kafka-consumer-perf-test.sh report header has one less column in show-detailed-stats mode
[ https://issues.apache.org/jira/browse/KAFKA-5574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuexin Zhang updated KAFKA-5574:
--------------------------------
    Affects Version/s: 0.10.0.0

> kafka-consumer-perf-test.sh report header has one less column in
> show-detailed-stats mode
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-5574
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5574
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>    Affects Versions: 0.9.0.0, 0.10.0.0
>            Reporter: Yuexin Zhang
>
> time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
> 2017-07-09 21:40:40:369, 0, 0.1492, 2.6176, 5000, 87719.2982
> 2017-07-09 21:40:40:386, 0, 0.2983, 149.0479, 1, 500.
> 2017-07-09 21:40:40:387, 0, 0.4473, 149.0812, 15000, 500.
> there's one more column between "time" and "data.consumed.in.MB", it's
> currently set to 0:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L158
> is it a thread id? what is this id used for?
[jira] [Created] (KAFKA-5574) kafka-consumer-perf-test.sh report header has one less column in show-detailed-stats mode
Yuexin Zhang created KAFKA-5574:
-------------------------------

             Summary: kafka-consumer-perf-test.sh report header has one less column in show-detailed-stats mode
                 Key: KAFKA-5574
                 URL: https://issues.apache.org/jira/browse/KAFKA-5574
             Project: Kafka
          Issue Type: Bug
          Components: tools
    Affects Versions: 0.9.0.0
            Reporter: Yuexin Zhang

time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2017-07-09 21:40:40:369, 0, 0.1492, 2.6176, 5000, 87719.2982
2017-07-09 21:40:40:386, 0, 0.2983, 149.0479, 1, 500.
2017-07-09 21:40:40:387, 0, 0.4473, 149.0812, 15000, 500.

there's one more column between "time" and "data.consumed.in.MB", it's currently set to 0:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L158

is it a thread id? what is this id used for?
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079842#comment-16079842 ]

Yogesh BG commented on KAFKA-5545:
----------------------------------

Sorry, it's the session timeout, session.timeout.ms. We have set it to 30.

> Kafka Stream not able to successfully restart over new broker ip
> ----------------------------------------------------------------
>
>                 Key: KAFKA-5545
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5545
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Yogesh BG
>            Priority: Critical
>         Attachments: kafkastreams.log
>
> Hi
> I have one kafka broker and one kafka stream application.
> Initially the kafka stream connects and starts processing data. Then I restart
> the broker. When the broker restarts, a new ip will be assigned.
> In the kafka stream I have a 5-minute interval thread which checks whether the
> broker ip changed and, if so, we clean up the stream, rebuild the topology
> (also tried reusing the topology) and start the stream again. I end up with
> the following exceptions.
> 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-38] Creating active task 0_5 with assigned partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-41] Creating active task 0_1 with assigned partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-34] Creating active task 0_7 with assigned partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-37] Creating active task 0_3 with assigned partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-45] Creating active task 0_0 with assigned partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-36] Creating active task 0_4 with assigned partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-43] Creating active task 0_6 with assigned partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-48] Creating active task 0_2 with assigned partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the state directory for task 0_5
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at
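For reference, the session timeout discussed above is a consumer-level setting. A minimal sketch of how it might be supplied to a Streams application follows, using plain java.util.Properties; the application id and broker address are made up, the 30000 ms value assumes the "set it to 30" above means 30 seconds, and whether the bare key or a "consumer."-prefixed key is required depends on the Kafka Streams version.

```java
import java.util.Properties;

// Hypothetical configuration sketch for the Streams app in this thread.
// session.timeout.ms controls how long the group coordinator waits for a
// heartbeat before evicting the consumer from the group; values here are
// illustrative, not a recommendation.
public class StreamsSessionConfig {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", "rtp-kafkastreams");   // made-up app id
        props.put("bootstrap.servers", bootstrapServers);
        props.put("session.timeout.ms", "30000");          // consumer session timeout
        return props;
    }

    public static void main(String[] args) {
        Properties p = build("broker:9092");
        System.out.println(p.getProperty("session.timeout.ms"));
    }
}
```

A timeout that is too short makes rebalances (and the state-directory lock churn seen in the log above) more likely during broker restarts, which is presumably why the reporter raised it.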
[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time
[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079678#comment-16079678 ]

M. Manna edited comment on KAFKA-1194 at 7/9/17 6:39 PM:
---------------------------------------------------------

I believe I have found a workaround (and possibly a solution). The root cause is probably that on Windows, FILE_SHARE_DELETE (via some internal low-level API call) is always set to false (or simply hasn't been defined), which is likely what makes Files.move() fail. Perhaps future JDKs will make this configurable. Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}
def forceUnmapWrapper() {
  if (Os.isWindows)
    forceUnmap(mmap)
}
{code}

2) In Log.scala I made the following changes to changeFileSuffixes:

{code:java}
def changeFileSuffixes(oldSuffix: String, newSuffix: String) {
  def kafkaStorageException(fileType: String, e: IOException) =
    new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)

  logger.warn("KAFKA mod - starting log renameTo op")
  try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("log", e)
  }

  logger.warn("KAFKA mod - starting index renameTo op")
  index.forceUnmapWrapper
  try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("index", e)
  }

  logger.warn("KAFKA mod - starting timeIndex renameTo op")
  timeIndex.forceUnmapWrapper
  try timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("timeindex", e)
  }
}
{code}

This produces the following output upon startup on my 3 brokers:

{code:java}
[2017-07-09 18:42:16,451] INFO Deleting segment 0 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,460] INFO Deleting index C:\tmp\kafka-logs3\z1-1\.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,470] INFO Deleting index C:\tmp\kafka-logs3\z1-1\.timeindex.deleted (kafka.log.TimeIndex)
[2017-07-09 18:42:16,484] INFO Deleting segment 30240 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,501] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00030240.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,507] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00030240.timeindex.deleted (kafka.log.TimeIndex)
[2017-07-09 18:42:16,517] INFO Deleting segment 47520 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,520] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00047520.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,523] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00047520.timeindex.deleted (kafka.log.TimeIndex)
{code}

My log.retention.minutes=10 and log.retention.check.interval.ms=30; this doesn't always get triggered as expected, but when it does, it now cleans. If someone is kind enough to verify this solution and propose a commit, we can try this out for a future release?

was (Author: manme...@gmail.com):
I believe I have found a workaround (and possibly a solution). The root cause is probably that on Windows, FILE_SHARE_DELETE (via some internal low-level API call) is always set to false (or simply hasn't been defined), which is likely what makes Files.move() fail. Perhaps future JDKs will make this configurable.

Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}
def forceUnmapWrapper() {
  if (Os.isWindows)
    forceUnmap(mmap)
}
{code}

2) In Log.scala I made the following changes to changeFileSuffixes:

{code:java}
def changeFileSuffixes(oldSuffix: String, newSuffix: String) {
  def kafkaStorageException(fileType: String, e: IOException) =
    new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)

  logger.warn("KAFKA mod - starting log renameTo op")
  try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("log", e)
  }

  logger.warn("KAFKA mod - starting index renameTo op")
  index.forceUnmapWrapper
  try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("index", e)
  }

  logger.warn("KAFKA mod - starting timeIndex renameTo op")
  timeIndex.forceUnmapWrapper
  try timeIndex.renameTo(new
[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time
[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079678#comment-16079678 ]

M. Manna edited comment on KAFKA-1194 at 7/9/17 6:24 PM:
---------------------------------------------------------

I believe I have found a workaround (and possibly a solution). The root cause is probably that on Windows, FILE_SHARE_DELETE (via some internal low-level API call) is always set to false (or simply hasn't been defined), which is likely what makes Files.move() fail. Perhaps future JDKs will make this configurable. Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}
def forceUnmapWrapper() {
  if (Os.isWindows)
    forceUnmap(mmap)
}
{code}

2) In Log.scala I made the following changes to changeFileSuffixes:

{code:java}
def changeFileSuffixes(oldSuffix: String, newSuffix: String) {
  def kafkaStorageException(fileType: String, e: IOException) =
    new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)

  logger.warn("KAFKA mod - starting log renameTo op")
  try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("log", e)
  }

  logger.warn("KAFKA mod - starting index renameTo op")
  index.forceUnmapWrapper
  try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("index", e)
  }

  logger.warn("KAFKA mod - starting timeIndex renameTo op")
  timeIndex.forceUnmapWrapper
  try timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("timeindex", e)
  }
}
{code}

This produces the following output upon startup on my 3 brokers:

{code:java}
[2017-07-09 18:42:16,451] INFO Deleting segment 0 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,460] INFO Deleting index C:\tmp\kafka-logs3\z1-1\.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,470] INFO Deleting index C:\tmp\kafka-logs3\z1-1\.timeindex.deleted (kafka.log.TimeIndex)
[2017-07-09 18:42:16,484] INFO Deleting segment 30240 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,501] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00030240.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,507] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00030240.timeindex.deleted (kafka.log.TimeIndex)
[2017-07-09 18:42:16,517] INFO Deleting segment 47520 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,520] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00047520.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,523] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00047520.timeindex.deleted (kafka.log.TimeIndex)
{code}

My log.retention.minutes=10 and log.retention.check.interval.ms=30; this doesn't always get triggered as expected, but when it does, it now cleans. If someone is kind enough to verify this solution and propose a commit, we can try this out for a future release?

was (Author: manme...@gmail.com):
I believe I have found a workaround (and possibly a solution). The root cause is probably that on Windows, FILE_SHARE_DELETE (via some internal low-level API call) is always set to false (or simply hasn't been defined), which is likely what makes Files.move() fail. Perhaps future JDKs will make this configurable.

Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}
def forceUnmapWrapper() {
  forceUnmap(mmap)
}
{code}

2) In Log.scala I made the following changes to changeFileSuffixes:

{code:java}
def changeFileSuffixes(oldSuffix: String, newSuffix: String) {
  def kafkaStorageException(fileType: String, e: IOException) =
    new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)

  logger.warn("KAFKA mod - starting log renameTo op")
  try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("log", e)
  }

  logger.warn("KAFKA mod - starting index renameTo op")
  index.forceUnmapWrapper
  try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("index", e)
  }

  logger.warn("KAFKA mod - starting timeIndex renameTo op")
  timeIndex.forceUnmapWrapper
  try timeIndex.renameTo(new
[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time
[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079678#comment-16079678 ]

M. Manna commented on KAFKA-1194:
---------------------------------

I believe I have found a workaround (and possibly a solution). The root cause is probably that on Windows, FILE_SHARE_DELETE (via some internal low-level API call) is always set to false (or simply hasn't been defined), which is likely what makes Files.move() fail. Perhaps future JDKs will make this configurable. Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}
def forceUnmapWrapper() {
  forceUnmap(mmap)
}
{code}

2) In Log.scala I made the following changes to changeFileSuffixes:

{code:java}
def changeFileSuffixes(oldSuffix: String, newSuffix: String) {
  def kafkaStorageException(fileType: String, e: IOException) =
    new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)

  logger.warn("KAFKA mod - starting log renameTo op")
  try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("log", e)
  }

  logger.warn("KAFKA mod - starting index renameTo op")
  index.forceUnmapWrapper
  try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("index", e)
  }

  logger.warn("KAFKA mod - starting timeIndex renameTo op")
  timeIndex.forceUnmapWrapper
  try timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("timeindex", e)
  }
}
{code}

This produces the following output upon startup on my 3 brokers:

{code:java}
[2017-07-09 18:42:16,451] INFO Deleting segment 0 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,460] INFO Deleting index C:\tmp\kafka-logs3\z1-1\.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,470] INFO Deleting index C:\tmp\kafka-logs3\z1-1\.timeindex.deleted (kafka.log.TimeIndex)
[2017-07-09 18:42:16,484] INFO Deleting segment 30240 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,501] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00030240.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,507] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00030240.timeindex.deleted (kafka.log.TimeIndex)
[2017-07-09 18:42:16,517] INFO Deleting segment 47520 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,520] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00047520.index.deleted (kafka.log.OffsetIndex)
[2017-07-09 18:42:16,523] INFO Deleting index C:\tmp\kafka-logs3\z1-1\00047520.timeindex.deleted (kafka.log.TimeIndex)
{code}

My log.retention.minutes=10 and log.retention.check.interval.ms=30; this doesn't always get triggered as expected, but when it does, it now cleans. If someone is kind enough to verify this solution and propose a commit, we can try this out for a future release?

> The kafka broker cannot delete the old log files after the configured time
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-1194
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1194
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8.1
>         Environment: window
>            Reporter: Tao Qin
>            Priority: Critical
>              Labels: features, patch
>         Attachments: KAFKA-1194.patch, kafka-1194-v1.patch, kafka-1194-v2.patch, screenshot-1.png, Untitled.jpg
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in a windows environment, and set log.retention.hours to 24 hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file.
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from  to .deleted for log segment 1516723
>     at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>     at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>     at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>     at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>     at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>     at
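The rename that fails in the exception above (swapping an empty suffix for ".deleted") can be illustrated with plain java.nio. This sketch mimics the assumed semantics of CoreUtils.replaceSuffix and the subsequent move; it deliberately does not cover the unmapping step, which is the Windows-specific part of the workaround and relies on JVM-internal cleaners in AbstractIndex.forceUnmap.

```java
import java.io.IOException;
import java.nio.file.*;

// Hypothetical illustration of the suffix-swap + rename that the broker's
// async delete performs. On Windows this move fails while the old file is
// still memory-mapped, which is what the workaround above addresses by
// unmapping first.
public class SuffixRename {
    // Assumed semantics of CoreUtils.replaceSuffix: swap the trailing suffix
    static Path replaceSuffix(Path file, String oldSuffix, String newSuffix) {
        String name = file.getFileName().toString();
        if (!name.endsWith(oldSuffix))
            throw new IllegalArgumentException(name + " does not end with " + oldSuffix);
        String renamed = name.substring(0, name.length() - oldSuffix.length()) + newSuffix;
        return file.resolveSibling(renamed);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("kafka-1194");
        Path index = Files.createFile(dir.resolve("00000000000000030240.index"));
        // e.g. ".index" -> ".index.deleted", as in the broker's log output
        Path target = replaceSuffix(index, ".index", ".index.deleted");
        Files.move(index, target, StandardCopyOption.ATOMIC_MOVE);
        System.out.println(Files.exists(target) && !Files.exists(index));
    }
}
```

On POSIX filesystems the move succeeds even if another process still has the file mapped, which is why this bug is only reported on Windows.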