[jira] [Commented] (KAFKA-5574) kafka-consumer-perf-test.sh report header has one less column in show-detailed-stats mode

2017-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16079892#comment-16079892
 ] 

ASF GitHub Bot commented on KAFKA-5574:
---

GitHub user cnZach opened a pull request:

https://github.com/apache/kafka/pull/3512

KAFKA-5574: add thread.id header in show-detailed-stats report

The kafka-consumer-perf-test.sh report is missing one header column: 

time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2017-07-09 21:40:40:369, 0, 0.1492, 2.6176, 5000, 87719.2982
2017-07-09 21:40:40:386, 0, 0.2983, 149.0479, 1, 500.
2017-07-09 21:40:40:387, 0, 0.4473, 149.0812, 15000, 500.
There's one more column between "time" and "data.consumed.in.MB"; it should be 
labelled thread.id.

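For context, here is a minimal illustrative sketch of the mismatch the PR fixes (an assumption for illustration, not the exact lines from the patch): with --show-detailed-stats each per-interval row already prints an id between the timestamp and data.consumed.in.MB, so the header needs a matching thread.id column. The variable names below are placeholders, not ConsumerPerformance.scala identifiers.

{code:java}
// Hypothetical sketch; variable names are placeholders.
println("time, thread.id, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
println("%s, %d, %.4f, %.4f, %d, %.4f".format(timestamp, threadId,
  totalMbRead, mbPerSec, totalMessagesRead, msgPerSec))
{code}
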
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cnZach/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3512.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3512


commit 7e2d8a99fe4318abf63c0fdc34f9c69824948127
Author: Yuexin Zhang 
Date:   2017-07-10T05:36:40Z

KAFKA-5574; add thread.id header in show-detailed-stats report




> kafka-consumer-perf-test.sh report header has one less column in 
> show-detailed-stats mode
> -
>
> Key: KAFKA-5574
> URL: https://issues.apache.org/jira/browse/KAFKA-5574
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.9.0.0, 0.10.0.0
>Reporter: Yuexin Zhang
>
> time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
> 2017-07-09 21:40:40:369, 0, 0.1492, 2.6176, 5000, 87719.2982
> 2017-07-09 21:40:40:386, 0, 0.2983, 149.0479, 1, 500.
> 2017-07-09 21:40:40:387, 0, 0.4473, 149.0812, 15000, 500.
> There's one more column between "time" and "data.consumed.in.MB"; it's 
> currently set to 0:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L158
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L175
> Is it a thread id? What is this id used for?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5574) kafka-consumer-perf-test.sh report header has one less column in show-detailed-stats mode

2017-07-09 Thread Yuexin Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuexin Zhang updated KAFKA-5574:

Description: 
time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2017-07-09 21:40:40:369, 0, 0.1492, 2.6176, 5000, 87719.2982
2017-07-09 21:40:40:386, 0, 0.2983, 149.0479, 1, 500.
2017-07-09 21:40:40:387, 0, 0.4473, 149.0812, 15000, 500.

There's one more column between "time" and "data.consumed.in.MB"; it's 
currently set to 0:

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L158

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L175

Is it a thread id? What is this id used for?

  was:
time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2017-07-09 21:40:40:369, 0, 0.1492, 2.6176, 5000, 87719.2982
2017-07-09 21:40:40:386, 0, 0.2983, 149.0479, 1, 500.
2017-07-09 21:40:40:387, 0, 0.4473, 149.0812, 15000, 500.

There's one more column between "time" and "data.consumed.in.MB"; it's 
currently set to 0:

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L158

Is it a thread id? What is this id used for?


> kafka-consumer-perf-test.sh report header has one less column in 
> show-detailed-stats mode
> -
>
> Key: KAFKA-5574
> URL: https://issues.apache.org/jira/browse/KAFKA-5574
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.9.0.0, 0.10.0.0
>Reporter: Yuexin Zhang
>
> time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
> 2017-07-09 21:40:40:369, 0, 0.1492, 2.6176, 5000, 87719.2982
> 2017-07-09 21:40:40:386, 0, 0.2983, 149.0479, 1, 500.
> 2017-07-09 21:40:40:387, 0, 0.4473, 149.0812, 15000, 500.
> There's one more column between "time" and "data.consumed.in.MB"; it's 
> currently set to 0:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L158
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L175
> Is it a thread id? What is this id used for?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5574) kafka-consumer-perf-test.sh report header has one less column in show-detailed-stats mode

2017-07-09 Thread Yuexin Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuexin Zhang updated KAFKA-5574:

Affects Version/s: 0.10.0.0

> kafka-consumer-perf-test.sh report header has one less column in 
> show-detailed-stats mode
> -
>
> Key: KAFKA-5574
> URL: https://issues.apache.org/jira/browse/KAFKA-5574
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.9.0.0, 0.10.0.0
>Reporter: Yuexin Zhang
>
> time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
> 2017-07-09 21:40:40:369, 0, 0.1492, 2.6176, 5000, 87719.2982
> 2017-07-09 21:40:40:386, 0, 0.2983, 149.0479, 1, 500.
> 2017-07-09 21:40:40:387, 0, 0.4473, 149.0812, 15000, 500.
> There's one more column between "time" and "data.consumed.in.MB"; it's 
> currently set to 0:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L158
> Is it a thread id? What is this id used for?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5574) kafka-consumer-perf-test.sh report header has one less column in show-detailed-stats mode

2017-07-09 Thread Yuexin Zhang (JIRA)
Yuexin Zhang created KAFKA-5574:
---

 Summary: kafka-consumer-perf-test.sh report header has one less 
column in show-detailed-stats mode
 Key: KAFKA-5574
 URL: https://issues.apache.org/jira/browse/KAFKA-5574
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.9.0.0
Reporter: Yuexin Zhang


time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2017-07-09 21:40:40:369, 0, 0.1492, 2.6176, 5000, 87719.2982
2017-07-09 21:40:40:386, 0, 0.2983, 149.0479, 1, 500.
2017-07-09 21:40:40:387, 0, 0.4473, 149.0812, 15000, 500.

There's one more column between "time" and "data.consumed.in.MB"; it's 
currently set to 0:

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala#L158

Is it a thread id? What is this id used for?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-09 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16079842#comment-16079842
 ] 

Yogesh BG commented on KAFKA-5545:
--

Sorry, it's the session timeout, session.timeout.ms. We have set it to 30.

> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi,
> I have one Kafka broker and one Kafka Streams application.
> Initially the Kafka Streams application connects and starts processing data. Then I 
> restart the broker. When the broker restarts, a new IP is assigned.
> In the Kafka Streams application I have a 5-minute interval thread which checks whether 
> the broker IP has changed and, if it has, we clean up the stream, rebuild the topology 
> (we also tried reusing the topology) and start the stream again (a sketch of this restart 
> sequence follows the quoted log and stack trace below). I end up with the following exceptions:
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 
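As a rough illustration of the restart sequence described in the quoted report, here is a minimal Scala sketch; it is an assumption, not the reporter's code, and createStreams() is a hypothetical placeholder for whatever rebuilds the KafkaStreams instance against the new broker address. One common source of the "Failed to lock the state directory" LockException above is touching the state directory (for example via cleanUp()) before the previous instance's threads have released their task locks, so close() has to complete first.

{code:java}
import org.apache.kafka.streams.KafkaStreams

// Hypothetical sketch; createStreams() is a placeholder, not code from the ticket.
def restartStreams(oldStreams: KafkaStreams,
                   createStreams: () => KafkaStreams): KafkaStreams = {
  oldStreams.close()    // wait for all StreamThreads to stop and release their state-directory locks
  oldStreams.cleanUp()  // wiping local state while old threads still hold locks can trigger the LockException above
  val restarted = createStreams()
  restarted.start()
  restarted
}
{code}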

[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2017-07-09 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16079678#comment-16079678
 ] 

M. Manna edited comment on KAFKA-1194 at 7/9/17 6:39 PM:
-

I believe I have found a workaround (and possibly a solution). The root cause 
is probably that on Windows FILE_SHARE_DELETE (used via some internal low-level 
API call) is always set to false (or simply hasn't been defined), and this is 
what makes the Files.move() fail. Perhaps future JDKs will make this 
configurable somehow.

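To make the failure mode concrete, here is a small standalone sketch (an assumption for illustration, not code from this ticket): on Windows, moving a file that is still memory-mapped typically fails until the mapping is released, which is what the forceUnmap workaround below forces before the rename.

{code:java}
// Minimal demo: on Windows the final move is expected to fail while the file is mapped.
import java.nio.channels.FileChannel
import java.nio.file.{Files, Paths, StandardOpenOption}

object MmapRenameDemo extends App {
  val path = Paths.get("demo.index")
  Files.write(path, new Array[Byte](1024))
  val channel = FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE)
  val mmap = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024)  // file is now memory-mapped
  // On Windows this typically throws (e.g. FileSystemException) until the mapping is released:
  Files.move(path, Paths.get("demo.index.deleted"))
}
{code}
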
Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}
def forceUnmapWrapper() {
  if (Os.isWindows)
    forceUnmap(mmap)
}
{code}

2) In Log.scala I made the following changes to changeFileSuffixes:


{code:java}
def changeFileSuffixes(oldSuffix: String, newSuffix: String) {

  def kafkaStorageException(fileType: String, e: IOException) =
    new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)

  logger.warn("KAFKA mod - starting log renameTo op")
  try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("log", e)
  }
  logger.warn("KAFKA mod - starting index renameTo op")
  index.forceUnmapWrapper
  try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("index", e)
  }
  logger.warn("KAFKA mod - starting timeIndex renameTo op")
  timeIndex.forceUnmapWrapper
  try timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("timeindex", e)
  }
}
{code}

Produces the following output upon startup on my 3 brokers:

{code:java}
[2017-07-09 18:42:16,451] INFO Deleting segment 0 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,460] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,470] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\.timeindex.deleted 
(kafka.log.TimeIndex)
[2017-07-09 18:42:16,484] INFO Deleting segment 30240 from log z1-1. 
(kafka.log.Log)
[2017-07-09 18:42:16,501] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00030240.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,507] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00030240.timeindex.deleted 
(kafka.log.TimeIndex)
[2017-07-09 18:42:16,517] INFO Deleting segment 47520 from log z1-1. 
(kafka.log.Log)
[2017-07-09 18:42:16,520] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00047520.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,523] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00047520.timeindex.deleted 
(kafka.log.TimeIndex)
{code}

My log.retention.minutes=10 and log.retention.check.interval.ms=30; the retention 
check doesn't always get triggered as expected, but when it does, it now cleans up.

If someone is kind enough to verify this solution and propose a commit, perhaps we 
can try this out in a future release?


was (Author: manme...@gmail.com):
I believe I have found a workaround (and possibly a solution). The Root cause 
is probably on Windows FILE_SHARE_DELETE (using some internal low level API 
call) is always set to false (or simply hasn't been defined). This is possibly 
failing the Files.move(). Perhaps future JDKs will consider this to be 
configurable somehow!.

Meanwhile, i have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}

def forceUnmapWrapper() {
if (Os.isWindows)
forceUnmap(mmap)
}
{code}

2) In Log.scala made the following changes for changeFileSuffixes:


{code:java}
  def changeFileSuffixes(oldSuffix: String, newSuffix: String) {

def kafkaStorageException(fileType: String, e: IOException) =
  new KafkaStorageException(s"Failed to change the $fileType file suffix 
from $oldSuffix to $newSuffix for log segment $baseOffset", e)
logger.warn("KAFKA mod - starting log renameTo op");
try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, 
oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("log", e)
}
logger.warn("KAFKA mod - starting index renameTo op")
index.forceUnmapWrapper
try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, 
oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("index", e)
}
logger.warn("KAFKA mod - starting timeIndex renameTo op")
timeIndex.forceUnmapWrapper
try timeIndex.renameTo(new 

[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2017-07-09 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16079678#comment-16079678
 ] 

M. Manna edited comment on KAFKA-1194 at 7/9/17 6:24 PM:
-

I believe I have found a workaround (and possibly a solution). The root cause 
is probably that on Windows FILE_SHARE_DELETE (used via some internal low-level 
API call) is always set to false (or simply hasn't been defined), and this is 
what makes the Files.move() fail. Perhaps future JDKs will make this 
configurable somehow.

Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}
def forceUnmapWrapper() {
  if (Os.isWindows)
    forceUnmap(mmap)
}
{code}

2) In Log.scala I made the following changes to changeFileSuffixes:


{code:java}
def changeFileSuffixes(oldSuffix: String, newSuffix: String) {

  def kafkaStorageException(fileType: String, e: IOException) =
    new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)

  logger.warn("KAFKA mod - starting log renameTo op")
  try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("log", e)
  }
  logger.warn("KAFKA mod - starting index renameTo op")
  index.forceUnmapWrapper
  try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("index", e)
  }
  logger.warn("KAFKA mod - starting timeIndex renameTo op")
  timeIndex.forceUnmapWrapper
  try timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("timeindex", e)
  }
}
{code}

Produces the following output upon startup on my 3 brokers:

{code:java}
[2017-07-09 18:42:16,451] INFO Deleting segment 0 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,460] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,470] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\.timeindex.deleted 
(kafka.log.TimeIndex)
[2017-07-09 18:42:16,484] INFO Deleting segment 30240 from log z1-1. 
(kafka.log.Log)
[2017-07-09 18:42:16,501] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00030240.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,507] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00030240.timeindex.deleted 
(kafka.log.TimeIndex)
[2017-07-09 18:42:16,517] INFO Deleting segment 47520 from log z1-1. 
(kafka.log.Log)
[2017-07-09 18:42:16,520] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00047520.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,523] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00047520.timeindex.deleted 
(kafka.log.TimeIndex)
{code}

My log.retention.minutes=10 and log.retention.check.interval.ms=30; the retention 
check doesn't always get triggered as expected, but when it does, it now cleans up.

If someone is kind enough to verify this solution and propose a commit, perhaps we 
can try this out in a future release?


was (Author: manme...@gmail.com):
I believe I have found a workaround (and possibly a solution). The Root cause 
is probably on Windows FILE_SHARE_DELETE (using some internal low level API 
call) is always set to false (or simply hasn't been defined). This is possibly 
failing the Files.move(). Perhaps future JDKs will consider this to be 
configurable somehow!.

Meanwhile, i have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}

def forceUnmapWrapper() {
forceUnmap(mmap)
}
{code}

2) In Log.scala made the following changes for changeFileSuffixes:


{code:java}
  def changeFileSuffixes(oldSuffix: String, newSuffix: String) {

def kafkaStorageException(fileType: String, e: IOException) =
  new KafkaStorageException(s"Failed to change the $fileType file suffix 
from $oldSuffix to $newSuffix for log segment $baseOffset", e)
logger.warn("KAFKA mod - starting log renameTo op");
try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, 
oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("log", e)
}
logger.warn("KAFKA mod - starting index renameTo op")
index.forceUnmapWrapper
try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, 
oldSuffix, newSuffix)))
catch {
  case e: IOException => throw kafkaStorageException("index", e)
}
logger.warn("KAFKA mod - starting timeIndex renameTo op")
timeIndex.forceUnmapWrapper
try timeIndex.renameTo(new 

[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2017-07-09 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16079678#comment-16079678
 ] 

M. Manna commented on KAFKA-1194:
-

I believe I have found a workaround (and possibly a solution). The root cause 
is probably that on Windows FILE_SHARE_DELETE (used via some internal low-level 
API call) is always set to false (or simply hasn't been defined), and this is 
what makes the Files.move() fail. Perhaps future JDKs will make this 
configurable somehow.

Meanwhile, I have done the following:

1) I created a wrapper around forceUnmap as follows (in AbstractIndex.scala):

{code:java}
def forceUnmapWrapper() {
  forceUnmap(mmap)
}
{code}

2) In Log.scala I made the following changes to changeFileSuffixes:


{code:java}
def changeFileSuffixes(oldSuffix: String, newSuffix: String) {

  def kafkaStorageException(fileType: String, e: IOException) =
    new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)

  logger.warn("KAFKA mod - starting log renameTo op")
  try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("log", e)
  }
  logger.warn("KAFKA mod - starting index renameTo op")
  index.forceUnmapWrapper
  try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("index", e)
  }
  logger.warn("KAFKA mod - starting timeIndex renameTo op")
  timeIndex.forceUnmapWrapper
  try timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
  catch {
    case e: IOException => throw kafkaStorageException("timeindex", e)
  }
}
{code}

Produces the following output upon startup on my 3 brokers:

{code:java}
[2017-07-09 18:42:16,451] INFO Deleting segment 0 from log z1-1. (kafka.log.Log)
[2017-07-09 18:42:16,460] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,470] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\.timeindex.deleted 
(kafka.log.TimeIndex)
[2017-07-09 18:42:16,484] INFO Deleting segment 30240 from log z1-1. 
(kafka.log.Log)
[2017-07-09 18:42:16,501] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00030240.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,507] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00030240.timeindex.deleted 
(kafka.log.TimeIndex)
[2017-07-09 18:42:16,517] INFO Deleting segment 47520 from log z1-1. 
(kafka.log.Log)
[2017-07-09 18:42:16,520] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00047520.index.deleted 
(kafka.log.OffsetIndex)
[2017-07-09 18:42:16,523] INFO Deleting index 
C:\tmp\kafka-logs3\z1-1\00047520.timeindex.deleted 
(kafka.log.TimeIndex)
{code}

My log.retention.minutes=10 and log.retention.check.interval.ms=30; the retention 
check doesn't always get triggered as expected, but when it does, it now cleans up.

If someone is kind enough to verify this solution and propose a commit, perhaps we 
can try this out in a future release?

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1
> Environment: Windows
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch
> Attachments: KAFKA-1194.patch, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, screenshot-1.png, Untitled.jpg
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in a Windows environment, and set log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the Kafka broker still cannot delete the old log files, 
> and we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
>