[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-15 Thread Kobi Hikri (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688060#comment-16688060
 ] 

Kobi Hikri commented on KAFKA-1194:
---

[~stephane.maa...@gmail.com], Thanks. Doing it now and will update.
[~manme...@gmail.com] - yes. Unfortunately, however, this is not an option for 
one specific client.

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: Windows
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested this in a Windows environment, with log.retention.hours set to 24 
> hours:
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the Kafka broker still cannot delete the old log files, 
> and we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while 
> it is still open, so we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The 
> Javadoc describes it as follows:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it 
> can be used to free the MappedByteBuffer.
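
For illustration, here is a minimal sketch of what a forceUnmap-style helper 
can look like on a pre-Java-9 JVM, where sun.misc.Cleaner is reachable via 
reflection. This is a hedged sketch of the general technique, not the helper 
in the Kafka tree:

{code:java}
import java.lang.reflect.Method;
import java.nio.MappedByteBuffer;

// Force-unmap a MappedByteBuffer by invoking the JDK-internal cleaner.
// On Windows, the mapping must be released before the backing file can be
// renamed or deleted; waiting for GC (as the Javadoc describes) is too late.
// This relies on JDK internals and is not portable across Java versions.
final class MappedBufferUnmapper {
    static void unmap(final MappedByteBuffer buffer) throws Exception {
        final Method cleanerMethod = buffer.getClass().getMethod("cleaner");
        cleanerMethod.setAccessible(true);
        final Object cleaner = cleanerMethod.invoke(buffer); // sun.misc.Cleaner
        final Method cleanMethod = cleaner.getClass().getMethod("clean");
        cleanMethod.setAccessible(true);
        cleanMethod.invoke(cleaner); // releases the mapping immediately
    }
}
{code}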





[jira] [Created] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd

2018-11-15 Thread Dave Waters (JIRA)
Dave Waters created KAFKA-7632:
--

 Summary: Add option to kafka broker config to adjust compression 
level for zstd
 Key: KAFKA-7632
 URL: https://issues.apache.org/jira/browse/KAFKA-7632
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.1.0
 Environment: all
Reporter: Dave Waters
 Fix For: 2.1.0


The compression level for ZSTD is currently fixed at the default level (3), a 
conservative setting that in some use cases negates the improved compression 
ZSTD can provide.  Each use case will vary, so exposing the level as a broker 
configuration setting will allow users to tune it.
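
For a rough sense of the trade-off, here is an illustrative comparison using 
the zstd-jni library that Kafka's zstd support builds on; this is not broker 
code, and the payload is a synthetic, highly compressible sample:

{code:java}
import com.github.luben.zstd.Zstd;

// Compress the same payload at the default level (3) and at a higher level.
// Higher levels usually shrink the output further at the cost of CPU time,
// which is why a configurable level is attractive.
public class ZstdLevelDemo {
    public static void main(String[] args) {
        byte[] payload = new byte[64 * 1024];
        java.util.Arrays.fill(payload, (byte) 'a');
        byte[] level3 = Zstd.compress(payload, 3);
        byte[] level19 = Zstd.compress(payload, 19);
        System.out.printf("level 3: %d bytes, level 19: %d bytes%n",
                level3.length, level19.length);
    }
}
{code}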





[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-15 Thread Stephane Maarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688044#comment-16688044
 ] 

Stephane Maarek commented on KAFKA-1194:


[~Kobi Hikri] excellent, I'll be happy to help out and test your PR.
Here's the process:
1) go to github.com/apache/kafka
2) create a fork (top right corner). I'll assume your username is "user"
3) you now have a copy of the repo at github.com/user/kafka
4) add a remote to your git: git remote add myfork 
git@github.com:user/kafka.git
5) push your branch to your remote: git push -u myfork mybranch
6) go back to the web interface at github.com/apache/kafka
7) you should see a yellow bar to help you do a pull request. Else follow this 
guide: https://help.github.com/articles/creating-a-pull-request/

Hope that helps!
Stephane


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-15 Thread Kobi Hikri (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688174#comment-16688174
 ] 

Kobi Hikri commented on KAFKA-1194:
---

[~stephane.maa...@gmail.com], I was too quick on the draw.
The fix I've implemented does seem to help, but it leads me to a similar 
problem with managing the consumer-offsets files.
Let me try to consolidate a fix that solves both problems.



[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-15 Thread Kobi Hikri (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688177#comment-16688177
 ] 

Kobi Hikri commented on KAFKA-1194:
---

Absolutely!



[jira] [Comment Edited] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-11-15 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688332#comment-16688332
 ] 

Boyang Chen edited comment on KAFKA-7610 at 11/15/18 4:44 PM:
--

Hey [~enether], great summary! I'm also in favor of the idea of setting an 
upper limit via `group.max.size`. This is so far the most intuitive approach 
for end users and requires minimal changes. From my understanding, it is 
better set on the client side, with an advised value of 10x to 100x the 
current membership size. The default value could be discussed further when we 
talk about the memory limit we want to hold, because the per-member metadata 
size should be stable.

[~hachikuji] [~guozhang] Thoughts?

 



> Detect consumer failures in initial JoinGroup
> -
>
> Key: KAFKA-7610
> URL: https://issues.apache.org/jira/browse/KAFKA-7610
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> The session timeout and heartbeating logic in the consumer allow us to detect 
> failures after a consumer joins the group. However, we have no mechanism to 
> detect failures during a consumer's initial JoinGroup when its memberId is 
> empty. When a client fails (e.g. due to a disconnect), the newly created 
> MemberMetadata will be left in the group metadata cache. Typically when this 
> happens, the client simply retries the JoinGroup. Every retry results in a 
> new dangling member created and left in the group. These members are doomed 
> to a session timeout when the group finally finishes the rebalance, but 
> before that time, they are occupying memory. In extreme cases, when a 
> rebalance is delayed (possibly due to a buggy application), this cycle can 
> repeat and the cache can grow quite large.
> There are a couple options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP 
> connection fails. This is difficult at the moment because we do not have a 
> mechanism to propagate disconnects from the network layer. A potential option 
> is to treat the disconnect as just another type of request and pass it to the 
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of 
> time, we can return earlier with the generated memberId and an error code 
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the 
> rebalance. The consumer can then poll for the rebalance using its assigned 
> memberId. And we can detect failures through the session timeout. Obviously 
> this option requires a KIP (and some more thought).





[jira] [Assigned] (KAFKA-7631) NullPointerException when SCRAM is allowed but ScramLoginModule is not in broker's jaas.conf

2018-11-15 Thread Attila Sasvari (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Attila Sasvari reassigned KAFKA-7631:
-

Assignee: Attila Sasvari

> NullPointerException when SCRAM is allowed but ScramLoginModule is not in 
> broker's jaas.conf
> ---
>
> Key: KAFKA-7631
> URL: https://issues.apache.org/jira/browse/KAFKA-7631
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 2.0.0
>Reporter: Andras Beni
>Assignee: Attila Sasvari
>Priority: Minor
>
> When a user wants to use delegation tokens and lists {{SCRAM}} in 
> {{sasl.enabled.mechanisms}}, but does not add {{ScramLoginModule}} to the 
> broker's JAAS configuration, a NullPointerException is thrown on the broker 
> side and the connection is closed.
> A meaningful error message should be logged and sent back to the client.
> {code}
> java.lang.NullPointerException
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:376)
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:262)
> at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:489)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
> at kafka.network.Processor.poll(SocketServer.scala:679)
> at kafka.network.Processor.run(SocketServer.scala:584)
> at java.lang.Thread.run(Thread.java:748)
> {code}
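
For context, a minimal broker jaas.conf fragment containing the 
{{ScramLoginModule}} entry whose absence triggers this NullPointerException 
might look like the following (the credentials are placeholders):

{code}
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
};
{code}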





[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-15 Thread Kobi Hikri (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688085#comment-16688085
 ] 

Kobi Hikri commented on KAFKA-1194:
---

[~stephane.maa...@gmail.com], here it is: 
[https://github.com/apache/kafka/pull/5917]

Thanks in advance,
Kobi.



[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-15 Thread Kobi Hikri (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688085#comment-16688085
 ] 

Kobi Hikri edited comment on KAFKA-1194 at 11/15/18 1:48 PM:
-

[~stephane.maa...@gmail.com], here it is:

[https://github.com/apache/kafka/pull/5917]

Thanks in advance,

Kobi.




[jira] [Commented] (KAFKA-7608) A Kafka Streams DSL transform or process call should potentially trigger a repartition

2018-11-15 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688333#comment-16688333
 ] 

Bill Bejeck commented on KAFKA-7608:


Hi Andy,

The fact that transform, transformValues, and process don't trigger a 
repartition is by design.  I'm not sure we should implement an automatic 
repartition when these operations use a state store.  For one, we'd force a 
repartition on others who already use state stores and don't require 
repartitioning.  Second, the state store in use could be a custom store that 
is not a key-value store, so repartitioning would not help.

But I completely understand your issue. 

There is an existing KIP proposal 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Repartition+Topic+Hints+in+Streams])
 that looks to add some user control over repartition topics.  Suppose we 
added some additional features to this KIP that:
 # Allowed Kafka Streams to create the topic via a {{KStream.through(String 
name, Produced produced)}} method
 # With the appropriate information contained in the {{produced}} parameter, 
allowed Kafka Streams to manage the topic created from the {{through}} call 
as a repartition topic (meaning its contents are purged)

Would that suit your needs?

While the repartitioning would not be automatic, it would be a simple matter 
of adding a single method call to the DSL, and Kafka Streams would handle 
topic creation and management of the contents.
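
For concreteness, the current manual workaround can be sketched as follows 
(assuming the Kafka Streams 2.0 API; topic names are placeholders):

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ManualRepartitionExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // A key-changing operation: downstream stateful work now needs
        // records co-partitioned by the new key.
        KStream<String, String> rekeyed = builder.<String, String>stream("input-topic")
                .selectKey((key, value) -> value);
        // Piping through an explicit topic forces the repartition that
        // transform()/process() do not currently trigger automatically.
        KStream<String, String> copartitioned = rekeyed.through(
                "manual-repartition-topic",
                Produced.with(Serdes.String(), Serdes.String()));
        // copartitioned.transform(...) / .process(...) can now safely use a
        // key-based state store: same key, same partition, same task.
        copartitioned.to("output-topic");
    }
}
{code}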

 

Thanks,

Bill

 

> A Kafka Streams DSL transform or process call should potentially trigger a 
> repartition
> --
>
> Key: KAFKA-7608
> URL: https://issues.apache.org/jira/browse/KAFKA-7608
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Andy Bryant
>Priority: Major
>
> Currently in Kafka Streams, if any DSL operation occurs that may modify the 
> keys of the record stream, the stream is flagged for repartitioning. 
> Currently this flag is checked prior to a stream join or an aggregation and 
> if set the stream is piped through a transient repartition topic. This 
> ensures messages with the same key are always co-located in the same 
> partition and hence same stream task and state store.
> The same mechanism should be used to trigger repartitioning prior to stream 
> {{transform}}, {{transformValues}} and {{process}} calls that specify one or 
> more state stores.
> Currently without the forced repartitioning, for streams where the key has 
> been modified, there is no guarantee the same keys will be processed by the 
> same task which would be what you expect when using a state store. Given that 
> aggregations and joins already automatically make this guarantee it seems 
> inconsistent that {{transform}} and {{process}} do not provide the same 
> guarantees.
> To achieve the same guarantees currently, developers must manually pipe the 
> stream through a topic to force the repartitioning. This works, but is 
> sub-optimal since you don't get the handy optimisation where the repartition 
> topic's contents are purged after use.





[jira] [Commented] (KAFKA-7584) StreamsConfig throws ClassCastException if max.in.flight.request.per.connect is specified as String

2018-11-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688677#comment-16688677
 ] 

ASF GitHub Bot commented on KAFKA-7584:
---

guozhangwang closed pull request #5874: KAFKA-7584: StreamsConfig throws 
ClassCastException if max.in.flight.request.per.connect is specified as String
URL: https://github.com/apache/kafka/pull/5874
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b25894c2e04..90523d22bce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -887,10 +887,30 @@ private void 
checkIfUnexpectedUserSpecifiedConsumerConfig(final Map 5) {
+throw new 
ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 
maxInFlightRequestsAsInteger, "Can't exceed 5 when exactly-once processing is 
enabled");
+}
+}
 }
+
 for (final String config: nonConfigurableConfigs) {
 if (clientProvidedProps.containsKey(config)) {
 final String eosMessage =  PROCESSING_GUARANTEE_CONFIG + " is 
set to " + EXACTLY_ONCE + ". Hence, ";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index a86c38946df..7b34615cb39 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -123,7 +123,7 @@ public void 
consumerConfigMustContainStreamPartitionAssignorConfig() {
 assertEquals(StreamsPartitionAssignor.class.getName(), 
returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
 assertEquals(7L, 
returnedProps.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
 assertEquals("dummy:host", 
returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
-assertEquals(null, returnedProps.get(StreamsConfig.RETRIES_CONFIG));
+assertNull(returnedProps.get(StreamsConfig.RETRIES_CONFIG));
 assertEquals(5, 
returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
 assertEquals(100, 
returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)));
 }
@@ -233,7 +233,6 @@ public void 
shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
 assertEquals("host", producerConfigs.get("interceptor.statsd.host"));
 }
 
-
 @Test
 public void shouldSupportPrefixedProducerConfigs() {
 props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
@@ -427,7 +426,7 @@ public void 
testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix() {
 public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
 final StreamsConfig streamsConfig = new StreamsConfig(props);
 final Map consumerConfigs = 
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
-assertThat(consumerConfigs.get("internal.leave.group.on.close"), 
CoreMatchers.equalTo(false));
+assertThat(consumerConfigs.get("internal.leave.group.on.close"), 
CoreMatchers.equalTo(false));
 }
 
 @Test
@@ -582,15 +581,36 @@ public void shouldSpecifyCorrectValueSerdeClassOnError() {
 }
 
 @Test
-public void 
shouldThrowExceptionIfMaxInflightRequestsGreatherThanFiveIfEosEnabled() {
-props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
+public void 
shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled() {
 props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
 final StreamsConfig streamsConfig = new StreamsConfig(props);
 try {
 streamsConfig.getProducerConfigs("clientId");
-fail("Should throw ConfigException when Eos is enabled and 
maxInFlight requests exceeds 5");
+fail("Should throw ConfigException when ESO is enabled and 
maxInFlight requests exceeds 5");
+} catch (final ConfigException e) {
+assertEquals("Invalid value 7 for configuration 
max.in.flight.requests.per.connection: Can't exceed 5 when exactly-once 
processing is enabled", e.getMessage());
+}
+}
+
+@Test
+public void 
shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled() {
+props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);

[jira] [Updated] (KAFKA-7633) Kafka Connect requires permission to create internal topics even if they exist

2018-11-15 Thread Arabelle Hou (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arabelle Hou updated KAFKA-7633:

Description: 
Similar to issue https://issues.apache.org/jira/browse/KAFKA-6250 but now with 
a different exception.

Error is:
{code:java}
018-11-13 16:14:10,283 [DistributedHerder] ERROR o.a.k.c.r.d.DistributedHerder 
- Uncaught exception in herder work thread, exiting:
org.apache.kafka.connect.errors.ConnectException: Error while attempting to 
create/find topic(s) 'connect-offsets'
    at 
org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
    at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:100)
    at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:126)
    at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:110)
    at org.apache.kafka.connect.runtime.Worker.start(Worker.java:144)
    at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:108)
    at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:211)
    at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to 
access topics: [Topic authorization failed.]
    at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258)
    at 
org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
    ... 11 common frames omitted
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not 
authorized to access topics: [Topic authorization failed.]
{code}
Kafka 2.0 uses TOPIC_AUTHORIZATION_FAILED(29) as a response code now in the 
[CreateTopicsResponse|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java]
 class whereas it used CLUSTER_AUTHORIZATION_FAILED(31) in Kafka 1.1 [for 
example|https://github.com/apache/kafka/blob/1.1/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java]


[jira] [Updated] (KAFKA-7633) Kafka Connect requires permission to create internal topics even if they exist

2018-11-15 Thread Arabelle Hou (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arabelle Hou updated KAFKA-7633:

Description: 
Similar to issue https://issues.apache.org/jira/browse/KAFKA-6250 but now with 
a different exception.

Error is:
{code:java}
018-11-13 16:14:10,283 [DistributedHerder] ERROR o.a.k.c.r.d.DistributedHerder 
- Uncaught exception in herder work thread, exiting:
org.apache.kafka.connect.errors.ConnectException: Error while attempting to 
create/find topic(s) 'connect-offsets'
    at 
org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
    at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:100)
    at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:126)
    at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:110)
    at org.apache.kafka.connect.runtime.Worker.start(Worker.java:144)
    at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:108)
    at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:211)
    at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to 
access topics: [Topic authorization failed.]
    at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258)
    at 
org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
    ... 11 common frames omitted
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not 
authorized to access topics: [Topic authorization failed.]
{code}
Kafka 2.0 uses TOPIC_AUTHORIZATION_FAILED(29) as a response code now in the 
[CreateTopicsResponse|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java]
 class whereas it used CLUSTER_AUTHORIZATION_FAILED(31) in Kafka 1.1 [for 
example|[https://github.com/apache/kafka/blob/1.1/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java]]


[jira] [Updated] (KAFKA-7584) StreamsConfig throws ClassCastException if max.in.flight.request.per.connect is specified as String

2018-11-15 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7584:
-
Fix Version/s: 2.2.0

> StreamsConfig throws ClassCastException if max.in.flight.request.per.connect 
> is specified as String
> ---
>
> Key: KAFKA-7584
> URL: https://issues.apache.org/jira/browse/KAFKA-7584
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
> Fix For: 2.2.0, 2.1.1, 2.0.2
>
>
> Setting 
> {quote}{{props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);}}
> {{props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");}}
> {quote}
> results in
> {quote}{{java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Integer}}{{at 
> org.apache.kafka.streams.StreamsConfig.checkIfUnexpectedUserSpecifiedConsumerConfig(StreamsConfig.java:875)}}
> {{ at 
> org.apache.kafka.streams.StreamsConfig.getProducerConfigs(StreamsConfig.java:1071)}}
> {quote}
>  
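
Judging from the expected error message quoted in the PR diff earlier in this 
digest, the fix accepts the value whether it arrives as an Integer or as a 
String. A minimal illustrative sketch of such a check (not the merged code; 
the config name is written out literally):

{code:java}
import org.apache.kafka.common.config.ConfigException;

public final class MaxInFlightValidator {
    // Validate max.in.flight.requests.per.connection under exactly-once
    // processing, tolerating both Integer- and String-typed values.
    static int validate(final Object value) {
        final int parsed;
        if (value instanceof Integer) {
            parsed = (Integer) value;
        } else if (value instanceof String) {
            try {
                parsed = Integer.parseInt(((String) value).trim());
            } catch (final NumberFormatException e) {
                throw new ConfigException("max.in.flight.requests.per.connection",
                        value, "String value could not be parsed as an integer");
            }
        } else {
            throw new ConfigException("max.in.flight.requests.per.connection",
                    value, "Expected an Integer or a String");
        }
        if (parsed > 5) {
            throw new ConfigException("max.in.flight.requests.per.connection",
                    parsed, "Can't exceed 5 when exactly-once processing is enabled");
        }
        return parsed;
    }
}
{code}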





[jira] [Updated] (KAFKA-7633) Kafka Connect requires permission to create internal topics even if they exist

2018-11-15 Thread Arabelle Hou (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arabelle Hou updated KAFKA-7633:

Description: 
Similar to issue https://issues.apache.org/jira/browse/KAFKA-6250 but now with 
a different exception.

Error is:
{code:java}
018-11-13 16:14:10,283 [DistributedHerder] ERROR o.a.k.c.r.d.DistributedHerder 
- Uncaught exception in herder work thread, exiting:
org.apache.kafka.connect.errors.ConnectException: Error while attempting to 
create/find topic(s) 'connect-offsets'
    at 
org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
    at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:100)
    at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:126)
    at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:110)
    at org.apache.kafka.connect.runtime.Worker.start(Worker.java:144)
    at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:108)
    at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:211)
    at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to 
access topics: [Topic authorization failed.]
    at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258)
    at 
org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
    ... 11 common frames omitted
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not 
authorized to access topics: [Topic authorization failed.]
{code}
Kafka 2.0 uses TOPIC_AUTHORIZATION_FAILED(29) as a response code now in the 
[CreateTopicsResponse| 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java]]
 class whereas it used CLUSTER_AUTHORIZATION_FAILED(31) in Kafka 1.1 [for 
example|[https://github.com/apache/kafka/blob/1.1/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java]]


[jira] [Created] (KAFKA-7633) Kafka Connect requires permission to create internal topics even if they exist

2018-11-15 Thread Arabelle Hou (JIRA)
Arabelle Hou created KAFKA-7633:
---

 Summary: Kafka Connect requires permission to create internal 
topics even if they exist
 Key: KAFKA-7633
 URL: https://issues.apache.org/jira/browse/KAFKA-7633
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.1.0
Reporter: Arabelle Hou


Similar to issue https://issues.apache.org/jira/browse/KAFKA-6250 but now with 
a different exception.

Error is:
{code:java}
2018-11-13 16:14:10,283 [DistributedHerder] ERROR o.a.k.c.r.d.DistributedHerder 
- Uncaught exception in herder work thread, exiting:
org.apache.kafka.connect.errors.ConnectException: Error while attempting to 
create/find topic(s) 'connect-offsets'
    at 
org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
    at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:100)
    at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:126)
    at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:110)
    at org.apache.kafka.connect.runtime.Worker.start(Worker.java:144)
    at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:108)
    at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:211)
    at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to 
access topics: [Topic authorization failed.]
    at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258)
    at 
org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
    ... 11 common frames omitted
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not 
authorized to access topics: [Topic authorization failed.]
{code}
I think it is because Kafka 2.0 uses TOPIC_AUTHORIZATION_FAILED(29) as a 
response code now in the CreateTopicsResponse class 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java]
 whereas it used CLUSTER_AUTHORIZATION_FAILED(31) in Kafka 1.1 for example 
[https://github.com/apache/kafka/blob/1.1/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7633) Kafka Connect requires permission to create internal topics even if they exist

2018-11-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688737#comment-16688737
 ] 

ASF GitHub Bot commented on KAFKA-7633:
---

arabelle opened a new pull request #5918: KAFKA-7633: Allow Kafka Connect to 
access internal topics without clu…
URL: https://github.com/apache/kafka/pull/5918
 
 
   …ster ACLs
   
   When Kafka Connect does not have cluster ACLs to create topics,
   it fails to even access its internal topics which already exist.
   This was originally fixed in KAFKA-6250 by ignoring the cluster
   authorization error, but now Kafka 2.0 returns a different response
   code that corresponds to a different error. Add a patch to ignore this
   new error as well.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Connect requires permission to create internal topics even if they exist
> --
>
> Key: KAFKA-7633
> URL: https://issues.apache.org/jira/browse/KAFKA-7633
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Arabelle Hou
>Priority: Major
>
> Similar to issue https://issues.apache.org/jira/browse/KAFKA-6250 but now 
> with a different exception.
> Error is:
> {code:java}
> 2018-11-13 16:14:10,283 [DistributedHerder] ERROR 
> o.a.k.c.r.d.DistributedHerder - Uncaught exception in herder work thread, 
> exiting:
> org.apache.kafka.connect.errors.ConnectException: Error while attempting to 
> create/find topic(s) 'connect-offsets'
>     at 
> org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
>     at 
> org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:100)
>     at 
> org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:126)
>     at 
> org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:110)
>     at org.apache.kafka.connect.runtime.Worker.start(Worker.java:144)
>     at 
> org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:108)
>     at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:211)
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to 
> access topics: [Topic authorization failed.]
>     at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>     at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>     at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>     at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258)
>     at 
> org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
>     ... 11 common frames omitted
> Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not 
> authorized to access topics: [Topic authorization failed.]
> {code}
> I think it is because Kafka 2.0 uses TOPIC_AUTHORIZATION_FAILED(29) as a 
> response code now in the CreateTopicsResponse class 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java]
>  whereas it used CLUSTER_AUTHORIZATION_FAILED(31) in Kafka 1.1 for example 
> [https://github.com/apache/kafka/blob/1.1/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7608) A Kafka Streams DSL transform or process call should potentially trigger a repartition

2018-11-15 Thread Andy Bryant (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688729#comment-16688729
 ] 

Andy Bryant commented on KAFKA-7608:


Hi Bill

That sounds like a better solution. I see your point about not assuming how the 
state store is used, especially given these are lower-level APIs. I like the 
idea of giving hints that a topic can be internally managed, as this may be 
generally useful in other patterns as well.

As part of the KIP it would be worth calling out in the docs that a repartition 
is not triggered for these calls, and pointing users to the manual workaround 
(see the sketch below) if they do indeed need a repartition.

Cheers

Andy 
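For reference, a minimal sketch of the manual workaround mentioned above 
(topic, store, and transformer names are hypothetical; the repartition topic 
must be created up front):
{code:java}
// Sketch only: force co-partitioning before transform() by piping the
// key-modified stream through an explicit topic (2.0-era DSL).
KStream<String, Long> rekeyed =
    input.selectKey((key, value) -> deriveNewKey(value)); // key changed here

KStream<String, Long> repartitioned =
    rekeyed.through("my-app-manual-repartition"); // pre-created topic

repartitioned.transform(MyTransformer::new, "my-state-store");
{code}
As the ticket notes, this works but the explicit topic is never purged the way 
an internal repartition topic would be.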

> A Kafka Streams DSL transform or process call should potentially trigger a 
> repartition
> --
>
> Key: KAFKA-7608
> URL: https://issues.apache.org/jira/browse/KAFKA-7608
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Andy Bryant
>Priority: Major
>
> Currently in Kafka Streams, if any DSL operation occurs that may modify the 
> keys of the record stream, the stream is flagged for repartitioning. 
> Currently this flag is checked prior to a stream join or an aggregation and 
> if set the stream is piped through a transient repartition topic. This 
> ensures messages with the same key are always co-located in the same 
> partition and hence same stream task and state store.
> The same mechanism should be used to trigger repartitioning prior to stream 
> {{transform}}, {{transformValues}} and {{process}} calls that specify one or 
> more state stores.
> Currently without the forced repartitioning, for streams where the key has 
> been modified, there is no guarantee the same keys will be processed by the 
> same task which would be what you expect when using a state store. Given that 
> aggregations and joins already automatically make this guarantee it seems 
> inconsistent that {{transform}} and {{process}} do not provide the same 
> guarantees.
> To achieve the same guarantees currently, developers must manually pipe the 
> stream through a topic to force the repartitioning. This works, but is 
> sub-optimal since you don't get the handy optimisation where the repartition 
> topic contents is purged after use.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7443) OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic

2018-11-15 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688798#comment-16688798
 ] 

John Roesler commented on KAFKA-7443:
-

Hi [~linyli],

Wow, this seems like a pretty bad condition.

Since you already have a patch for it, would you like to open a pull request on 
[https://github.com/apache/kafka] ?

This would make it easier to review and comment on your proposed fix.

Thank you!

-John

> OffsetOutOfRangeException in restoring state store from changelog topic when 
> start offset of local checkpoint is smaller than that of changelog topic
> -
>
> Key: KAFKA-7443
> URL: https://issues.apache.org/jira/browse/KAFKA-7443
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.1.0
>Reporter: linyue li
>Assignee: John Roesler
>Priority: Major
>  Labels: feather
>
> When restoring local state store from a changelog topic in EOS, kafka stream 
> will sometimes throw out the OffsetOutOfRangeException such as:
> {code:java}
> Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from 
> scratch.
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of 
> range with no configured reset policy for partitions: 
> {AuditTrailBatch_PROD3-Dedup-key-store-changelog-32=75465112}
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:950)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:470)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1249)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1157)
>  at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:89)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:765)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:734){code}
>  
> This scenario occurs when changelog topic deleted the expired log segments 
> according to the retention.ms, but the start offset in the local .checkpoint 
> file is the position when the task last exits from this instance, which may 
> be smaller than the updated beginning offset of changelog topic. Restoring 
> store from start offset in checkpoint file will throw exception.
> It can be reproduced as below (Kafka Stream runs in EOS):
>  # task for topic partition test-1 is running on instance A. When task exits, 
> kafka stream writes the last committed offset 100 for test-1 in checkpoint 
> file.
>  # task test-1 transfer to instance B.
>  # During this time, the remote changelog topic for test-1 updates its start 
> offset to 120 as the old log segment reaches retention time and is deleted.
>  # After a while, task test-1 exits from instance B and resumes on instance 
> A, and task restores local state store of A from checkpoint offset 100, which 
> is smaller than the valid offset 120 of changelog topic. Such exception 
> throws out.
> When this exception occurs, kafka stream tries to reinitialize the task and 
> intends to restore from beginning in catch block below. Unfortunately, this 
> handle not work and the task keeps throwing  OffsetOutOfRangeException in the 
> following restoring processes.
> {code:java}
> //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> //handle for OffsetOutOfRangeException in kafka stream
> catch (final InvalidOffsetException recoverableException) {
>  log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to 
> recreate from scratch.", recoverableException);
>  final Set<TopicPartition> partitions = recoverableException.partitions();
>  for (final TopicPartition partition : partitions) {
>final StreamTask task = active.restoringTaskFor(partition);
>log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
>needsInitializing.remove(partition);
>needsRestoring.remove(partition);
>
> task.reinitializeStateStoresForPartitions(recoverableException.partitions());
>  }
>  restoreConsumer.seekToBeginning(partitions);
> }{code}
>  
>  Investigating why the handling of this exception does not work, I found the 
> root cause:
>  Kafka Streams registers state restorers in the variable stateRestorers, 
> which is used to read/update the start and end offset for 

[jira] [Commented] (KAFKA-7595) Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable

2018-11-15 Thread Vik Gamov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688927#comment-16688927
 ] 

Vik Gamov commented on KAFKA-7595:
--

[~mjsax] [~vvcephei] 

Thank you for the input. I was able to rewrite my app according to Matthias's 
recommendations:

[https://github.com/gAmUssA/streams-movie-demo/blob/refactoring/1/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L165]
 

 

> Kafka Streams: KTable to KTable join introduces duplicates in downstream 
> KTable
> 
>
> Key: KAFKA-7595
> URL: https://issues.apache.org/jira/browse/KAFKA-7595
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Vik Gamov
>Priority: Major
>
> When perform KTable to KTable join after aggregation, there are duplicates in 
> resulted KTable.
> 1. caching disabled, no materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
> 0);}}
> {{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue());}}
> 2. caching disabled, materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
> 0);}}{{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
> 3. caching enabled, materialized => all good
> {{// Enable record cache of size 10 MB.}}
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 
> * 1024 * 1024L);}}
> {{// Set commit interval to 1 second.}}
> {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
> 1000);}}{{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
>  
> Demo app 
> [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7576) Dynamic update of replica fetcher threads may fail to start/close fetchers

2018-11-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1666#comment-1666
 ] 

ASF GitHub Bot commented on KAFKA-7576:
---

hachikuji closed pull request #5875: KAFKA-7576: Fix shutdown of replica 
fetcher threads
URL: https://github.com/apache/kafka/pull/5875
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 448932e358b..18a7eefe202 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -197,4 +197,20 @@ ClientRequest newClientRequest(String nodeId,
int requestTimeoutMs,
RequestCompletionHandler callback);
 
+
+
+/**
+ * Initiates shutdown of this client. This method may be invoked from 
another thread while this
+ * client is being polled. No further requests may be sent using the 
client. The current poll()
+ * will be terminated using wakeup(). The client should be explicitly 
shutdown using {@link #close()}
+ * after poll returns. Note that {@link #close()} should not be invoked 
concurrently while polling.
+ */
+void initiateClose();
+
+/**
+ * Returns true if the client is still active. Returns false if {@link 
#initiateClose()} or {@link #close()}
+ * was invoked for this client.
+ */
+boolean active();
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 902ef1c3fda..144987e8494 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelState;
@@ -54,6 +55,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
@@ -64,6 +66,12 @@
  */
 public class NetworkClient implements KafkaClient {
 
+private enum State {
+ACTIVE,
+CLOSING,
+CLOSED
+}
+
 private final Logger log;
 
 /* the selector used to perform network i/o */
@@ -114,6 +122,8 @@
 
 private final Sensor throttleTimeSensor;
 
+private final AtomicReference<State> state;
+
 public NetworkClient(Selectable selector,
  Metadata metadata,
  String clientId,
@@ -254,6 +264,7 @@ private NetworkClient(MetadataUpdater metadataUpdater,
 this.throttleTimeSensor = throttleTimeSensor;
 this.log = logContext.logger(NetworkClient.class);
 this.clientDnsLookup = clientDnsLookup;
+this.state = new AtomicReference<>(State.ACTIVE);
 }
 
 /**
@@ -429,6 +440,7 @@ private void 
sendInternalMetadataRequest(MetadataRequest.Builder builder,
 }
 
 private void doSend(ClientRequest clientRequest, boolean 
isInternalRequest, long now) {
+ensureActive();
 String nodeId = clientRequest.destination();
 if (!isInternalRequest) {
 // If this request came from outside the NetworkClient, validate
@@ -507,6 +519,8 @@ private void doSend(ClientRequest clientRequest, boolean 
isInternalRequest, long
  */
 @Override
public List<ClientResponse> poll(long timeout, long now) {
+ensureActive();
+
 if (!abortedSends.isEmpty()) {
 // If there are aborted sends because of unsupported version 
exceptions or disconnects,
 // handle them immediately without waiting for Selector#poll.
@@ -586,13 +600,35 @@ public void wakeup() {
 this.selector.wakeup();
 }
 
+@Override
+public void initiateClose() {
+if (state.compareAndSet(State.ACTIVE, State.CLOSING)) {
+wakeup();
+}
+}
+
+@Override
+public boolean active() {
+return state.get() == State.ACTIVE;
+}
+
+private void ensureActive() {
+if (!active())
+throw new DisconnectException("NetworkClient is no longer active, 
state is " + state);
+}
+
 /**
  * Close the network client
  */
 @Override
 public void close() {
-
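The diff is truncated above, but the javadoc it adds already pins down the 
intended usage. A minimal sketch of that contract (the thread structure and 
variable names are assumptions, not part of the patch):
{code:java}
// Sketch only: initiateClose() may be called from any thread; it wakes up the
// current poll(). The polling thread then observes active() == false, exits
// its loop, and is the one that finally calls close().
while (client.active()) {
    client.poll(pollTimeoutMs, time.milliseconds());
    // ... handle completed sends/receives ...
}
client.close(); // safe here: no poll() is running any more

// Elsewhere, on a shutdown path in another thread:
client.initiateClose(); // no further requests may be sent after this
{code}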

[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-11-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689059#comment-16689059
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

guozhangwang closed pull request #5915: KAFKA-7192: Wipe out state store if EOS 
is turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5915
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 34350c17eb0..c03de2d4a2d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -59,9 +59,14 @@ public StoreChangelogReader(final Consumer 
consumer,
 
 @Override
 public void register(final StateRestorer restorer) {
-restorer.setUserRestoreListener(userStateRestoreListener);
-stateRestorers.put(restorer.partition(), restorer);
-needsInitializing.put(restorer.partition(), restorer);
+final StateRestorer existingRestorer = 
stateRestorers.get(restorer.partition());
+if (existingRestorer == null) {
+restorer.setUserRestoreListener(userStateRestoreListener);
+stateRestorers.put(restorer.partition(), restorer);
+needsInitializing.put(restorer.partition(), restorer);
+} else {
+needsInitializing.put(restorer.partition(), existingRestorer);
+}
 }
 
 /**
@@ -188,7 +193,6 @@ private void startRestoration(final Map initializ
 
restorer.setCheckpointOffset(consumer.position(restoringPartition));
 
 task.reinitializeStateStoresForPartitions(restoringPartition);
-stateRestorers.get(restoringPartition).restoreStarted();
 } else {
 log.info("Restoring task {}'s state store {} from beginning of 
the changelog {} ", task.id, restorer.storeName(), restoringPartition);
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bugs
> Fix For: 0.11.0.4, 1.0.3, 2.0.1, 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7634) Punctuate not being called with merge() and/or outerJoin()

2018-11-15 Thread Eugen Feller (JIRA)
Eugen Feller created KAFKA-7634:
---

 Summary: Punctuate not being called with merge() and/or outerJoin()
 Key: KAFKA-7634
 URL: https://issues.apache.org/jira/browse/KAFKA-7634
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.11.0.3
Reporter: Eugen Feller


Hi all,

I am using the Processor API and having trouble getting Kafka Streams v0.11.0.3 
to call the punctuate() function after a merge() and/or outerJoin(). 
Specifically, I have a topology of the form flatMapValues() -> merge() and/or 
outerJoin() -> transform(). If I don't call merge() and/or outerJoin() before 
transform(), punctuate() is called as expected.
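For clarity, a minimal sketch of the topology shape described above (topic 
names, the value splitter, and the transformer are placeholders, using the 
0.11-era DSL):
{code:java}
// Sketch only: punctuate() fires when transform() is used directly, but
// reportedly stops firing once merge() and/or outerJoin() precedes it.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> a = builder.stream("topic-a");
KStream<String, String> b = builder.stream("topic-b");

KStream<String, String> merged = builder.merge(
    a.flatMapValues(value -> splitIntoEvents(value)),
    b);

merged.transform(MyPunctuatingTransformer::new) // punctuate() not invoked here
      .to("output-topic");
{code}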

Thank you very much in advance for your help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7628) KafkaStream is not closing

2018-11-15 Thread Ozgur (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689085#comment-16689085
 ] 

Ozgur commented on KAFKA-7628:
--

Hi Guozhang,

I've upgraded my client to the latest version (2.0.1), but the problem was the 
same. I now think this is more likely an application logic error rather than a 
Kafka issue.

Thanks.

> KafkaStream is not closing
> --
>
> Key: KAFKA-7628
> URL: https://issues.apache.org/jira/browse/KAFKA-7628
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
> Environment: Macbook Pro
>Reporter: Ozgur
>Priority: Major
>
> I'm closing a KafkaStream when I need based on a certain condition:
> Closing:
>  
> {code:java}
> if(kafkaStream == null) {
> logger.info("KafkaStream already closed?");
> } else {
> boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
> if(closed) {
> kafkaStream = null;
> logger.info("KafkaStream closed");
> } else {
> logger.info("KafkaStream could not closed");
> }
> }
> {code}
> Starting:
>  
> {code:java}
> if(kafkaStream == null) {
> logger.info("KafkaStream is starting");
> kafkaStream = 
> KafkaManager.getInstance().getStream(this.getConfigFilePath(),
> this,
> this.getTopic()
> );
> kafkaStream.start();
> logger.info("KafkaStream is started");
> }
> {code}
>  
>  
> In my implementation of Processor, {{process(String key, byte[] value)}} is 
> still called although successfully closing stream:
>  
> {code:java}
> // code placeholder
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
> private static Logger logger = 
> LogManager.getLogger(BaseKafkaProcessor.class);
> private ProcessorContext context;
> private ProcessorContext getContext() {
> return context;
> }
> @Override
> public void init(ProcessorContext context) {
> this.context = context;
> this.context.schedule(1000);
> }
> @Override
> public void process(String key, byte[] value) {
> try {
> String topic = key.split("-")[0];
> byte[] uncompressed = GzipCompressionUtil.uncompress(value);
> String json = new String(uncompressed, "UTF-8");
> processRecord(topic, json);
> this.getContext().commit();
> } catch (Exception e) {
> logger.error("Error processing json", e);
> }
> }
> protected abstract void processRecord(String topic, String json);
> @Override
> public void punctuate(long timestamp) {
> this.getContext().commit();
> }
> @Override
> public void close() {
> this.getContext().commit();
> }
> }
> {code}
>  
> My configuration for KafkaStreams:
>  
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1* 
>  
> I'm witnessing that after closing() the streams, these ports are still 
> listening:
>  
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP 
> x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP 
> x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP 
> x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP 
> x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP 
> x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP 
> x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7628) KafkaStream is not closing

2018-11-15 Thread Ozgur (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ozgur resolved KAFKA-7628.
--
Resolution: Fixed

This issue was caused by another component of the application. The Processor 
implementation communicates with Ignite in a non-thread-safe manner and blocks 
the same Kafka thread. Therefore Kafka's shutdown thread was unable to finish 
within the specified timeout.

> KafkaStream is not closing
> --
>
> Key: KAFKA-7628
> URL: https://issues.apache.org/jira/browse/KAFKA-7628
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
> Environment: Macbook Pro
>Reporter: Ozgur
>Priority: Major
>
> I'm closing a KafkaStream when I need based on a certain condition:
> Closing:
>  
> {code:java}
> if(kafkaStream == null) {
> logger.info("KafkaStream already closed?");
> } else {
> boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
> if(closed) {
> kafkaStream = null;
> logger.info("KafkaStream closed");
> } else {
> logger.info("KafkaStream could not closed");
> }
> }
> {code}
> Starting:
>  
> {code:java}
> if(kafkaStream == null) {
> logger.info("KafkaStream is starting");
> kafkaStream = 
> KafkaManager.getInstance().getStream(this.getConfigFilePath(),
> this,
> this.getTopic()
> );
> kafkaStream.start();
> logger.info("KafkaStream is started");
> }
> {code}
>  
>  
> In my implementation of Processor, {{process(String key, byte[] value)}} is 
> still called although successfully closing stream:
>  
> {code:java}
> // code placeholder
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
> private static Logger logger = 
> LogManager.getLogger(BaseKafkaProcessor.class);
> private ProcessorContext context;
> private ProcessorContext getContext() {
> return context;
> }
> @Override
> public void init(ProcessorContext context) {
> this.context = context;
> this.context.schedule(1000);
> }
> @Override
> public void process(String key, byte[] value) {
> try {
> String topic = key.split("-")[0];
> byte[] uncompressed = GzipCompressionUtil.uncompress(value);
> String json = new String(uncompressed, "UTF-8");
> processRecord(topic, json);
> this.getContext().commit();
> } catch (Exception e) {
> logger.error("Error processing json", e);
> }
> }
> protected abstract void processRecord(String topic, String json);
> @Override
> public void punctuate(long timestamp) {
> this.getContext().commit();
> }
> @Override
> public void close() {
> this.getContext().commit();
> }
> }
> {code}
>  
> My configuration for KafkaStreams:
>  
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1* 
>  
> I'm witnessing that after closing() the streams, these ports are still 
> listening:
>  
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP 
> x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP 
> x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP 
> x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP 
> x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP 
> x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP 
> x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7628) KafkaStream is not closing

2018-11-15 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689078#comment-16689078
 ] 

Guozhang Wang commented on KAFKA-7628:
--

Old versions of Kafka Streams had some state-transition bugs that have since 
been fixed. More specifically: `KafkaStreams.close(timeout)` signals its stream 
threads to complete, joins them, and waits up to the specified timeout. It 
returns `false` to indicate that not all threads joined within that timeout, 
but the shutdown may still complete later (i.e. the state eventually 
transitions to NOT_RUNNING). Could you try upgrading to a newer version and see 
if the issue still persists?
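A minimal sketch of that behaviour (variable names follow the snippets above):
{code:java}
// Sketch only: close(timeout) returning false does not mean shutdown failed
// for good; the instance can still reach NOT_RUNNING shortly afterwards.
boolean closedInTime = kafkaStream.close(10L, TimeUnit.SECONDS);
if (!closedInTime) {
    // Threads did not join within 10s, but shutdown continues in the background.
    while (kafkaStream.state() != KafkaStreams.State.NOT_RUNNING) {
        Thread.sleep(100); // poll the state until the transition completes
    }
}
{code}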

> KafkaStream is not closing
> --
>
> Key: KAFKA-7628
> URL: https://issues.apache.org/jira/browse/KAFKA-7628
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
> Environment: Macbook Pro
>Reporter: Ozgur
>Priority: Major
>
> I'm closing a KafkaStream when I need based on a certain condition:
> Closing:
>  
> {code:java}
> if(kafkaStream == null) {
> logger.info("KafkaStream already closed?");
> } else {
> boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
> if(closed) {
> kafkaStream = null;
> logger.info("KafkaStream closed");
> } else {
> logger.info("KafkaStream could not closed");
> }
> }
> {code}
> Starting:
>  
> {code:java}
> if(kafkaStream == null) {
> logger.info("KafkaStream is starting");
> kafkaStream = 
> KafkaManager.getInstance().getStream(this.getConfigFilePath(),
> this,
> this.getTopic()
> );
> kafkaStream.start();
> logger.info("KafkaStream is started");
> }
> {code}
>  
>  
> In my implementation of Processor, {{process(String key, byte[] value)}} is 
> still called although successfully closing stream:
>  
> {code:java}
> // code placeholder
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
> private static Logger logger = 
> LogManager.getLogger(BaseKafkaProcessor.class);
> private ProcessorContext context;
> private ProcessorContext getContext() {
> return context;
> }
> @Override
> public void init(ProcessorContext context) {
> this.context = context;
> this.context.schedule(1000);
> }
> @Override
> public void process(String key, byte[] value) {
> try {
> String topic = key.split("-")[0];
> byte[] uncompressed = GzipCompressionUtil.uncompress(value);
> String json = new String(uncompressed, "UTF-8");
> processRecord(topic, json);
> this.getContext().commit();
> } catch (Exception e) {
> logger.error("Error processing json", e);
> }
> }
> protected abstract void processRecord(String topic, String json);
> @Override
> public void punctuate(long timestamp) {
> this.getContext().commit();
> }
> @Override
> public void close() {
> this.getContext().commit();
> }
> }
> {code}
>  
> My configuration for KafkaStreams:
>  
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1* 
>  
> I'm witnessing that after closing() the streams, these ports are still 
> listening:
>  
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP 
> x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP 
> x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP 
> x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP 
> x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP 
> x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP 
> x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7631) NullPointerException when SCRAM is allowed but ScramLoginModule is not in broker's jaas.conf

2018-11-15 Thread Andras Beni (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687631#comment-16687631
 ] 

Andras Beni commented on KAFKA-7631:


[~asasvari], [~viktorsomogyi] you might want to look at this issue.

> NullPointerException when SCRAM is allowed but ScramLoginModule is not in 
> broker's jaas.conf
> ---
>
> Key: KAFKA-7631
> URL: https://issues.apache.org/jira/browse/KAFKA-7631
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 2.0.0
>Reporter: Andras Beni
>Priority: Minor
>
> When a user wants to use delegation tokens and lists {{SCRAM}} in 
> {{sasl.enabled.mechanisms}} but does not add {{ScramLoginModule}} to the 
> broker's JAAS configuration, a null pointer exception is thrown on the broker 
> side and the connection is closed.
> A meaningful error message should be logged and sent back to the client.
> {code}
> java.lang.NullPointerException
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:376)
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:262)
> at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:489)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
> at kafka.network.Processor.poll(SocketServer.scala:679)
> at kafka.network.Processor.run(SocketServer.scala:584)
> at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7631) NullPointerException when SCRAM is allowed but ScramLoginModule is not in broker's jaas.conf

2018-11-15 Thread Andras Beni (JIRA)
Andras Beni created KAFKA-7631:
--

 Summary: NullPointerException when SCRAM is allowed but 
ScramLoginModule is not in broker's jaas.conf
 Key: KAFKA-7631
 URL: https://issues.apache.org/jira/browse/KAFKA-7631
 Project: Kafka
  Issue Type: Improvement
  Components: security
Affects Versions: 2.0.0
Reporter: Andras Beni


When a user wants to use delegation tokens and lists {{SCRAM}} in 
{{sasl.enabled.mechanisms}} but does not add {{ScramLoginModule}} to the 
broker's JAAS configuration, a null pointer exception is thrown on the broker 
side and the connection is closed.

A meaningful error message should be logged and sent back to the client.
{code}
java.lang.NullPointerException
at 
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:376)
at 
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:262)
at 
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:489)
at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
at kafka.network.Processor.poll(SocketServer.scala:679)
at kafka.network.Processor.run(SocketServer.scala:584)
at java.lang.Thread.run(Thread.java:748)
{code}
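For completeness, a hedged example of the broker-side JAAS entry whose absence 
triggers this NPE (the credentials are placeholders):
{code}
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
};
{code}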



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-15 Thread Kobi Hikri (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687874#comment-16687874
 ] 

Kobi Hikri commented on KAFKA-1194:
---

Hi guys,

Is there anyone from the core team who wants to help me bring this issue to a 
"fixed" state?
I am currently "reverse engineering" the code in order to provide a fix.

Best regards,
Kobi.

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while 
> it is still open, so we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The 
> Javadoc describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it 
> can be used to free the MappedByteBuffer.
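For anyone reproducing the mmap/rename conflict in isolation, a minimal sketch 
(the file name is a placeholder) that typically fails on Windows while the 
mapping is alive, yet succeeds on Linux:
{code:java}
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class MmapRenameDemo {
    public static void main(String[] args) throws Exception {
        Path index = Paths.get("00000000000001516723.index");
        try (RandomAccessFile raf = new RandomAccessFile(index.toFile(), "rw")) {
            raf.setLength(1024);
            MappedByteBuffer buf =
                raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            buf.put(0, (byte) 1); // keep the mapping alive
            // On Windows this rename typically throws a FileSystemException
            // while 'buf' is still mapped; on Linux it succeeds, which is why
            // the retention bug above is Windows-specific.
            Files.move(index, Paths.get("00000000000001516723.index.deleted"));
        }
    }
}
{code}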



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-15 Thread Harald Kirsch (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687984#comment-16687984
 ] 

Harald Kirsch commented on KAFKA-1194:
--

[~Kobi Hikri] please be sure you understand the likely underlying issue with 
the JVM/mmap/Windows by following the pointers in 
https://issues.apache.org/jira/browse/KAFKA-1194?focusedCommentId=16580774=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16580774

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while 
> it is still open, so we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The 
> Javadoc describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it 
> can be used to free the MappedByteBuffer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-15 Thread Harald Kirsch (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688007#comment-16688007
 ] 

Harald Kirsch commented on KAFKA-1194:
--

[~Kobi Hikri] Good that you already know about it! :) But I am not in a 
position to help with the PR, sorry.

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while 
> it is still open, so we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The 
> Javadoc describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it 
> can be used to free the MappedByteBuffer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-15 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688008#comment-16688008
 ] 

M. Manna commented on KAFKA-1194:
-

Just out of curiosity, have you considered dockerising your Kafka service, or 
does that not work for your business? Or perhaps moving to a Linux server?









[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-15 Thread Kobi Hikri (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687995#comment-16687995
 ] 

Kobi Hikri commented on KAFKA-1194:
---

Thanks [~haraldk], I actually do understand the principle here :).
I've prepared a fix for the issue and have stress-tested it on Windows.
However, I am unable to push the fix branch I've created in order to open a 
PR.
Can anyone assist with this?
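
For anyone hitting the same hurdle: only committers can push branches to
apache/kafka itself, so the usual route is to push the branch to a personal
GitHub fork and open the pull request from there. A sketch, with placeholder
user and branch names:

    git remote add fork https://github.com/<your-user>/kafka.git
    git push fork KAFKA-1194-windows-fix
    # then open a PR from the fork's branch against apache/kafka trunk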



