[jira] [Updated] (KAFKA-14573) RoundRobinPartitioner doesn't work as expected with topic that has 6 partitions

2023-01-04 Thread Stephane Maarek (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephane Maarek updated KAFKA-14573:

Description: 
Create a topic with 6 partitions (that's how the behavior is observed)
```
kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --create 
--partitions 6
```

Start a consumer
```
kafka-console-consumer --bootstrap-server localhost:9092 --topic test_topic 
--formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true 
--property print.key=true --property print.value=true --property 
print.partition=true --from-beginning
```

Start a producer using `RoundRobinPartitioner`
```
kafka-console-producer.sh --bootstrap-server localhost:9092 --producer-property 
partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner 
--topic test_topic
```

And you will see only 3 partitions get produced to (not 6).

This doesn't happen with 3, 5, 7, or 9 partitions (we get the expected behavior), but it does with 6.

Verified to behave the same way on my colleague's computer too.

```
CreateTime: 1672859052082 Partition: 5 null a
CreateTime: 1672859053586 Partition: 1 null b
CreateTime: 1672859054995 Partition: 3 null c
CreateTime: 1672859056405 Partition: 5 null d
CreateTime: 1672859057861 Partition: 1 null e
CreateTime: 1672859059181 Partition: 3 null f
```
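
For reference, a minimal Java producer sketch (not part of the original report; the broker address, topic, and values mirror the commands above) that reproduces the same setup programmatically with `RoundRobinPartitioner`:
```
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

public class RoundRobinRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Same partitioner as in the console-producer command above.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One record per batch, like the console producer; in the output above
            // only partitions 1, 3 and 5 ever show up.
            for (String value : new String[] {"a", "b", "c", "d", "e", "f"}) {
                producer.send(new ProducerRecord<>("test_topic", null, value));
                producer.flush();
            }
        }
    }
}
```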

  was:
Create a topic with 6 partitions (that's how the behavior is observed)
```
kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --create 
--partitions 6
```

Start a consumer
```
kafka-console-consumer --bootstrap-server localhost:9092 --topic test_topic 
--formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true 
--property print.key=true --property print.value=true --property 
print.partition=true --from-beginning
```

Start a producer using `RoundRobinPartitioner`
```
kafka-console-producer.sh --bootstrap-server localhost:9092 --producer-property 
partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner 
--topic test_topic
```

And you will see only 3 partitions get produced to (not 6)

This doesn't happen with 3 partitions, 5, 7, 9 partitions (we get the expected 
behavior).

But with 6 it happens.

Verified to be like this on my colleague's computer too. 

```
CreateTime: 1672859052082 Partition: 5 null a
CreateTime: 1672859053586 Partition: 1 null b
CreateTime: 1672859054995 Partition: 3 null C
CreateTime: 1672859056405 Partition: 5 null d
CreateTime: 1672859057861 Partition: 1 null e
CreateTime: 1672859059181 Partition: 3 null e
```


> RoundRobinPartitioner doesn't work as expected with topic that has 6 
> partitions
> ---
>
> Key: KAFKA-14573
> URL: https://issues.apache.org/jira/browse/KAFKA-14573
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.1
>Reporter: Stephane Maarek
>Priority: Major
> Attachments: image-2023-01-04-20-15-43-039.png
>
>
> Create a topic with 6 partitions (that's how the behavior is observed)
> ```
> kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --create 
> --partitions 6
> ```
> Start a consumer
> ```
> kafka-console-consumer --bootstrap-server localhost:9092 --topic test_topic 
> --formatter kafka.tools.DefaultMessageFormatter --property 
> print.timestamp=true --property print.key=true --property print.value=true 
> --property print.partition=true --from-beginning
> ```
> Start a producer using `RoundRobinPartitioner`
> ```
> kafka-console-producer.sh --bootstrap-server localhost:9092 
> --producer-property 
> partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner 
> --topic test_topic
> ```
> And you will see only 3 partitions get produced to (not 6)
> This doesn't happen with 3 partitions, 5, 7, 9 partitions (we get the 
> expected behavior).
> But with 6 it happens.
> Verified to be like this on my colleague's computer too. 
> ```
> CreateTime: 1672859052082 Partition: 5 null a
> CreateTime: 1672859053586 Partition: 1 null b
> CreateTime: 1672859054995 Partition: 3 null c
> CreateTime: 1672859056405 Partition: 5 null d
> CreateTime: 1672859057861 Partition: 1 null e
> CreateTime: 1672859059181 Partition: 3 null f
> ```





[jira] [Updated] (KAFKA-14573) RoundRobinPartitioner doesn't work as expected with topic that has 6 partitions

2023-01-04 Thread Stephane Maarek (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephane Maarek updated KAFKA-14573:

Description: 
Create a topic with 6 partitions (that's how the behavior is observed)
```
kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --create 
--partitions 6
```

Start a consumer
```
kafka-console-consumer --bootstrap-server localhost:9092 --topic test_topic 
--formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true 
--property print.key=true --property print.value=true --property 
print.partition=true --from-beginning
```

Start a producer using `RoundRobinPartitioner`
```
kafka-console-producer.sh --bootstrap-server localhost:9092 --producer-property 
partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner 
--topic test_topic
```

And you will see only 3 partitions get produced to (not 6)

This doesn't happen with 3 partitions, 5, 7, 9 partitions (we get the expected 
behavior).

But with 6 it happens.

Verified to be like this on my colleague's computer too. 

```
CreateTime: 1672859052082 Partition: 5 null a
CreateTime: 1672859053586 Partition: 1 null b
CreateTime: 1672859054995 Partition: 3 null C
CreateTime: 1672859056405 Partition: 5 null d
CreateTime: 1672859057861 Partition: 1 null e
CreateTime: 1672859059181 Partition: 3 null e
```

  was:
Create a topic with 6 partitions (that's how the behavior is observed)
```
kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --create 
--partitions 6
```

Start a consumer
```
kafka-console-consumer --bootstrap-server localhost:9092 --topic test_topic 
--formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true 
--property print.key=true --property print.value=true --property 
print.partition=true --from-beginning
```

Start a producer using `RoundRobinPartitioner`
```
kafka-console-producer.sh --bootstrap-server localhost:9092 --producer-property 
partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner 
--topic test_topic
```

And you will see only 3 partitions get produced to (not 6)

This doesn't happen with 3 partitions, 5, 7, 9 partitions (we get the expected 
behavior).

But with 6 it happens.

Verified to be like this on my colleague's computer too. 

 !image-2023-01-04-20-15-43-039.png! 


> RoundRobinPartitioner doesn't work as expected with topic that has 6 
> partitions
> ---
>
> Key: KAFKA-14573
> URL: https://issues.apache.org/jira/browse/KAFKA-14573
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.1
>Reporter: Stephane Maarek
>Priority: Major
> Attachments: image-2023-01-04-20-15-43-039.png
>
>
> Create a topic with 6 partitions (that's how the behavior is observed)
> ```
> kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --create 
> --partitions 6
> ```
> Start a consumer
> ```
> kafka-console-consumer --bootstrap-server localhost:9092 --topic test_topic 
> --formatter kafka.tools.DefaultMessageFormatter --property 
> print.timestamp=true --property print.key=true --property print.value=true 
> --property print.partition=true --from-beginning
> ```
> Start a producer using `RoundRobinPartitioner`
> ```
> kafka-console-producer.sh --bootstrap-server localhost:9092 
> --producer-property 
> partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner 
> --topic test_topic
> ```
> And you will see only 3 partitions get produced to (not 6)
> This doesn't happen with 3 partitions, 5, 7, 9 partitions (we get the 
> expected behavior).
> But with 6 it happens.
> Verified to be like this on my colleague's computer too. 
> ```
> CreateTime: 1672859052082 Partition: 5 null a
> CreateTime: 1672859053586 Partition: 1 null b
> CreateTime: 1672859054995 Partition: 3 null C
> CreateTime: 1672859056405 Partition: 5 null d
> CreateTime: 1672859057861 Partition: 1 null e
> CreateTime: 1672859059181 Partition: 3 null e
> ```





[jira] [Created] (KAFKA-14573) RoundRobinPartitioner doesn't work as expected with topic that has 6 partitions

2023-01-04 Thread Stephane Maarek (Jira)
Stephane Maarek created KAFKA-14573:
---

 Summary: RoundRobinPartitioner doesn't work as expected with topic 
that has 6 partitions
 Key: KAFKA-14573
 URL: https://issues.apache.org/jira/browse/KAFKA-14573
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.3.1
Reporter: Stephane Maarek
 Attachments: image-2023-01-04-20-15-43-039.png

Create a topic with 6 partitions (that's how the behavior is observed)
```
kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --create 
--partitions 6
```

Start a consumer
```
kafka-console-consumer --bootstrap-server localhost:9092 --topic test_topic 
--formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true 
--property print.key=true --property print.value=true --property 
print.partition=true --from-beginning
```

Start a producer using `RoundRobinPartitioner`
```
kafka-console-producer.sh --bootstrap-server localhost:9092 --producer-property 
partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner 
--topic test_topic
```

And you will see only 3 partitions get produced to (not 6)

This doesn't happen with 3 partitions, 5, 7, 9 partitions (we get the expected 
behavior).

But with 6 it happens.

Verified to be like this on my colleague's computer too. 

 !image-2023-01-04-20-15-43-039.png! 





[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-15 Thread Stephane Maarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688044#comment-16688044
 ] 

Stephane Maarek commented on KAFKA-1194:


[~Kobi Hikri] excellent, I'll be happy to help out and test your PR.
Here's the process:
1) go to github.com/apache/kafka
2) create a fork (top right corner). I'll assume your username is "user"
3) you now have a copy of the repo at github.com/user/kafka
4) add a remote to your git: git remote add myfork g...@github.com:user/kafka.git
5) push your branch to your remote: git push -u myfork mybranch
6) go back to the web interface at github.com/apache/kafka
7) you should see a yellow bar to help you do a pull request. Else follow this 
guide: https://help.github.com/articles/creating-a-pull-request/

Hope that helps!
Stephane

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because kafka tries to rename the log file when it 
> is still opened.  So we should close the file first before rename.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I find a forceUnmap function 

[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-12 Thread Stephane Maarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612019#comment-16612019
 ] 

Stephane Maarek commented on KAFKA-1194:


Thanks [~Kobi Hikri]! That's what I thought: it was a quick fix for the log cleaner, but it breaks the delete feature. At least the brokers don't crash anymore.
To be honest, I won't pursue further fixes for lack of time, but I really hope someone on the core team can pick this up.

To me, it seems we have pinpointed, through the PR, all the points of potential failure on Windows.
As for the quality of the fix, I would say very low; I honestly don't understand much about the Windows file system or the ordering of Kafka's internals for log cleaning and deletion.

I'm really hoping this is enough research so that [~ijuma] or [~lindong] can determine a better fix.
In an ideal world, the function to fix is
{code}Utils.atomicMoveWithFallback(src, dst);{code}
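
For anyone digging further, here is a minimal standalone Java sketch (written purely for illustration; it is not Kafka code) of the underlying Windows behaviour: renaming a file while a MappedByteBuffer over it is still reachable typically fails on Windows with AccessDeniedException, which is exactly the kind of move that Utils.atomicMoveWithFallback performs. On Linux/macOS the same rename succeeds.

{code:java}
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class MappedRenameDemo {
    public static void main(String[] args) throws IOException {
        Path src = Files.createTempFile("segment", ".index");
        Path dst = src.resolveSibling(src.getFileName() + ".deleted");

        try (FileChannel channel = FileChannel.open(src,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Map the file, as Kafka does for its index files, and keep the mapping alive.
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            buffer.put(0, (byte) 1);
            try {
                // The same kind of rename that Utils.atomicMoveWithFallback performs.
                Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE);
                System.out.println("rename succeeded: " + dst);
            } catch (IOException e) {
                // On Windows this typically fails while the mapping has not been
                // unmapped/garbage-collected, mirroring the errors reported in this ticket.
                System.out.println("rename failed: " + e);
            }
        }
    }
}
{code}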

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because kafka tries to rename the log file when it 
> is still opened.  So we should close the file first before rename.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte 

[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-05 Thread Stephane Maarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16604408#comment-16604408
 ] 

Stephane Maarek commented on KAFKA-1194:


Hi [~Kobi Hikri], can you please test https://github.com/apache/kafka/pull/5603 one more time?
I hadn't fixed the class that was giving you errors.
If you manage to trigger errors, can you please give me the steps to reproduce?
BTW, you may see a lot of "ERROR" logs; that's intended, just so I can see the code path.
Please paste the full log somewhere if you get another error; that'll help.

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, Untitled.jpg, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because kafka tries to rename the log file when it 
> is still opened.  So we should close the file first before rename.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I find a forceUnmap function in kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.





[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-02 Thread Stephane Maarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16601613#comment-16601613
 ] 

Stephane Maarek commented on KAFKA-1194:


[~lindong] Not fixed as of 
{code}
[2018-09-02 11:03:12,789] INFO Kafka version : 2.1.0-SNAPSHOT 
(org.apache.kafka.common.utils.AppInfoParser)
[2018-09-02 11:03:12,789] INFO Kafka commitId : 7299e1836ba2 
(org.apache.kafka.common.utils.AppInfoParser)
{code}

Here are the full steps to reproduce and trigger:
{code}
C:\kafka_2.11-2.1.0-SNAPSHOT>kafka-topics.bat --zookeeper 127.0.0.1:2181 
--topic second_topic --create --partitions 3 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or 
underscore ('_') could collide. To avoid issues it is best to use either, but 
not both.
Created topic "second_topic".

C:\kafka_2.11-2.1.0-SNAPSHOT>kafka-console-producer.bat --broker-list 
127.0.0.1:9092 --topic second_topic
>hello
>world
>hello
>Terminate batch job (Y/N)? Y

C:\kafka_2.11-2.1.0-SNAPSHOT>kafka-topics.bat --zookeeper 127.0.0.1:2181 
--topic second_topic --delete
Topic second_topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
{code}

Which triggers Kafka shutdown with:
{code}
[2018-09-02 11:04:15,460] ERROR Error while renaming dir for second_topic-1 in 
log dir C:\kafka_2.11-2.1.0-SNAPSHOT\data\kafka 
(kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: 
C:\kafka_2.11-2.1.0-SNAPSHOT\data\kafka\second_topic-1 -> 
C:\kafka_2.11-2.1.0-SNAPSHOT\data\kafka\second_topic-1.d1ceee24d7474152b6fedd61903449e5-delete
at sun.nio.fs.WindowsException.translateToIOException(Unknown Source)
at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
at sun.nio.fs.WindowsFileCopy.move(Unknown Source)
at sun.nio.fs.WindowsFileSystemProvider.move(Unknown Source)
at java.nio.file.Files.move(Unknown Source)
at 
org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:809)
at kafka.log.Log$$anonfun$renameDir$1.apply$mcV$sp(Log.scala:689)
at kafka.log.Log$$anonfun$renameDir$1.apply(Log.scala:687)
at kafka.log.Log$$anonfun$renameDir$1.apply(Log.scala:687)
at kafka.log.Log.maybeHandleIOException(Log.scala:1842)
at kafka.log.Log.renameDir(Log.scala:687)
at kafka.log.LogManager.asyncDelete(LogManager.scala:833)
at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:271)
at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:265)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
at kafka.cluster.Partition.delete(Partition.scala:265)
at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:340)
at 
kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:370)
at 
kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:368)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:368)
at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:200)
at kafka.server.KafkaApis.handle(KafkaApis.scala:111)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Unknown Source)
Suppressed: java.nio.file.AccessDeniedException: 
C:\kafka_2.11-2.1.0-SNAPSHOT\data\kafka\second_topic-1 -> 
C:\kafka_2.11-2.1.0-SNAPSHOT\data\kafka\second_topic-1.d1ceee24d7474152b6fedd61903449e5-delete
at sun.nio.fs.WindowsException.translateToIOException(Unknown 
Source)
at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown 
Source)
at sun.nio.fs.WindowsFileCopy.move(Unknown Source)
at sun.nio.fs.WindowsFileSystemProvider.move(Unknown Source)
at java.nio.file.Files.move(Unknown Source)
at 
org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:806)
... 23 more
[2018-09-02 11:04:15,460] INFO [ReplicaManager broker=0] Stopping serving 
replicas in dir C:\kafka_2.11-2.1.0-SNAPSHOT\data\kafka 
(kafka.server.ReplicaManager)
{code}

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects 

[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-09-02 Thread Stephane Maarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16601552#comment-16601552
 ] 

Stephane Maarek commented on KAFKA-7278:


[~lindong] Unfortunately, this does not fix 
https://issues.apache.org/jira/browse/KAFKA-1194
I'll post the details there... 

> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in the `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined to the time Log.replaceSegments() is called. If there are 
> concurrent async deletion of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist and Kafka 
> server may shutdown the log directory due to NoSuchFileException.
> This is likely the root cause of 
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given the understanding of the problem, we should be able to fix the issue by 
> only deleting segment if the segment can be found in Log.segments.
>  
>  





[jira] [Created] (KAFKA-7355) Topic Configuration Changes are not applied until reboot

2018-08-29 Thread Stephane Maarek (JIRA)
Stephane Maarek created KAFKA-7355:
--

 Summary: Topic Configuration Changes are not applied until reboot
 Key: KAFKA-7355
 URL: https://issues.apache.org/jira/browse/KAFKA-7355
 Project: Kafka
  Issue Type: Bug
  Components: config, core
Affects Versions: 2.0.0
Reporter: Stephane Maarek


Steps to reproduce:

{code}
kafka-topics --zookeeper 127.0.0.1:2181 --create --topic employee-salary 
--partitions 1 --replication-factor 1

kafka-configs --zookeeper 127.0.0.1:2181 --alter --entity-type topics 
--entity-name employee-salary --add-config 
cleanup.policy=compact,min.cleanable.dirty.ratio=0.001,segment.ms=5000

kafka-configs --zookeeper 127.0.0.1:2181 --alter --entity-type topics 
--entity-name employee-salary

kafka-console-producer --broker-list 127.0.0.1:9092 --topic employee-salary 
--property parse.key=true --property key.separator=,
{code}

Try publishing a bunch of data: no segment rollover happens (even though segment.ms=5000). I looked at the Kafka data directory and the broker logs to confirm this.

I noticed the broker processed the notification of config changes, but its behaviour was not updated to use the new config values.

After restarting the broker, the expected behaviour is observed.
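
For what it's worth, a hedged Java sketch (not part of the original report; the topic name and overrides mirror the commands above, using the 2.0-era alterConfigs API) that applies the same overrides through the AdminClient and reads them back, which makes it easy to check what the broker currently reports without restarting it:

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TopicConfigCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        AdminClient admin = AdminClient.create(props);

        ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "employee-salary");

        // Same overrides as the kafka-configs command above.
        Config overrides = new Config(Arrays.asList(
                new ConfigEntry("cleanup.policy", "compact"),
                new ConfigEntry("min.cleanable.dirty.ratio", "0.001"),
                new ConfigEntry("segment.ms", "5000")));
        admin.alterConfigs(Collections.singletonMap(topic, overrides)).all().get();

        // Read the config back to see what the broker currently reports.
        Config current = admin.describeConfigs(Collections.singleton(topic))
                .all().get().get(topic);
        current.entries().forEach(e -> System.out.println(e.name() + " = " + e.value()));

        admin.close();
    }
}
{code}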





[jira] [Commented] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent

2018-07-01 Thread Stephane Maarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529262#comment-16529262
 ] 

Stephane Maarek commented on KAFKA-7077:


Related, but not the same. KAFKA-6340 requires some profound changes to how data is produced and consumed.
This just enables idempotence, which doesn't change consumer behaviour, and should improve performance and strengthen delivery guarantees.
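
For context, the change boils down to the two producer settings quoted in the original issue summary. A minimal sketch (illustrative only, not the Connect worker code) of a producer configured that way:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // The two settings from the original issue summary: idempotence plus up to
        // 5 in-flight requests per connection (ordering is still preserved).
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); // required with idempotence

        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        // ... send records as usual ...
        producer.close();
    }
}
{code}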

I'd love to hear your input on the KIP mailing thread: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
 

> KIP-318: Make Kafka Connect Source idempotent
> -
>
> Key: KAFKA-7077
> URL: https://issues.apache.org/jira/browse/KAFKA-7077
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Stephane Maarek
>Assignee: Stephane Maarek
>Priority: Major
>
> KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent





[jira] [Updated] (KAFKA-7077) producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5") producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE, true)

2018-06-20 Thread Stephane Maarek (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephane Maarek updated KAFKA-7077:
---
Description: KIP Link: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent

> producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5") 
> producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE, true)
> ---
>
> Key: KAFKA-7077
> URL: https://issues.apache.org/jira/browse/KAFKA-7077
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Stephane Maarek
>Assignee: Stephane Maarek
>Priority: Major
>
> KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent





[jira] [Updated] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent

2018-06-20 Thread Stephane Maarek (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephane Maarek updated KAFKA-7077:
---
Summary: KIP-318: Make Kafka Connect Source idempotent  (was: 
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5") 
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE, true))

> KIP-318: Make Kafka Connect Source idempotent
> -
>
> Key: KAFKA-7077
> URL: https://issues.apache.org/jira/browse/KAFKA-7077
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Stephane Maarek
>Assignee: Stephane Maarek
>Priority: Major
>
> KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent





[jira] [Created] (KAFKA-7077) producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5") producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE, true)

2018-06-20 Thread Stephane Maarek (JIRA)
Stephane Maarek created KAFKA-7077:
--

 Summary: 
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5") 
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE, true)
 Key: KAFKA-7077
 URL: https://issues.apache.org/jira/browse/KAFKA-7077
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.0.0
Reporter: Stephane Maarek
Assignee: Stephane Maarek








[jira] [Commented] (KAFKA-7066) Make Streams Runtime Error User Friendly in Case of Serialisation exception

2018-06-16 Thread Stephane Maarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16514953#comment-16514953
 ] 

Stephane Maarek commented on KAFKA-7066:


Thanks [~mjsax]. I think that helps, but my PR goes at the most common (lowest) level for all these issues, which addresses all kinds of stores.
With logging, though, I'd rather have too much than too little, so I don't think any of these issues supersedes the others.

> Make Streams Runtime Error User Friendly in Case of Serialisation exception
> ---
>
> Key: KAFKA-7066
> URL: https://issues.apache.org/jira/browse/KAFKA-7066
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Stephane Maarek
>Assignee: Stephane Maarek
>Priority: Major
> Fix For: 2.0.0
>
>
> This kind of exception can be cryptic for the beginner:
> {code:java}
> ERROR stream-thread 
> [favourite-colors-application-a336770d-6ba6-4bbb-8681-3c8ea91bd12e-StreamThread-1]
>  Failed to process stream task 2_0 due to the following error: 
> (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks:105)
> java.lang.ClassCastException: java.lang.Long cannot be cast to 
> java.lang.String
> at 
> org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
> at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
> at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
> at 
> org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:95)
> at 
> org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:224)
> at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
> We should add more detailed logging already present in SinkNode to assist the 
> user into solving this issue





[jira] [Assigned] (KAFKA-7066) Make Streams Runtime Error User Friendly in Case of Serialisation exception

2018-06-16 Thread Stephane Maarek (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephane Maarek reassigned KAFKA-7066:
--

Assignee: Stephane Maarek

> Make Streams Runtime Error User Friendly in Case of Serialisation exception
> ---
>
> Key: KAFKA-7066
> URL: https://issues.apache.org/jira/browse/KAFKA-7066
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Stephane Maarek
>Assignee: Stephane Maarek
>Priority: Major
> Fix For: 2.0.0
>
>
> This kind of exception can be cryptic for the beginner:
> {code:java}
> ERROR stream-thread 
> [favourite-colors-application-a336770d-6ba6-4bbb-8681-3c8ea91bd12e-StreamThread-1]
>  Failed to process stream task 2_0 due to the following error: 
> (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks:105)
> java.lang.ClassCastException: java.lang.Long cannot be cast to 
> java.lang.String
> at 
> org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
> at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
> at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
> at 
> org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:95)
> at 
> org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:224)
> at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
> We should add more detailed logging already present in SinkNode to assist the 
> user into solving this issue





[jira] [Created] (KAFKA-7066) Make Streams Runtime Error User Friendly in Case of Serialisation exception

2018-06-16 Thread Stephane Maarek (JIRA)
Stephane Maarek created KAFKA-7066:
--

 Summary: Make Streams Runtime Error User Friendly in Case of 
Serialisation exception
 Key: KAFKA-7066
 URL: https://issues.apache.org/jira/browse/KAFKA-7066
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 1.1.0
Reporter: Stephane Maarek
 Fix For: 2.0.0


This kind of exception can be cryptic for the beginner:
{code:java}
ERROR stream-thread 
[favourite-colors-application-a336770d-6ba6-4bbb-8681-3c8ea91bd12e-StreamThread-1]
 Failed to process stream task 2_0 due to the following error: 
(org.apache.kafka.streams.processor.internals.AssignedStreamsTasks:105)
java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
at 
org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66)
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
at 
org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
at 
org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:95)
at 
org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at 
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:224)
at 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
at 
org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}

We should add the more detailed logging already present in SinkNode to assist the user in solving this issue.
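
As a hedged illustration (topology and topic names are made up): the ClassCastException above is the classic symptom of an aggregation that produces Long values while the state store and sink fall back to the default String serde; declaring the serdes explicitly avoids it.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class FavouriteColoursTopology {
    public static void main(String[] args) {
        // Assumes default.key.serde / default.value.serde are the String serdes.
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> usersAndColours = builder.table("favourite-colours-input");

        // count() yields Long values; without an explicit value serde the store would
        // serialize them with the default String serde and throw the exception above.
        KTable<String, Long> colourCounts = usersAndColours
                .groupBy((user, colour) -> KeyValue.pair(colour, colour))
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByColour")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()));

        // The sink needs the Long serde for the same reason.
        colourCounts.toStream().to("favourite-colours-output",
                Produced.with(Serdes.String(), Serdes.Long()));

        System.out.println(builder.build().describe());
    }
}
{code}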





[jira] [Commented] (KAFKA-6323) punctuate with WALL_CLOCK_TIME triggered immediately

2017-12-08 Thread Stephane Maarek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16284366#comment-16284366
 ] 

Stephane Maarek commented on KAFKA-6323:


Fully agree, [~guozhang]. I also agree on punctuating only once (even if T2 is 5 intervals away); I have observed punctuate being called way too many times if the data does a big jump.

Finally, is there any interest in, or use case for, using both a wall-clock and an event-driven punctuate? That might require a KIP.
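
For readers landing on this ticket, a minimal sketch (illustrative; the interval and the commit action are made up) of scheduling a wall-clock punctuation from a Processor with the KIP-138 API discussed here; after this fix, the first invocation happens one interval after scheduling rather than immediately:

{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class WallClockProcessor extends AbstractProcessor<String, String> {

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        // Fires roughly every 10 seconds of wall-clock time, whether or not any
        // records arrive in between.
        context.schedule(10_000L, PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context().commit());
    }

    @Override
    public void process(String key, String value) {
        context().forward(key, value);
    }
}
{code}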

> punctuate with WALL_CLOCK_TIME triggered immediately
> 
>
> Key: KAFKA-6323
> URL: https://issues.apache.org/jira/browse/KAFKA-6323
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Frederic Arno
>Assignee: Frederic Arno
> Fix For: 1.1.0, 1.0.1
>
>
> When working on a custom Processor from which I am scheduling a punctuation 
> using WALL_CLOCK_TIME. I've noticed that whatever the punctuation interval I 
> set, a call to my Punctuator is always triggered immediately.
> Having a quick look at kafka-streams' code, I could find that all 
> PunctuationSchedule's timestamps are matched against the current time in 
> order to decide whether or not to trigger the punctuator 
> (org.apache.kafka.streams.processor.internals.PunctuationQueue#mayPunctuate). 
> However, I've only seen code that initializes PunctuationSchedule's timestamp 
> to 0, which I guess is what is causing an immediate punctuation.
> At least when using WALL_CLOCK_TIME, shouldn't the PunctuationSchedule's 
> timestamp be initialized to current time + interval?





[jira] [Created] (KAFKA-6092) Time passed in punctuate call is currentTime, not punctuate schedule time.

2017-10-19 Thread Stephane Maarek (JIRA)
Stephane Maarek created KAFKA-6092:
--

 Summary: Time passed in punctuate call is currentTime, not 
punctuate schedule time. 
 Key: KAFKA-6092
 URL: https://issues.apache.org/jira/browse/KAFKA-6092
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.11.0.0
Reporter: Stephane Maarek


The Javadoc specifies that for a Transformer, calling context.schedule(1000) causes punctuate to be called every 1000 ms. This is not entirely accurate: if no data is received for a while, punctuate won't be called.

{code}
 * void init(ProcessorContext context) {
 * this.context = context;
 * this.state = context.getStateStore("myTransformState");
 * context.schedule(1000); // call #punctuate() each 1000ms
 * }
{code}

When you receive new data, say after 20 seconds, punctuate will play catch-up and will be called 20 times upon reception of the new data.

the signature of punctuate is
{code}
* KeyValue punctuate(long timestamp) {
 * // can access this.state
 * // can emit as many new KeyValue pairs as required via 
this.context#forward()
 * return null; // don't return result -- can also be "new 
KeyValue()"
 * }
{code}

but the timestamp being passed is the current timestamp at the time of the call to punctuate, not the time for which the punctuate was scheduled. It is very confusing, and I think the timestamp should represent the time at which the punctuate should have fired. Passing the current timestamp does not add much information, as it can easily be obtained using System.currentTimeMillis().





[jira] [Commented] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2017-10-10 Thread Stephane Maarek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199387#comment-16199387
 ] 

Stephane Maarek commented on KAFKA-6007:


I agree. You are correct in your assessment.
Overall this seems like a user error, and the error message is misleading. It leads one to believe there's a problem with Connect itself, whereas it's a misuse of the framework.

I think if you can change the code so that the error messages are more explicit and context-accurate, this would give the user enough feedback to fix the issue and place the connectors where appropriate.

This kind of error message would work well:
“we loaded this connector from the plugin path, and your transformer is not in the plugin path”
“we loaded this connector from the classpath, and your transformer is not in the classpath”

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Stephane Maarek
>Assignee: Konstantine Karantasis
>Priority: Blocker
> Fix For: 0.11.0.1, 1.0.0
>
>
> Kafka Connect can't validate a custom transformation if placed in plugins 
> path.
> Here's the output I get on the validate call:
> {code:java}
> Invalid value com.mycorp.kafka.transforms.impl.FlattenSinkRecord for 
> configuration transforms.Flat.type: Class 
> com.mycorp.kafka.transforms.impl.FlattenSinkRecord could not be found.
> Invalid value null for configuration transforms.Flat.type: Not a 
> Transformation
> "recommended_values": [   
> "com.mycorp.kafka.transforms.Flatten$Key",
> "com.mycorp.kafka.transforms.Flatten$Value",
> "com.mycorp.kafka.transforms.impl.FlattenSinkRecord",
> "org.apache.kafka.connect.transforms.Cast$Key",
> "org.apache.kafka.connect.transforms.Cast$Value",
> "org.apache.kafka.connect.transforms.ExtractField$Key",
> "org.apache.kafka.connect.transforms.ExtractField$Value",
> "org.apache.kafka.connect.transforms.Flatten$Key",
> "org.apache.kafka.connect.transforms.Flatten$Value",
> "org.apache.kafka.connect.transforms.HoistField$Key",
> "org.apache.kafka.connect.transforms.HoistField$Value",
> "org.apache.kafka.connect.transforms.InsertField$Key",
> "org.apache.kafka.connect.transforms.InsertField$Value",
> "org.apache.kafka.connect.transforms.MaskField$Key",
> "org.apache.kafka.connect.transforms.MaskField$Value",
> "org.apache.kafka.connect.transforms.RegexRouter",
> "org.apache.kafka.connect.transforms.ReplaceField$Key",
> "org.apache.kafka.connect.transforms.ReplaceField$Value",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
> "org.apache.kafka.connect.transforms.TimestampConverter$Key",
> "org.apache.kafka.connect.transforms.TimestampConverter$Value",
> "org.apache.kafka.connect.transforms.TimestampRouter",
> "org.apache.kafka.connect.transforms.ValueToKey"],
> {code}
> As you can see the class appear in the recommended values (!) but can't be 
> picked up on the validate call. 
> I believe it's because the recommender implements class discovery using 
> plugins:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L194
> But the class inference itself doesn't:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L199
> (I'm not an expert in class loading though, just a guess... Unsure how to fix)
> A quick fix is to add the transformations in the ClassPath itself, but that 
> defeats the point a bit. 





[jira] [Updated] (KAFKA-5993) Kafka Admin Client throws a warning for sasl.jaas.config

2017-10-03 Thread Stephane Maarek (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephane Maarek updated KAFKA-5993:
---
Summary: Kafka Admin Client throws a warning for sasl.jaas.config  (was: 
Kafka AdminClient does not support standard security settings)

> Kafka Admin Client throws a warning for sasl.jaas.config
> 
>
> Key: KAFKA-5993
> URL: https://issues.apache.org/jira/browse/KAFKA-5993
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Stephane Maarek
>
> Kafka Admin Client does not support basic security configurations, such as 
> "sasl.jaas.config".
> Therefore it makes it impossible to use against a secure cluster
> ```
> 14:12:12.948 [main] WARN  org.apache.kafka.clients.admin.AdminClientConfig - 
> The configuration 'sasl.jaas.config' was supplied but isn't a known config.
> ```





[jira] [Comment Edited] (KAFKA-5993) Kafka AdminClient does not support standard security settings

2017-10-03 Thread Stephane Maarek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16190808#comment-16190808
 ] 

Stephane Maarek edited comment on KAFKA-5993 at 10/4/17 4:52 AM:
-

[~ijuma] Indeed it does support security settings. The log still shows that 
WARN, which is what was misleading on my end.

See full log here (it's using the PLAINTEXT protocol on purpose for now):

{code:java}
15:42:18.548 [main] INFO  org.apache.kafka.clients.admin.AdminClientConfig - 
AdminClientConfig values: 
bootstrap.servers = [localhost:9092]
client.id = 
connections.max.idle.ms = 30
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 12
retries = 5
retry.backoff.ms = 100
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

15:42:18.548 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added 
sensor with name connections-closed:
15:42:18.548 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added 
sensor with name connections-created:
15:42:18.548 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added 
sensor with name bytes-sent-received:
15:42:18.548 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added 
sensor with name bytes-sent:
15:42:18.549 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added 
sensor with name bytes-received:
15:42:18.549 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added 
sensor with name select-time:
15:42:18.549 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added 
sensor with name io-time:
15:42:18.549 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster 
metadata version 1 to Cluster(id = null, nodes = [localhost:9092 (id: -1 rack: 
null)], partitions = [])
15:42:18.550 [main] WARN  org.apache.kafka.clients.admin.AdminClientConfig - 
The configuration 'sasl.jaas.config' was supplied but isn't a known config.
15:42:18.550 [main] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka 
version : 0.11.0.1
15:42:18.550 [main] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka 
commitId : c2a0d5f9b1f45bf5
{code}

My code to generate my admin client is:
  
{code:java}
import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.SaslConfigs;

public static AdminClient getAdminClient() {
    Properties adminProps = new Properties();
    // Bootstrap servers, security protocol and JAAS config come from the
    // environment, with local-development defaults as fallbacks.
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            Optional.ofNullable(System.getenv("KAFKA_BOOTSTRAP_SERVERS")).orElse("localhost:9092"));
    adminProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
            Optional.ofNullable(System.getenv("SECURITY_PROTOCOL")).orElse("PLAINTEXT"));
    adminProps.put(SaslConfigs.SASL_JAAS_CONFIG,
            Optional.ofNullable(System.getenv("SASL_JAAS_CONFIG")).orElse(""));
    return AdminClient.create(adminProps);
}
{code}
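
For reference, here is a minimal sketch — assuming the getAdminClient() helper above and a hypothetical checkConnectivity() wrapper — of the kind of call that succeeds even while the WARN is logged, which is why I believe the WARN is cosmetic rather than a real lack of support:

{code:java}
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;

// Sketch only: if the SASL settings were truly ignored, this would fail
// against a secured cluster; it succeeding while the WARN is logged
// suggests the WARN is misleading.
public static void checkConnectivity() throws Exception {
    AdminClient admin = getAdminClient();
    Set<String> topics = admin.listTopics().names().get();
    System.out.println("Connected, topics visible: " + topics);
    admin.close();
}
{code}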

If you feel that WARN is okay, we can close the JIRA; otherwise maybe we should 
rename the JIRA and address the WARN?


was (Author: stephane.maa...@gmail.com):
[~ijuma] Indeed, it does support security settings. The log still shows that 
WARN, which is what misled me.

See full log here:

{code:java}
15:42:18.548 [main] INFO  org.apache.kafka.clients.admin.AdminClientConfig - 
AdminClientConfig values: 
bootstrap.servers = [localhost:9092]
client.id = 
connections.max.idle.ms = 30
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 12
retries = 5
retry.backoff.ms = 100
sasl.jaas.config = 
{code}

[jira] [Commented] (KAFKA-5993) Kafka AdminClient does not support standard security settings

2017-10-03 Thread Stephane Maarek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16190808#comment-16190808
 ] 

Stephane Maarek commented on KAFKA-5993:


[~ijuma] Indeed, it does support security settings. The log still shows that 
WARN, which is what misled me.

See full log here:

{code:java}
15:42:18.548 [main] INFO  org.apache.kafka.clients.admin.AdminClientConfig - 
AdminClientConfig values: 
bootstrap.servers = [localhost:9092]
client.id = 
connections.max.idle.ms = 30
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 12
retries = 5
retry.backoff.ms = 100
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

15:42:18.548 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added 
sensor with name connections-closed:
15:42:18.548 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added 
sensor with name connections-created:
15:42:18.548 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added 
sensor with name bytes-sent-received:
15:42:18.548 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added 
sensor with name bytes-sent:
15:42:18.549 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added 
sensor with name bytes-received:
15:42:18.549 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added 
sensor with name select-time:
15:42:18.549 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added 
sensor with name io-time:
15:42:18.549 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster 
metadata version 1 to Cluster(id = null, nodes = [localhost:9092 (id: -1 rack: 
null)], partitions = [])
15:42:18.550 [main] WARN  org.apache.kafka.clients.admin.AdminClientConfig - 
The configuration 'sasl.jaas.config' was supplied but isn't a known config.
15:42:18.550 [main] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka 
version : 0.11.0.1
15:42:18.550 [main] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka 
commitId : c2a0d5f9b1f45bf5
{code}

My code to generate my admin client is:
  
{code:java}
import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.SaslConfigs;

public static AdminClient getAdminClient() {
    Properties adminProps = new Properties();
    // Bootstrap servers, security protocol and JAAS config come from the
    // environment, with local-development defaults as fallbacks.
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            Optional.ofNullable(System.getenv("KAFKA_BOOTSTRAP_SERVERS")).orElse("localhost:9092"));
    adminProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
            Optional.ofNullable(System.getenv("SECURITY_PROTOCOL")).orElse("PLAINTEXT"));
    adminProps.put(SaslConfigs.SASL_JAAS_CONFIG,
            Optional.ofNullable(System.getenv("SASL_JAAS_CONFIG")).orElse(""));
    return AdminClient.create(adminProps);
}
{code}

If you feel that WARN is okay, we can close the JIRA; otherwise maybe we should 
rename the JIRA and address the WARN?

> Kafka AdminClient does not support standard security settings
> -
>
> Key: KAFKA-5993
> URL: https://issues.apache.org/jira/browse/KAFKA-5993
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Stephane Maarek
>
> Kafka Admin Client does not support basic security configurations, such as 
> "sasl.jaas.config".
> This makes it impossible to use it against a secure cluster.
> ```
> 14:12:12.948 [main] WARN  org.apache.kafka.clients.admin.AdminClientConfig - 
> The configuration 'sasl.jaas.config' was supplied but isn't a known config.
> ```



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2017-10-03 Thread Stephane Maarek (JIRA)
Stephane Maarek created KAFKA-6007:
--

 Summary: Connect can't validate against transforms in plugins.path
 Key: KAFKA-6007
 URL: https://issues.apache.org/jira/browse/KAFKA-6007
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.1
Reporter: Stephane Maarek


Kafka Connect can't validate a custom transformation if it is placed in the plugins path.
Here's the output I get on the validate call:


{code:java}
Invalid value com.mycorp.kafka.transforms.impl.FlattenSinkRecord for 
configuration transforms.Flat.type: Class 
com.mycorp.kafka.transforms.impl.FlattenSinkRecord could not be found.
Invalid value null for configuration transforms.Flat.type: Not a Transformation
"recommended_values": [   
"com.mycorp.kafka.transforms.Flatten$Key",
"com.mycorp.kafka.transforms.Flatten$Value",
"com.mycorp.kafka.transforms.impl.FlattenSinkRecord",
"org.apache.kafka.connect.transforms.Cast$Key",
"org.apache.kafka.connect.transforms.Cast$Value",
"org.apache.kafka.connect.transforms.ExtractField$Key",
"org.apache.kafka.connect.transforms.ExtractField$Value",
"org.apache.kafka.connect.transforms.Flatten$Key",
"org.apache.kafka.connect.transforms.Flatten$Value",
"org.apache.kafka.connect.transforms.HoistField$Key",
"org.apache.kafka.connect.transforms.HoistField$Value",
"org.apache.kafka.connect.transforms.InsertField$Key",
"org.apache.kafka.connect.transforms.InsertField$Value",
"org.apache.kafka.connect.transforms.MaskField$Key",
"org.apache.kafka.connect.transforms.MaskField$Value",
"org.apache.kafka.connect.transforms.RegexRouter",
"org.apache.kafka.connect.transforms.ReplaceField$Key",
"org.apache.kafka.connect.transforms.ReplaceField$Value",
"org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
"org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"org.apache.kafka.connect.transforms.TimestampConverter$Key",
"org.apache.kafka.connect.transforms.TimestampConverter$Value",
"org.apache.kafka.connect.transforms.TimestampRouter",
"org.apache.kafka.connect.transforms.ValueToKey"],

{code}

As you can see, the class appears in the recommended values (!) but can't be 
picked up by the validate call. 

I believe it's because the recommender implements class discovery using plugins:
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L194

But the class inference itself doesn't:
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L199

(I'm not an expert in class loading though, just a guess... Unsure how to fix)

A quick fix is to add the transformations to the classpath itself, but that 
defeats the point a bit. 
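
To illustrate the guess above (and why the classpath workaround changes the outcome), here is a minimal, self-contained sketch — plain Java with a hypothetical JAR path, not Connect's actual code — of why a class that an isolated plugin class loader can see is invisible to a plain Class.forName() lookup on the application class loader:

{code:java}
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class PluginIsolationSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical plugin JAR containing com.mycorp.kafka.transforms.impl.FlattenSinkRecord
        URL pluginJar = new File("/opt/connect/plugins/my-transforms.jar").toURI().toURL();

        // "Recommender" side: a loader that knows about the plugin path can resolve the class.
        ClassLoader pluginLoader =
                new URLClassLoader(new URL[]{pluginJar}, PluginIsolationSketch.class.getClassLoader());
        Class<?> viaPlugins =
                Class.forName("com.mycorp.kafka.transforms.impl.FlattenSinkRecord", false, pluginLoader);
        System.out.println("Plugin loader sees: " + viaPlugins.getName());

        // "Inference" side: a plain lookup on the application class loader throws
        // ClassNotFoundException unless the JAR is also on the classpath — which
        // matches the "could not be found" error from the validate call.
        Class.forName("com.mycorp.kafka.transforms.impl.FlattenSinkRecord");
    }
}
{code}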



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5993) Kafka AdminClient does not support standard security settings

2017-09-29 Thread Stephane Maarek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185455#comment-16185455
 ] 

Stephane Maarek commented on KAFKA-5993:


Okay thanks indeed that's odd. Let me confirm. Thanks Ismael!




> Kafka AdminClient does not support standard security settings
> -
>
> Key: KAFKA-5993
> URL: https://issues.apache.org/jira/browse/KAFKA-5993
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Stephane Maarek
>
> Kafka Admin Client does not support basic security configurations, such as 
> "sasl.jaas.config".
> This makes it impossible to use it against a secure cluster.
> ```
> 14:12:12.948 [main] WARN  org.apache.kafka.clients.admin.AdminClientConfig - 
> The configuration 'sasl.jaas.config' was supplied but isn't a known config.
> ```



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5993) Kafka AdminClient does not support standard security settings

2017-09-29 Thread Stephane Maarek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185445#comment-16185445
 ] 

Stephane Maarek commented on KAFKA-5993:


I'm not on my laptop but I had created an admin client and just added a
property. Can send a code sample next week but it's easy to reproduce




> Kafka AdminClient does not support standard security settings
> -
>
> Key: KAFKA-5993
> URL: https://issues.apache.org/jira/browse/KAFKA-5993
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Stephane Maarek
>
> Kafka Admin Client does not support basic security configurations, such as 
> "sasl.jaas.config".
> This makes it impossible to use it against a secure cluster.
> ```
> 14:12:12.948 [main] WARN  org.apache.kafka.clients.admin.AdminClientConfig - 
> The configuration 'sasl.jaas.config' was supplied but isn't a known config.
> ```



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5993) Kafka AdminClient does not support standard security settings

2017-09-28 Thread Stephane Maarek (JIRA)
Stephane Maarek created KAFKA-5993:
--

 Summary: Kafka AdminClient does not support standard security 
settings
 Key: KAFKA-5993
 URL: https://issues.apache.org/jira/browse/KAFKA-5993
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.1
Reporter: Stephane Maarek


Kafka Admin Client does not support basic security configurations, such as 
"sasl.jaas.config".
This makes it impossible to use it against a secure cluster.

```
14:12:12.948 [main] WARN  org.apache.kafka.clients.admin.AdminClientConfig - 
The configuration 'sasl.jaas.config' was supplied but isn't a known config.
```



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5992) Better Java Documentation for AdminClient Exceptions

2017-09-28 Thread Stephane Maarek (JIRA)
Stephane Maarek created KAFKA-5992:
--

 Summary: Better Java Documentation for AdminClient Exceptions
 Key: KAFKA-5992
 URL: https://issues.apache.org/jira/browse/KAFKA-5992
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.1
Reporter: Stephane Maarek


When invoking a describeTopics operation on a topic that does not exist, we get 
an InvalidTopicException as a RuntimeException.

I believe this should be documented, and perhaps the API changed:

For example changing:
{code:java}
public DescribeTopicsResult describeTopics(Collection<String> topicNames) {
{code}

To:
{code:java}
public DescribeTopicsResult describeTopics(Collection<String> topicNames) 
        throws InvalidTopicException 
{code}

Additionally, if multiple topics don't exist, only the first one will 
throw an error. That doesn't scale. 

Maybe DescribeTopicsResult could expose a boolean "topicExists"? 
Up for discussion
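
In the meantime, here is a minimal sketch — with a hypothetical checkTopicsExist() helper and example topic names — of how a caller can probe existence per topic by inspecting each future from DescribeTopicsResult.values(), rather than relying on a single RuntimeException:

{code:java}
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;

public static void checkTopicsExist(AdminClient admin) throws InterruptedException {
    Map<String, KafkaFuture<TopicDescription>> futures =
            admin.describeTopics(Arrays.asList("existing_topic", "missing_topic")).values();
    for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
        try {
            entry.getValue().get();
            System.out.println(entry.getKey() + " exists");
        } catch (ExecutionException e) {
            // The per-topic future fails (e.g. with an unknown/invalid topic error)
            // instead of the result exposing a "topicExists" flag.
            System.out.println(entry.getKey() + " does not exist: " + e.getCause());
        }
    }
}
{code}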





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5851) CPU overhead to having partitions (even inactive)

2017-09-07 Thread Stephane Maarek (JIRA)
Stephane Maarek created KAFKA-5851:
--

 Summary: CPU overhead to having partitions (even inactive)
 Key: KAFKA-5851
 URL: https://issues.apache.org/jira/browse/KAFKA-5851
 Project: Kafka
  Issue Type: Bug
 Environment: 0.11.0.0, 0.10.x
Reporter: Stephane Maarek


We're running three r4.xlarge brokers, and this is our dev environment.

Some people managed to create 1200 topics with 3 partitions each, so we ended up 
with about 4000 partitions per broker (we have a replication factor of 3). Even 
though almost no data goes through the cluster (it sits at a comfortable 20 
messages per second), we saw CPU at 100% (out of 400%).

I went ahead today and deleted 700 topics that I knew were unused, and CPU went 
down drastically. See the image:

!https://i.imgur.com/OIPTwDM.png!

We use the defaults for our brokers, and they use PLAINTEXT internally to 
replicate. 

I'm not sure of the root cause (threads, replication, log cleanup, etc.), and I 
guess it wouldn't be too hard to reproduce: just create 1000 topics on a vanilla 
cluster and watch CPU go up, as in the sketch below. 
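
A minimal reproduction sketch — broker address and topic names are hypothetical — that uses the AdminClient to create 1000 three-partition topics on an otherwise idle cluster, after which broker CPU can be watched with no producers or consumers attached:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public static void createIdleTopics() throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
    AdminClient admin = AdminClient.create(props);

    // 1000 topics x 3 partitions x replication factor 3 — roughly the partition
    // count described above, with no traffic flowing through them.
    List<NewTopic> topics = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
        topics.add(new NewTopic("cpu-overhead-test-" + i, 3, (short) 3));
    }
    admin.createTopics(topics).all().get();
    admin.close();
}
{code}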

Hope that helps



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)