[jira] [Updated] (KAFKA-14573) RoundRobinPartitioner doesn't work as expected with topic that has 6 partitions
[ https://issues.apache.org/jira/browse/KAFKA-14573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephane Maarek updated KAFKA-14573:
    Description: 
Create a topic with 6 partitions (that's how the behavior is observed):
```
kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --create --partitions 6
```
Start a consumer:
```
kafka-console-consumer --bootstrap-server localhost:9092 --topic test_topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --from-beginning
```
Start a producer using `RoundRobinPartitioner`:
```
kafka-console-producer.sh --bootstrap-server localhost:9092 --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner --topic test_topic
```
You will see that only 3 partitions get produced to (not 6). This doesn't happen with 3, 5, 7, or 9 partitions (we get the expected behavior), but with 6 it does. Verified to be like this on my colleague's computer too.
```
CreateTime: 1672859052082 Partition: 5 null a
CreateTime: 1672859053586 Partition: 1 null b
CreateTime: 1672859054995 Partition: 3 null c
CreateTime: 1672859056405 Partition: 5 null d
CreateTime: 1672859057861 Partition: 1 null e
CreateTime: 1672859059181 Partition: 3 null f
```

  was:
Create a topic with 6 partitions (that's how the behavior is observed):
```
kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --create --partitions 6
```
Start a consumer:
```
kafka-console-consumer --bootstrap-server localhost:9092 --topic test_topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --from-beginning
```
Start a producer using `RoundRobinPartitioner`:
```
kafka-console-producer.sh --bootstrap-server localhost:9092 --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner --topic test_topic
```
You will see that only 3 partitions get produced to (not 6). This doesn't happen with 3, 5, 7, or 9 partitions (we get the expected behavior), but with 6 it does. Verified to be like this on my colleague's computer too.
``` CreateTime: 1672859052082 Partition: 5 null a CreateTime: 1672859053586 Partition: 1 null b CreateTime: 1672859054995 Partition: 3 null C CreateTime: 1672859056405 Partition: 5 null d CreateTime: 1672859057861 Partition: 1 null e CreateTime: 1672859059181 Partition: 3 null e ``` > RoundRobinPartitioner doesn't work as expected with topic that has 6 > partitions > --- > > Key: KAFKA-14573 > URL: https://issues.apache.org/jira/browse/KAFKA-14573 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.3.1 >Reporter: Stephane Maarek >Priority: Major > Attachments: image-2023-01-04-20-15-43-039.png > > > Create a topic with 6 partitions (that's how the behavior is observed) > ``` > kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --create > --partitions 6 > ``` > Start a consumer > ``` > kafka-console-consumer --bootstrap-server localhost:9092 --topic test_topic > --formatter kafka.tools.DefaultMessageFormatter --property > print.timestamp=true --property print.key=true --property print.value=true > --property print.partition=true --from-beginning > ``` > Start a producer using `RoundRobinPartitioner` > ``` > kafka-console-producer.sh --bootstrap-server localhost:9092 > --producer-property > partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner > --topic test_topic > ``` > And you will see only 3 partitions get produced to (not 6) > This doesn't happen with 3 partitions, 5, 7, 9 partitions (we get the > expected behavior). > But with 6 it happens. > Verified to be like this on my colleague's computer too. > ``` > CreateTime: 1672859052082 Partition: 5 null a > CreateTime: 1672859053586 Partition: 1 null b > CreateTime: 1672859054995 Partition: 3 null c > CreateTime: 1672859056405 Partition: 5 null d > CreateTime: 1672859057861 Partition: 1 null e > CreateTime: 1672859059181 Partition: 3 null f > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
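The partition sequence in the output above (5, 1, 3 repeating, i.e. only half of the 6 partitions) is what you would get if the partitioner's round-robin counter advanced twice per record, for example if partition() were invoked a second time for the same record when a new batch is started. A minimal model of that arithmetic (an illustration of the suspected behavior, not Kafka's actual RoundRobinPartitioner source):
```
// Model of a round-robin counter that is advanced twice per record
// (hypothetical illustration, not the actual RoundRobinPartitioner code).
public class RoundRobinSkipDemo {
    public static void main(String[] args) {
        for (int numPartitions : new int[]{5, 6}) {
            int counter = 0;
            StringBuilder order = new StringBuilder();
            for (int record = 0; record < 12; record++) {
                int partition = ++counter % numPartitions; // first partition() call
                ++counter;                                 // hypothetical second call, same record
                order.append(partition).append(' ');
            }
            // 5 partitions -> 1 3 0 2 4 ... (every partition is visited)
            // 6 partitions -> 1 3 5 1 3 5 ... (only half the partitions are visited)
            System.out.println(numPartitions + " partitions -> " + order);
        }
    }
}
```
With an odd partition count the double step still visits every partition, which matches the report that 3, 5, 7, and 9 partitions behave as expected.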
[jira] [Updated] (KAFKA-14573) RoundRobinPartitioner doesn't work as expected with topic that has 6 partitions
[ https://issues.apache.org/jira/browse/KAFKA-14573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephane Maarek updated KAFKA-14573:
    Description: 
Create a topic with 6 partitions (that's how the behavior is observed):
```
kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --create --partitions 6
```
Start a consumer:
```
kafka-console-consumer --bootstrap-server localhost:9092 --topic test_topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --from-beginning
```
Start a producer using `RoundRobinPartitioner`:
```
kafka-console-producer.sh --bootstrap-server localhost:9092 --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner --topic test_topic
```
You will see that only 3 partitions get produced to (not 6). This doesn't happen with 3, 5, 7, or 9 partitions (we get the expected behavior), but with 6 it does. Verified to be like this on my colleague's computer too.
```
CreateTime: 1672859052082 Partition: 5 null a
CreateTime: 1672859053586 Partition: 1 null b
CreateTime: 1672859054995 Partition: 3 null C
CreateTime: 1672859056405 Partition: 5 null d
CreateTime: 1672859057861 Partition: 1 null e
CreateTime: 1672859059181 Partition: 3 null e
```

  was:
Create a topic with 6 partitions (that's how the behavior is observed):
```
kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --create --partitions 6
```
Start a consumer:
```
kafka-console-consumer --bootstrap-server localhost:9092 --topic test_topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --from-beginning
```
Start a producer using `RoundRobinPartitioner`:
```
kafka-console-producer.sh --bootstrap-server localhost:9092 --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner --topic test_topic
```
You will see that only 3 partitions get produced to (not 6). This doesn't happen with 3, 5, 7, or 9 partitions (we get the expected behavior), but with 6 it does. Verified to be like this on my colleague's computer too.
!image-2023-01-04-20-15-43-039.png!

> RoundRobinPartitioner doesn't work as expected with topic that has 6 partitions
> ---
>
> Key: KAFKA-14573
> URL: https://issues.apache.org/jira/browse/KAFKA-14573
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.3.1
> Reporter: Stephane Maarek
> Priority: Major
> Attachments: image-2023-01-04-20-15-43-039.png
>
> Create a topic with 6 partitions (that's how the behavior is observed)
> ```
> kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --create --partitions 6
> ```
> Start a consumer
> ```
> kafka-console-consumer --bootstrap-server localhost:9092 --topic test_topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --from-beginning
> ```
> Start a producer using `RoundRobinPartitioner`
> ```
> kafka-console-producer.sh --bootstrap-server localhost:9092 --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner --topic test_topic
> ```
> And you will see only 3 partitions get produced to (not 6)
> This doesn't happen with 3 partitions, 5, 7, 9 partitions (we get the expected behavior).
> But with 6 it happens.
> Verified to be like this on my colleague's computer too. > ``` > CreateTime: 1672859052082 Partition: 5 null a > CreateTime: 1672859053586 Partition: 1 null b > CreateTime: 1672859054995 Partition: 3 null C > CreateTime: 1672859056405 Partition: 5 null d > CreateTime: 1672859057861 Partition: 1 null e > CreateTime: 1672859059181 Partition: 3 null e > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14573) RoundRobinPartitioner doesn't work as expected with topic that has 6 partitions
Stephane Maarek created KAFKA-14573:
---

             Summary: RoundRobinPartitioner doesn't work as expected with topic that has 6 partitions
                 Key: KAFKA-14573
                 URL: https://issues.apache.org/jira/browse/KAFKA-14573
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 3.3.1
            Reporter: Stephane Maarek
         Attachments: image-2023-01-04-20-15-43-039.png

Create a topic with 6 partitions (that's how the behavior is observed):
```
kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --create --partitions 6
```
Start a consumer:
```
kafka-console-consumer --bootstrap-server localhost:9092 --topic test_topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --from-beginning
```
Start a producer using `RoundRobinPartitioner`:
```
kafka-console-producer.sh --bootstrap-server localhost:9092 --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner --topic test_topic
```
You will see that only 3 partitions get produced to (not 6). This doesn't happen with 3, 5, 7, or 9 partitions (we get the expected behavior), but with 6 it does. Verified to be like this on my colleague's computer too.
!image-2023-01-04-20-15-43-039.png!

-- This message was sent by Atlassian Jira (v8.20.10#820010)
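For completeness, the console-producer setup can also be reproduced programmatically; a sketch, assuming a local broker and the topic created above (the per-record flush mirrors typing one message at a time into the console producer):
```
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

public class RoundRobinRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (String value : new String[]{"a", "b", "c", "d", "e", "f"}) {
                producer.send(new ProducerRecord<>("test_topic", value), (metadata, exception) -> {
                    if (exception == null) {
                        System.out.println(value + " -> partition " + metadata.partition());
                    }
                });
                producer.flush(); // one record per batch, mirroring slow console input
            }
        }
    }
}
```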
[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time
[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688044#comment-16688044 ]

Stephane Maarek commented on KAFKA-1194:

[~Kobi Hikri] excellent, I'll be happy to help out and test your PR. Here's the process:
1) go to github.com/apache/kafka
2) create a fork (top right corner). I'll assume your username is "user"
3) you now have a copy of the repo at github.com/user/kafka
4) add a remote to your git: git remote add myfork git@github.com:user/kafka.git
5) push your branch to your remote: git push -u myfork mybranch
6) go back to the web interface at github.com/apache/kafka
7) you should see a yellow bar to help you open a pull request. Otherwise, follow this guide: https://help.github.com/articles/creating-a-pull-request/
Hope that helps!
Stephane

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
> Reporter: Tao Qin
> Priority: Critical
> Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
> Original Estimate: 72h
> Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from to .deleted for log segment 1516723
> at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
> at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
> at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
> at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
> at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
> at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> at scala.collection.immutable.List.foreach(List.scala:76)
> at kafka.log.Log.deleteOldSegments(Log.scala:418)
> at kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
> at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
> at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
> at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
> at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
> at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:724) > I think this error happens because kafka tries to rename the log file when it > is still opened. So we should close the file first before rename. > The index file uses a special data structure, the MappedByteBuffer. Javadoc > describes it as: > A mapped byte buffer and the file mapping that it represents remain valid > until the buffer itself is garbage-collected. > Fortunately, I find a forceUnmap function
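The rename failure described in the quoted report can be reproduced in isolation; a minimal sketch, assuming a Windows JVM (on Linux the move succeeds, because open handles and mappings do not block renames there):
{code:java}
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class MappedRenameDemo {
    public static void main(String[] args) throws Exception {
        Path src = Files.createTempFile("segment", ".index");
        try (RandomAccessFile raf = new RandomAccessFile(src.toFile(), "rw");
             FileChannel ch = raf.getChannel()) {
            // Keep a live mapping, like Kafka's index files do.
            MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            buf.put((byte) 1);
            // On Windows this throws AccessDeniedException while the mapping
            // is alive; on Linux it succeeds.
            Files.move(src, src.resolveSibling("segment.index.deleted"),
                       StandardCopyOption.ATOMIC_MOVE);
        }
    }
}
{code}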
[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time
[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612019#comment-16612019 ]

Stephane Maarek commented on KAFKA-1194:

Thanks [~Kobi Hikri]! That's what I thought: it was a quick fix for the log cleaner, but it breaks the delete feature. At least the brokers don't crash anymore.
To be honest I won't pursue further fixes for lack of time, but I really hope someone in the core team can pick this up. To me, it seems we have pinpointed through the PR all the points of potential failure for Windows.
As for the quality of the fix, I would say very low; I honestly don't understand much about the Windows file system or the ordering of Kafka's internals for log cleaning and deletion. I'm really hoping this is enough research for [~ijuma] or [~lindong] to determine a better fix.
In a super ideal world, the function to fix is {code}Utils.atomicMoveWithFallback(src, dst);{code}

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
> Reporter: Tao Qin
> Priority: Critical
> Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
> Original Estimate: 72h
> Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file.
> And we get the following exceptions: > [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task > 'kafka-log-retention' (kafka.utils.KafkaScheduler) > kafka.common.KafkaStorageException: Failed to change the log file suffix from > to .deleted for log segment 1516723 > at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249) > at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638) > at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629) > at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418) > at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418) > at > scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59) > at scala.collection.immutable.List.foreach(List.scala:76) > at kafka.log.Log.deleteOldSegments(Log.scala:418) > at > kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284) > at > kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316) > at > kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743) > at scala.collection.Iterator$class.foreach(Iterator.scala:772) > at > scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:73) > at > scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742) > at kafka.log.LogManager.cleanupLogs(LogManager.scala:314) > at > kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143) > at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:724) > I think this error happens because kafka tries to rename the log file when it > is still opened. So we should close the file first before rename. > The index file uses a special data structure, the MappedByteBuffer. Javadoc > describes it as: > A mapped byte
[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time
[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16604408#comment-16604408 ]

Stephane Maarek commented on KAFKA-1194:

hi [~Kobi Hikri], can you please test one more time with https://github.com/apache/kafka/pull/5603 ? I hadn't fixed the class that was giving you errors.
If you manage to trigger errors, can you please give me the steps to reproduce?
BTW, you may see a lot of "ERROR" logs; that's intended, just so I can see the code path. Please paste the full log somewhere if you get another error, that'll help.

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
> Reporter: Tao Qin
> Priority: Critical
> Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, Untitled.jpg, kafka-1194-v1.patch, kafka-1194-v2.patch, screenshot-1.png
>
> Original Estimate: 72h
> Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from to .deleted for log segment 1516723
> at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
> at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
> at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
> at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
> at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
> at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> at scala.collection.immutable.List.foreach(List.scala:76)
> at kafka.log.Log.deleteOldSegments(Log.scala:418)
> at kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
> at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
> at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
> at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
> at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
> at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:724) > I think this error happens because kafka tries to rename the log file when it > is still opened. So we should close the file first before rename. > The index file uses a special data structure, the MappedByteBuffer. Javadoc > describes it as: > A mapped byte buffer and the file mapping that it represents remain valid > until the buffer itself is garbage-collected. > Fortunately, I find a forceUnmap function in kafka code, and perhaps it can > be used to free the MappedByteBuffer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time
[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16601613#comment-16601613 ]

Stephane Maarek commented on KAFKA-1194:

[~lindong] Not fixed as of
{code}
[2018-09-02 11:03:12,789] INFO Kafka version : 2.1.0-SNAPSHOT (org.apache.kafka.common.utils.AppInfoParser)
[2018-09-02 11:03:12,789] INFO Kafka commitId : 7299e1836ba2 (org.apache.kafka.common.utils.AppInfoParser)
{code}
Here are the full steps to reproduce and trigger:
{code}
C:\kafka_2.11-2.1.0-SNAPSHOT>kafka-topics.bat --zookeeper 127.0.0.1:2181 --topic second_topic --create --partitions 3 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "second_topic".

C:\kafka_2.11-2.1.0-SNAPSHOT>kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic second_topic
>hello
>world
>hello
>Terminate batch job (Y/N)? Y

C:\kafka_2.11-2.1.0-SNAPSHOT>kafka-topics.bat --zookeeper 127.0.0.1:2181 --topic second_topic --delete
Topic second_topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
{code}
Which triggers Kafka shutdown with:
{code}
[2018-09-02 11:04:15,460] ERROR Error while renaming dir for second_topic-1 in log dir C:\kafka_2.11-2.1.0-SNAPSHOT\data\kafka (kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: C:\kafka_2.11-2.1.0-SNAPSHOT\data\kafka\second_topic-1 -> C:\kafka_2.11-2.1.0-SNAPSHOT\data\kafka\second_topic-1.d1ceee24d7474152b6fedd61903449e5-delete
    at sun.nio.fs.WindowsException.translateToIOException(Unknown Source)
    at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
    at sun.nio.fs.WindowsFileCopy.move(Unknown Source)
    at sun.nio.fs.WindowsFileSystemProvider.move(Unknown Source)
    at java.nio.file.Files.move(Unknown Source)
    at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:809)
    at kafka.log.Log$$anonfun$renameDir$1.apply$mcV$sp(Log.scala:689)
    at kafka.log.Log$$anonfun$renameDir$1.apply(Log.scala:687)
    at kafka.log.Log$$anonfun$renameDir$1.apply(Log.scala:687)
    at kafka.log.Log.maybeHandleIOException(Log.scala:1842)
    at kafka.log.Log.renameDir(Log.scala:687)
    at kafka.log.LogManager.asyncDelete(LogManager.scala:833)
    at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:271)
    at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:265)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
    at kafka.cluster.Partition.delete(Partition.scala:265)
    at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:340)
    at kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:370)
    at kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:368)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:368)
    at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:200)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:111)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
    at java.lang.Thread.run(Unknown Source)
    Suppressed: java.nio.file.AccessDeniedException:
C:\kafka_2.11-2.1.0-SNAPSHOT\data\kafka\second_topic-1 -> C:\kafka_2.11-2.1.0-SNAPSHOT\data\kafka\second_topic-1.d1ceee24d7474152b6fedd61903449e5-delete at sun.nio.fs.WindowsException.translateToIOException(Unknown Source) at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source) at sun.nio.fs.WindowsFileCopy.move(Unknown Source) at sun.nio.fs.WindowsFileSystemProvider.move(Unknown Source) at java.nio.file.Files.move(Unknown Source) at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:806) ... 23 more [2018-09-02 11:04:15,460] INFO [ReplicaManager broker=0] Stopping serving replicas in dir C:\kafka_2.11-2.1.0-SNAPSHOT\data\kafka (kafka.server.ReplicaManager) {code} > The kafka broker cannot delete the old log files after the configured time > -- > > Key: KAFKA-1194 > URL: https://issues.apache.org/jira/browse/KAFKA-1194 > Project: Kafka > Issue Type: Bug > Components: log >Affects
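For context, the try-atomic-then-fallback shape of Utils.atomicMoveWithFallback, the function pinpointed above, looks roughly like the following (a simplified sketch, not the actual Kafka source; note how the first failure is attached as a suppressed exception, matching the "Suppressed:" block in the log):
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public final class MoveUtil {
    // Try an atomic rename first; fall back to a plain move if the
    // filesystem rejects atomic moves. On Windows both attempts fail with
    // AccessDeniedException while another handle or mapping is still open.
    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException outer) {
            try {
                Files.move(source, target); // non-atomic fallback
            } catch (IOException inner) {
                inner.addSuppressed(outer); // keep the atomic failure visible
                throw inner;
            }
        }
    }
}
{code}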
[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16601552#comment-16601552 ] Stephane Maarek commented on KAFKA-7278: [~lindong] Unfortunately, this does not fix https://issues.apache.org/jira/browse/KAFKA-1194 I'll post the details there... > replaceSegments() should not call asyncDeleteSegment() for segments which > have been removed from segments list > -- > > Key: KAFKA-7278 > URL: https://issues.apache.org/jira/browse/KAFKA-7278 > Project: Kafka > Issue Type: Improvement >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > Fix For: 1.1.2, 2.0.1, 2.1.0 > > > Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every > segment listed in the `oldSegments`. oldSegments should be constructed from > Log.segments and only contain segments listed in Log.segments. > However, Log.segments may be modified between the time oldSegments is > determined to the time Log.replaceSegments() is called. If there are > concurrent async deletion of the same log segment file, Log.replaceSegments() > will call asyncDeleteSegment() for a segment that does not exist and Kafka > server may shutdown the log directory due to NoSuchFileException. > This is likely the root cause of > https://issues.apache.org/jira/browse/KAFKA-6188. > Given the understanding of the problem, we should be able to fix the issue by > only deleting segment if the segment can be found in Log.segments. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
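The proposed fix in this ticket boils down to a remove-then-delete guard on the shared segments map, so that only the caller that actually wins the removal schedules the file deletion. A rough sketch of the idea (LogSegment and the delete scheduling below are stand-ins, not the real Kafka internals):
{code:java}
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Stand-in segment type, for illustration only.
class LogSegment {
    final long baseOffset;
    LogSegment(long baseOffset) { this.baseOffset = baseOffset; }
}

class SegmentDeleter {
    private final ConcurrentNavigableMap<Long, LogSegment> segments = new ConcurrentSkipListMap<>();

    // Only the thread that actually removes the segment from the live map
    // schedules the async delete, so concurrent replaceSegments() calls
    // cannot both try to rename the same file to .deleted.
    void deleteSegmentIfPresent(long baseOffset) {
        LogSegment removed = segments.remove(baseOffset);
        if (removed != null) {
            scheduleAsyncDelete(removed);
        }
    }

    private void scheduleAsyncDelete(LogSegment segment) {
        // rename to .deleted and schedule background file deletion (elided)
    }
}
{code}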
[jira] [Created] (KAFKA-7355) Topic Configuration Changes are not applied until reboot
Stephane Maarek created KAFKA-7355:
--

             Summary: Topic Configuration Changes are not applied until reboot
                 Key: KAFKA-7355
                 URL: https://issues.apache.org/jira/browse/KAFKA-7355
             Project: Kafka
          Issue Type: Bug
          Components: config, core
    Affects Versions: 2.0.0
            Reporter: Stephane Maarek

Steps to reproduce:
{code}
kafka-topics --zookeeper 127.0.0.1:2181 --create --topic employee-salary --partitions 1 --replication-factor 1

kafka-configs --zookeeper 127.0.0.1:2181 --alter --entity-type topics --entity-name employee-salary --add-config cleanup.policy=compact,min.cleanable.dirty.ratio=0.001,segment.ms=5000

kafka-configs --zookeeper 127.0.0.1:2181 --alter --entity-type topics --entity-name employee-salary

kafka-console-producer --broker-list 127.0.0.1:9092 --topic employee-salary --property parse.key=true --property key.separator=,
{code}
Try publishing a bunch of data, and no segment rollover will happen (even though segment.ms=5000). I looked at the kafka directory and the kafka logs: the broker processed the notification of config changes, but its behaviour was nonetheless not updated to use the new config values. After restarting the broker, the expected behaviour is observed.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
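One way to double-check what the broker has registered for the topic, as opposed to how it is behaving, is to read the config back through the standard AdminClient API; a sketch (the expected values assume the alter command above succeeded):
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeTopicConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "employee-salary");
            Config config = admin.describeConfigs(Collections.singleton(topic))
                                 .all().get().get(topic);
            // Per the report, these read back the new values (e.g. segment.ms=5000)
            // even while the broker's rolling behaviour lags until a restart.
            System.out.println(config.get("segment.ms"));
            System.out.println(config.get("cleanup.policy"));
        }
    }
}
{code}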
[jira] [Commented] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent
[ https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529262#comment-16529262 ]

Stephane Maarek commented on KAFKA-7077:

Related but not the same. KAFKA-6340 requires some profound changes to how data is produced and consumed. This just enables idempotence, which doesn't change consumer behaviour, and should increase performance and guarantees.
I'd love to hear your input on the KIP mailing thread: https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent

> KIP-318: Make Kafka Connect Source idempotent
> -
>
> Key: KAFKA-7077
> URL: https://issues.apache.org/jira/browse/KAFKA-7077
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Affects Versions: 2.0.0
> Reporter: Stephane Maarek
> Assignee: Stephane Maarek
> Priority: Major
>
> KIP Link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
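For reference, the producer settings pasted into this ticket's original title correspond to the following (a sketch using the ProducerConfig constants; acks and retries are shown with the values idempotence requires):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class IdempotentProducerProps {
    public static Properties idempotentProducerProps() {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // The two settings from the ticket's original title:
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        // Required alongside idempotence:
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        return producerProps;
    }
}
{code}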
[jira] [Updated] (KAFKA-7077) producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5") producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE, true)
[ https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephane Maarek updated KAFKA-7077: --- Description: KIP Link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent > producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5") > producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE, true) > --- > > Key: KAFKA-7077 > URL: https://issues.apache.org/jira/browse/KAFKA-7077 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Stephane Maarek >Assignee: Stephane Maarek >Priority: Major > > KIP Link: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent
[ https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephane Maarek updated KAFKA-7077: --- Summary: KIP-318: Make Kafka Connect Source idempotent (was: producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5") producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE, true)) > KIP-318: Make Kafka Connect Source idempotent > - > > Key: KAFKA-7077 > URL: https://issues.apache.org/jira/browse/KAFKA-7077 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Stephane Maarek >Assignee: Stephane Maarek >Priority: Major > > KIP Link: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7077) producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5") producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE, true)
Stephane Maarek created KAFKA-7077: -- Summary: producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5") producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE, true) Key: KAFKA-7077 URL: https://issues.apache.org/jira/browse/KAFKA-7077 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 2.0.0 Reporter: Stephane Maarek Assignee: Stephane Maarek -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7066) Make Streams Runtime Error User Friendly in Case of Serialisation exception
[ https://issues.apache.org/jira/browse/KAFKA-7066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514953#comment-16514953 ]

Stephane Maarek commented on KAFKA-7066:

Thanks [~mjsax]. I think that helps, but my PR goes at the most common, lowest level for all these issues, which addresses all kinds of stores. With logging, though, I'd rather have too much than too little, so I don't think any one issue supersedes the others.

> Make Streams Runtime Error User Friendly in Case of Serialisation exception
> ---
>
> Key: KAFKA-7066
> URL: https://issues.apache.org/jira/browse/KAFKA-7066
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Stephane Maarek
> Assignee: Stephane Maarek
> Priority: Major
> Fix For: 2.0.0
>
> This kind of exception can be cryptic for the beginner:
> {code:java}
> ERROR stream-thread [favourite-colors-application-a336770d-6ba6-4bbb-8681-3c8ea91bd12e-StreamThread-1] Failed to process stream task 2_0 due to the following error: (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks:105)
> java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
> at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
> at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
> at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
> at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:95)
> at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56)
> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:224)
> at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
> at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
> at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
> We should add more detailed logging already present in SinkNode to assist the user into solving this issue

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-7066) Make Streams Runtime Error User Friendly in Case of Serialisation exception
[ https://issues.apache.org/jira/browse/KAFKA-7066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephane Maarek reassigned KAFKA-7066: -- Assignee: Stephane Maarek > Make Streams Runtime Error User Friendly in Case of Serialisation exception > --- > > Key: KAFKA-7066 > URL: https://issues.apache.org/jira/browse/KAFKA-7066 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Stephane Maarek >Assignee: Stephane Maarek >Priority: Major > Fix For: 2.0.0 > > > This kind of exception can be cryptic for the beginner: > {code:java} > ERROR stream-thread > [favourite-colors-application-a336770d-6ba6-4bbb-8681-3c8ea91bd12e-StreamThread-1] > Failed to process stream task 2_0 due to the following error: > (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks:105) > java.lang.ClassCastException: java.lang.Long cannot be cast to > java.lang.String > at > org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) > at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57) > at > org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117) > at > org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:95) > at > org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56) > at > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) > at > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:224) > at > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411) > at > org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code} > We should add more detailed logging already present in SinkNode to assist the > user into solving this issue -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7066) Make Streams Runtime Error User Friendly in Case of Serialisation exception
Stephane Maarek created KAFKA-7066:
--

             Summary: Make Streams Runtime Error User Friendly in Case of Serialisation exception
                 Key: KAFKA-7066
                 URL: https://issues.apache.org/jira/browse/KAFKA-7066
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 1.1.0
            Reporter: Stephane Maarek
             Fix For: 2.0.0

This kind of exception can be cryptic for the beginner:
{code:java}
ERROR stream-thread [favourite-colors-application-a336770d-6ba6-4bbb-8681-3c8ea91bd12e-StreamThread-1] Failed to process stream task 2_0 due to the following error: (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks:105)
java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
    at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:95)
    at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:224)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
{code}
We should add more detailed logging, already present in SinkNode, to assist the user in solving this issue.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
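For reference, the usual cause of this particular ClassCastException is a count() or aggregation (Long values) flowing into a state store that still uses the default String serde. A sketch of declaring the serdes explicitly (the topology and names are illustrative, based on the application id in the stack trace, not taken from the reporter's code; 1.1-era API):
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class ExplicitSerdesExample {
    // usersAndColours is assumed to be a KTable<String, String> (user -> colour).
    static KTable<String, Long> countByColour(KTable<String, String> usersAndColours) {
        return usersAndColours
                // regroup by colour, with explicit serdes for the repartition topic
                .groupBy((user, colour) -> KeyValue.pair(colour, colour),
                         Serialized.with(Serdes.String(), Serdes.String()))
                // count() produces Long values, so the store needs a Long value serde;
                // leaving it on the default String serde causes the ClassCastException
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByColours")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()));
    }
}
{code}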
[jira] [Commented] (KAFKA-6323) punctuate with WALL_CLOCK_TIME triggered immediately
[ https://issues.apache.org/jira/browse/KAFKA-6323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284366#comment-16284366 ]

Stephane Maarek commented on KAFKA-6323:

Fully agree [~guozhang]. I also agree on punctuating only once (even if T2 is 5 intervals away); I have observed punctuate being called way too many times when the data does a big jump.
Finally, is there any interest or use case in using both a wall-clock and an event-driven punctuate? Might require a KIP for that one.

> punctuate with WALL_CLOCK_TIME triggered immediately
>
>
> Key: KAFKA-6323
> URL: https://issues.apache.org/jira/browse/KAFKA-6323
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Frederic Arno
> Assignee: Frederic Arno
> Fix For: 1.1.0, 1.0.1
>
> When working on a custom Processor from which I am scheduling a punctuation using WALL_CLOCK_TIME. I've noticed that whatever the punctuation interval I set, a call to my Punctuator is always triggered immediately.
> Having a quick look at kafka-streams' code, I could find that all PunctuationSchedule's timestamps are matched against the current time in order to decide whether or not to trigger the punctuator (org.apache.kafka.streams.processor.internals.PunctuationQueue#mayPunctuate). However, I've only seen code that initializes PunctuationSchedule's timestamp to 0, which I guess is what is causing an immediate punctuation.
> At least when using WALL_CLOCK_TIME, shouldn't the PunctuationSchedule's timestamp be initialized to current time + interval?

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
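For reference, the scheduling API under discussion looks like this inside a processor; a sketch using the 1.0-era schedule overload (the periodic work is a hypothetical helper):
{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class WallClockPunctuateExample extends AbstractProcessor<String, String> {
    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        // Schedule a wall-clock punctuator every 10s. Per this ticket, the
        // first call should come one interval after scheduling, not immediately.
        context.schedule(10_000L, PunctuationType.WALL_CLOCK_TIME,
                timestamp -> flushPendingWork(timestamp));
    }

    @Override
    public void process(String key, String value) {
        // record-driven work (elided)
    }

    private void flushPendingWork(long wallClockTime) {
        // hypothetical periodic work, e.g. forwarding buffered results
    }
}
{code}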
[jira] [Created] (KAFKA-6092) Time passed in punctuate call is currentTime, not punctuate schedule time.
Stephane Maarek created KAFKA-6092:
--

             Summary: Time passed in punctuate call is currentTime, not punctuate schedule time.
                 Key: KAFKA-6092
                 URL: https://issues.apache.org/jira/browse/KAFKA-6092
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.11.0.0
            Reporter: Stephane Maarek

The javadoc specifies that for a Transformer, calling context.schedule calls punctuate every 1000ms. This is not entirely accurate: if no data is received for a while, punctuate won't be called.
{code}
 * void init(ProcessorContext context) {
 *     this.context = context;
 *     this.state = context.getStateStore("myTransformState");
 *     context.schedule(1000); // call #punctuate() each 1000ms
 * }
{code}
When you receive new data, say after 20 seconds, punctuate will play catch-up and will be called 20 times upon reception of the new data.
The signature of punctuate is
{code}
 * KeyValue punctuate(long timestamp) {
 *     // can access this.state
 *     // can emit as many new KeyValue pairs as required via this.context#forward()
 *     return null; // don't return result -- can also be "new KeyValue()"
 * }
{code}
but the timestamp being passed is the current timestamp at the time of the call to punctuate, not the time at which the punctuate was scheduled. This is very confusing, and I think the timestamp should represent the one at which the punctuate should have been scheduled. Passing the current timestamp does not add much information, as it can easily be obtained using System.currentTimeMillis();

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
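Until that changes, a transformer can track its own slot time; an illustrative workaround on the 0.11-era Transformer API (field and helper names here are ours), which does each slot's work once and tags it with the intended schedule time instead of relying on the passed-in current timestamp:
{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class SlotAwareTransformer implements Transformer<String, String, KeyValue<String, String>> {
    private static final long INTERVAL_MS = 1000L;
    private long nextSlot = -1L;

    @Override
    public void init(ProcessorContext context) {
        context.schedule(INTERVAL_MS); // 0.11-era scheduling API
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        return KeyValue.pair(key, value);
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
        if (nextSlot < 0) nextSlot = timestamp;   // baseline on the first call
        // Advance in fixed-size slots; redundant catch-up calls carrying the
        // same current timestamp then fall through without repeating the work.
        while (nextSlot <= timestamp) {
            doScheduledWork(nextSlot);            // use the slot time, not `timestamp`
            nextSlot += INTERVAL_MS;
        }
        return null;
    }

    @Override
    public void close() { }

    private void doScheduledWork(long slotTime) {
        // periodic work keyed to the intended schedule time (elided)
    }
}
{code}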
[jira] [Commented] (KAFKA-6007) Connect can't validate against transforms in plugins.path
[ https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199387#comment-16199387 ]

Stephane Maarek commented on KAFKA-6007:

I agree. You are correct in your assessment. Overall this seems like a user error, and the error message is erroneous. It leads one to believe there's a problem with Connect itself, whereas it's a misuse of the framework.
I think if you can change the code so that the error messages are more explicit and context-accurate, this would give enough feedback to the user to fix the issues and place the connectors where appropriate. This kind of error message is cool:
“we loaded this connector from the plugins, and your transformer is not in the plugins”
“we loaded this connector from the cp, and your transformer is not in the cp”

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 0.11.0.0
> Reporter: Stephane Maarek
> Assignee: Konstantine Karantasis
> Priority: Blocker
> Fix For: 0.11.0.1, 1.0.0
>
> Kafka Connect can't validate a custom transformation if placed in plugins path.
> Here's the output I get on the validate call:
> {code}
> Invalid value com.mycorp.kafka.transforms.impl.FlattenSinkRecord for configuration transforms.Flat.type: Class com.mycorp.kafka.transforms.impl.FlattenSinkRecord could not be found.
> Invalid value null for configuration transforms.Flat.type: Not a Transformation
> "recommended_values": [
> "com.mycorp.kafka.transforms.Flatten$Key",
> "com.mycorp.kafka.transforms.Flatten$Value",
> "com.mycorp.kafka.transforms.impl.FlattenSinkRecord",
> "org.apache.kafka.connect.transforms.Cast$Key",
> "org.apache.kafka.connect.transforms.Cast$Value",
> "org.apache.kafka.connect.transforms.ExtractField$Key",
> "org.apache.kafka.connect.transforms.ExtractField$Value",
> "org.apache.kafka.connect.transforms.Flatten$Key",
> "org.apache.kafka.connect.transforms.Flatten$Value",
> "org.apache.kafka.connect.transforms.HoistField$Key",
> "org.apache.kafka.connect.transforms.HoistField$Value",
> "org.apache.kafka.connect.transforms.InsertField$Key",
> "org.apache.kafka.connect.transforms.InsertField$Value",
> "org.apache.kafka.connect.transforms.MaskField$Key",
> "org.apache.kafka.connect.transforms.MaskField$Value",
> "org.apache.kafka.connect.transforms.RegexRouter",
> "org.apache.kafka.connect.transforms.ReplaceField$Key",
> "org.apache.kafka.connect.transforms.ReplaceField$Value",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
> "org.apache.kafka.connect.transforms.TimestampConverter$Key",
> "org.apache.kafka.connect.transforms.TimestampConverter$Value",
> "org.apache.kafka.connect.transforms.TimestampRouter",
> "org.apache.kafka.connect.transforms.ValueToKey"],
> {code}
> As you can see the class appear in the recommended values (!) but can't be picked up on the validate call.
> I believe it's because the recommender implements class discovery using plugins:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L194
> But the class inference itself doesn't:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L199
> (I'm not an expert in class loading though, just a guess... Unsure how to fix)
> A quick fix is to add the transformations in the ClassPath itself, but that defeats the point a bit.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5993) Kafka Admin Client throws a warning for sasl.jaas.config
[ https://issues.apache.org/jira/browse/KAFKA-5993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephane Maarek updated KAFKA-5993: --- Summary: Kafka Admin Client throws a warning for sasl.jaas.config (was: Kafka AdminClient does not support standard security settings) > Kafka Admin Client throws a warning for sasl.jaas.config > > > Key: KAFKA-5993 > URL: https://issues.apache.org/jira/browse/KAFKA-5993 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.1 >Reporter: Stephane Maarek > > Kafka Admin Client does not support basic security configurations, such as > "sasl.jaas.config". > Therefore it makes it impossible to use against a secure cluster > ``` > 14:12:12.948 [main] WARN org.apache.kafka.clients.admin.AdminClientConfig - > The configuration 'sasl.jaas.config' was supplied but isn't a known config. > ``` -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5993) Kafka AdminClient does not support standard security settings
[ https://issues.apache.org/jira/browse/KAFKA-5993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16190808#comment-16190808 ] Stephane Maarek edited comment on KAFKA-5993 at 10/4/17 4:52 AM: - [~ijuma] Indeed it does support security settings. The log still shows that WARN, which is what was misleading on my end. See full log here (it's using the PLAINTEXT protocol on purpose for now): {code:java} 15:42:18.548 [main] INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values: bootstrap.servers = [localhost:9092] client.id = connections.max.idle.ms = 30 metadata.max.age.ms = 30 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 3 receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 12 retries = 5 retry.backoff.ms = 100 sasl.jaas.config = [hidden] sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 6 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS 15:42:18.548 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed: 15:42:18.548 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created: 15:42:18.548 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received: 15:42:18.548 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent: 15:42:18.549 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received: 15:42:18.549 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time: 15:42:18.549 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time: 15:42:18.549 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(id = null, nodes = [localhost:9092 (id: -1 rack: null)], partitions = []) 15:42:18.550 [main] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'sasl.jaas.config' was supplied but isn't a known config. 
15:42:18.550 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.11.0.1 15:42:18.550 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : c2a0d5f9b1f45bf5 {code} My code to generate my admin client is: {code:java} public static AdminClient getAdminClient() { Properties adminProps = new Properties(); adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Optional.ofNullable(System.getenv("KAFKA_BOOTSTRAP_SERVERS")).orElse("localhost:9092")); adminProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, Optional.ofNullable(System.getenv("SECURITY_PROTOCOL")).orElse("PLAINTEXT")); adminProps.put(SaslConfigs.SASL_JAAS_CONFIG, Optional.ofNullable(System.getenv("SASL_JAAS_CONFIG")).orElse("")); return AdminClient.create(adminProps); } {code} If you feel that WARN is okay, we can close the JIRA, otherwise maybe we should rename the JIRA and address the WARN? was (Author: stephane.maa...@gmail.com): [~ijuma] Indeed it does support security settings. The log still shows that WARN, which is what was misleading on my end. See full log here: {code:java} 15:42:18.548 [main] INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values: bootstrap.servers = [localhost:9092] client.id = connections.max.idle.ms = 30 metadata.max.age.ms = 30 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 3 receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 12 retries = 5 retry.backoff.ms = 100 sasl.jaas.config =
[jira] [Created] (KAFKA-6007) Connect can't validate against transforms in plugins.path
Stephane Maarek created KAFKA-6007:
-----------------------------------

Summary: Connect can't validate against transforms in plugins.path
Key: KAFKA-6007
URL: https://issues.apache.org/jira/browse/KAFKA-6007
Project: Kafka
Issue Type: Bug
Affects Versions: 0.11.0.1
Reporter: Stephane Maarek

Kafka Connect can't validate a custom transformation if it is placed in the plugins path. Here's the output I get on the validate call:

{code:java}
Invalid value com.mycorp.kafka.transforms.impl.FlattenSinkRecord for configuration transforms.Flat.type: Class com.mycorp.kafka.transforms.impl.FlattenSinkRecord could not be found.
Invalid value null for configuration transforms.Flat.type: Not a Transformation

"recommended_values": [
    "com.mycorp.kafka.transforms.Flatten$Key",
    "com.mycorp.kafka.transforms.Flatten$Value",
    "com.mycorp.kafka.transforms.impl.FlattenSinkRecord",
    "org.apache.kafka.connect.transforms.Cast$Key",
    "org.apache.kafka.connect.transforms.Cast$Value",
    "org.apache.kafka.connect.transforms.ExtractField$Key",
    "org.apache.kafka.connect.transforms.ExtractField$Value",
    "org.apache.kafka.connect.transforms.Flatten$Key",
    "org.apache.kafka.connect.transforms.Flatten$Value",
    "org.apache.kafka.connect.transforms.HoistField$Key",
    "org.apache.kafka.connect.transforms.HoistField$Value",
    "org.apache.kafka.connect.transforms.InsertField$Key",
    "org.apache.kafka.connect.transforms.InsertField$Value",
    "org.apache.kafka.connect.transforms.MaskField$Key",
    "org.apache.kafka.connect.transforms.MaskField$Value",
    "org.apache.kafka.connect.transforms.RegexRouter",
    "org.apache.kafka.connect.transforms.ReplaceField$Key",
    "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
    "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
    "org.apache.kafka.connect.transforms.TimestampConverter$Key",
    "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "org.apache.kafka.connect.transforms.TimestampRouter",
    "org.apache.kafka.connect.transforms.ValueToKey"],
{code}

As you can see, the class appears in the recommended values (!) but can't be picked up by the validate call. I believe it's because the recommender implements class discovery using plugins: https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L194 but the class inference itself doesn't: https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L199 (I'm not an expert in class loading though, just a guess... unsure how to fix). A quick fix is to add the transformations to the classpath itself, but that defeats the point a bit. A sketch of the suspected mismatch follows below.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
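To illustrate the suspected classloader mismatch (a minimal sketch, not the actual Connect code; the jar path, the class name PluginLoadingMismatch, and the plugin directory are hypothetical, while the transformation class name is taken from the report above):

{code:java}
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class PluginLoadingMismatch {
    public static void main(String[] args) throws Exception {
        // Hypothetical location of the plugin jar holding the custom transformation.
        URL pluginJar = new File("/opt/connect-plugins/my-transforms.jar").toURI().toURL();
        ClassLoader pluginLoader = new URLClassLoader(new URL[]{pluginJar},
                PluginLoadingMismatch.class.getClassLoader());

        // Discovery path (what populates "recommended_values"): resolves the
        // class through the plugin classloader, so it succeeds.
        Class<?> found = Class.forName(
                "com.mycorp.kafka.transforms.impl.FlattenSinkRecord", false, pluginLoader);
        System.out.println("plugin loader sees: " + found.getName());

        // Inference path (what the validate call effectively does): resolves
        // against the application classloader, which doesn't know the jar,
        // so this throws ClassNotFoundException, matching the error above.
        Class.forName("com.mycorp.kafka.transforms.impl.FlattenSinkRecord");
    }
}
{code}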
[jira] [Commented] (KAFKA-5993) Kafka AdminClient does not support standard security settings
[ https://issues.apache.org/jira/browse/KAFKA-5993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185455#comment-16185455 ]

Stephane Maarek commented on KAFKA-5993:

Okay, thanks, that's indeed odd. Let me confirm. Thanks Ismael!

> Kafka AdminClient does not support standard security settings
> --------------------------------------------------------------
>
> Key: KAFKA-5993
> URL: https://issues.apache.org/jira/browse/KAFKA-5993
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.11.0.1
> Reporter: Stephane Maarek
>
> Kafka Admin Client does not support basic security configurations such as
> "sasl.jaas.config", which makes it impossible to use against a secure cluster:
> ```
> 14:12:12.948 [main] WARN org.apache.kafka.clients.admin.AdminClientConfig -
> The configuration 'sasl.jaas.config' was supplied but isn't a known config.
> ```

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5993) Kafka AdminClient does not support standard security settings
[ https://issues.apache.org/jira/browse/KAFKA-5993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185445#comment-16185445 ]

Stephane Maarek commented on KAFKA-5993:

I'm not on my laptop, but I had created an admin client and just added a property. I can send a code sample next week, but it's easy to reproduce.

> Kafka AdminClient does not support standard security settings
> --------------------------------------------------------------
>
> Key: KAFKA-5993
> URL: https://issues.apache.org/jira/browse/KAFKA-5993
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.11.0.1
> Reporter: Stephane Maarek
>
> Kafka Admin Client does not support basic security configurations such as
> "sasl.jaas.config", which makes it impossible to use against a secure cluster:
> ```
> 14:12:12.948 [main] WARN org.apache.kafka.clients.admin.AdminClientConfig -
> The configuration 'sasl.jaas.config' was supplied but isn't a known config.
> ```

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5993) Kafka AdminClient does not support standard security settings
Stephane Maarek created KAFKA-5993:
-----------------------------------

Summary: Kafka AdminClient does not support standard security settings
Key: KAFKA-5993
URL: https://issues.apache.org/jira/browse/KAFKA-5993
Project: Kafka
Issue Type: Bug
Affects Versions: 0.11.0.1
Reporter: Stephane Maarek

Kafka Admin Client does not support basic security configurations such as "sasl.jaas.config", which makes it impossible to use against a secure cluster:

```
14:12:12.948 [main] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'sasl.jaas.config' was supplied but isn't a known config.
```

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5992) Better Java Documentation for AdminClient Exceptions
Stephane Maarek created KAFKA-5992:
-----------------------------------

Summary: Better Java Documentation for AdminClient Exceptions
Key: KAFKA-5992
URL: https://issues.apache.org/jira/browse/KAFKA-5992
Project: Kafka
Issue Type: Bug
Affects Versions: 0.11.0.1
Reporter: Stephane Maarek

When invoking a describeTopics operation on a topic that does not exist, we get an InvalidTopicException as a RuntimeException. I believe this should be documented, and the API maybe changed. For example, changing:

{code:java}
public DescribeTopicsResult describeTopics(Collection<String> topicNames) {
{code}

to:

{code:java}
public DescribeTopicsResult describeTopics(Collection<String> topicNames) throws InvalidTopicException
{code}

Additionally, in case multiple topics don't exist, only the first one will throw an error, which doesn't scale. Maybe the DescribeTopicsResult could have a Boolean "topicExists"? Up for discussion.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
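In the meantime, a caller can already derive a per-topic "exists" answer by inspecting each future in DescribeTopicsResult.values() rather than letting the first failure propagate. A sketch (the helper topicExists and its class are hypothetical, not part of the AdminClient API; error handling simplified):

{code:java}
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;

public class TopicExistence {
    // Returns one boolean per requested topic instead of failing on the first miss.
    public static Map<String, Boolean> topicExists(AdminClient admin, Collection<String> topics)
            throws InterruptedException {
        Map<String, Boolean> exists = new HashMap<>();
        // describeTopics() hands back one future per topic; checking each
        // future individually surfaces every missing topic, not just the first.
        for (Map.Entry<String, KafkaFuture<TopicDescription>> entry :
                admin.describeTopics(topics).values().entrySet()) {
            try {
                entry.getValue().get();
                exists.put(entry.getKey(), true);
            } catch (ExecutionException e) {
                // e.g. InvalidTopicException, as reported above
                exists.put(entry.getKey(), false);
            }
        }
        return exists;
    }
}
{code}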
[jira] [Created] (KAFKA-5851) CPU overhead to having partitions (even inactive)
Stephane Maarek created KAFKA-5851:
-----------------------------------

Summary: CPU overhead to having partitions (even inactive)
Key: KAFKA-5851
URL: https://issues.apache.org/jira/browse/KAFKA-5851
Project: Kafka
Issue Type: Bug
Environment: 0.11.0.0, 0.10.x
Reporter: Stephane Maarek

We're running on three r4.xlarge brokers, and this is our dev environment. Some people managed to create 1200 topics with 3 partitions each, so we end up at 4000 partitions per broker (we have a replication factor of 3). Even though no data goes through the cluster (it sits at a comfortable 20 messages per second), we saw CPU at 100% (out of 400%). I went ahead today and deleted 700 topics that I knew were unused, and CPU went down drastically. See image:

!https://i.imgur.com/OIPTwDM.png!

We use the defaults for our brokers, and they use PLAINTEXT internally to replicate. I'm not sure of the root cause (threads, replication, log cleanup, etc.), and I guess it wouldn't be too hard to reproduce: just create 1000 topics on a vanilla cluster and watch CPU go up (a sketch follows below). Hope that helps.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
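A sketch of the reproduction step suggested above, creating 1000 idle topics through the AdminClient (the class name CreateIdleTopics, the topic naming scheme, and the localhost bootstrap address are assumptions; partition count and replication factor mirror the cluster described):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateIdleTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            List<NewTopic> topics = new ArrayList<>();
            // 1000 idle topics with 3 partitions and replication factor 3,
            // roughly matching the cluster described in the report.
            for (int i = 0; i < 1000; i++) {
                topics.add(new NewTopic("idle-topic-" + i, 3, (short) 3));
            }
            // Block until the broker has accepted all create requests.
            admin.createTopics(topics).all().get();
        }
    }
}
{code}

After the topics are created, leaving the cluster otherwise idle and watching broker CPU should show whether the overhead reproduces.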