[jira] [Created] (KAFKA-3824) Docs indicate auto.commit breaks at least once delivery but that is incorrect

2016-06-10 Thread Jay Kreps (JIRA)
Jay Kreps created KAFKA-3824:


 Summary: Docs indicate auto.commit breaks at least once delivery 
but that is incorrect
 Key: KAFKA-3824
 URL: https://issues.apache.org/jira/browse/KAFKA-3824
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.10.0.0
Reporter: Jay Kreps


The javadocs for the new consumer indicate that auto commit breaks at least 
once delivery. This is no longer correct as of 0.10. 

http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
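
For readers landing here: with the 0.10 (new) consumer, auto-commits are issued from 
inside poll() (and on close) and only cover offsets of records that a previous poll() 
already returned. So as long as the application finishes processing a batch before 
calling poll() again, nothing unprocessed is ever committed and at-least-once delivery 
holds. A minimal sketch (Java; the broker address, group id, and topic name are 
placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class AutoCommitAtLeastOnce {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
            props.put("group.id", "example-group");             // placeholder group id
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("example-topic")); // placeholder topic
                while (true) {
                    // Auto-commit is triggered from inside poll() and only covers offsets of
                    // records returned by earlier poll() calls, which the loop below has
                    // already finished processing -- hence at-least-once.
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        process(record);
                    }
                }
            }
        }

        private static void process(ConsumerRecord<String, String> record) {
            // stand-in for application-specific handling
            System.out.printf("%s-%d@%d: %s%n",
                    record.topic(), record.partition(), record.offset(), record.value());
        }
    }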





[jira] [Commented] (KAFKA-1981) Make log compaction point configurable

2016-06-10 Thread Eric Wasserman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325578#comment-15325578
 ] 

Eric Wasserman commented on KAFKA-1981:
---

[~ijuma] A new pull request is available 
(https://github.com/apache/kafka/pull/1494); could you please review it? Thanks, 
Eric.

> Make log compaction point configurable
> --
>
> Key: KAFKA-1981
> URL: https://issues.apache.org/jira/browse/KAFKA-1981
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.2.0
>Reporter: Jay Kreps
>  Labels: newbie++
> Attachments: KIP for Kafka Compaction Patch.md
>
>
> Currently, if you enable log compaction, the compactor will kick in whenever 
> you hit a certain "dirty ratio", i.e. when 50% of your data is uncompacted. 
> Other than this we don't give you fine-grained control over when compaction 
> occurs. In addition, we never compact the active segment (since it is still 
> being written to).
> The result is that you can't really guarantee that a consumer 
> will get every update to a compacted topic--if the consumer falls behind a 
> bit it might just get the compacted version.
> This is usually fine, but it would be nice to make this more configurable so 
> you could set either a message-count, size, or time bound for compaction.
> This would let you say, for example, "any consumer that is no more than 1 
> hour behind will get every message."
> This should be relatively easy to implement since it just impacts the 
> end-point the compactor considers available for compaction. I think we 
> already have that concept, so this would just be some other overrides to add 
> in when calculating that.
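
For context, the behaviour described above maps onto topic-level settings along these 
lines today, and the ticket asks for an additional bound; the last key below is only 
illustrative, since naming and defining it was part of the work under discussion:

    # existing topic-level knobs (example values)
    cleanup.policy=compact
    min.cleanable.dirty.ratio=0.5     # compact once >= 50% of the log is uncompacted
    segment.ms=3600000                # roll segments hourly; the active segment is never compacted

    # proposed kind of setting (name illustrative only)
    min.compaction.lag.ms=3600000     # "a consumer no more than 1 hour behind sees every message"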





[jira] [Commented] (KAFKA-3693) Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up

2016-06-10 Thread Maysam Yabandeh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325563#comment-15325563
 ] 

Maysam Yabandeh commented on KAFKA-3693:


[~junrao] oh yeah that makes sense. Thanks for your help in identifying the 
root cause.

> Race condition between highwatermark-checkpoint thread and 
> handleLeaderAndIsrRequest at broker start-up
> ---
>
> Key: KAFKA-3693
> URL: https://issues.apache.org/jira/browse/KAFKA-3693
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between the highwatermark-checkpoint thread writing 
> the replication-offset-checkpoint file and the handleLeaderAndIsrRequest thread 
> reading from it causes the highwatermark for some partitions to be reset to 0. 
> In the good case, this results in the replica truncating its entire log to 0 and 
> hence initiating a fetch of terabytes of data from the lead broker, which 
> sometimes leads to hours of downtime. In the bad cases we observed, the reset 
> offset can propagate to the recovery-point-offset-checkpoint file, making a 
> lead broker truncate the file. This seems to have the potential to lead to 
> data loss if the truncation happens at both follower and leader brokers.
> This is the particular faulty scenario manifested in our tests:
> # The broker restarts and receives LeaderAndIsr from the controller
> # The LeaderAndIsr message, however, does not contain all the partitions (probably 
> because other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates allPartitions with 
> the partitions included in the LeaderAndIsr message {code}
>   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
>     var partition = allPartitions.get((topic, partitionId))
>     if (partition == null) {
>       allPartitions.putIfNotExists((topic, partitionId),
>         new Partition(topic, partitionId, time, this))
> {code}
> # The highwatermark-checkpoint thread jumps in, taking a snapshot of the (partial) 
> set of replicas' high watermarks into the replication-offset-checkpoint file {code}
>   def checkpointHighWatermarks() {
>     val replicas = allPartitions.values.map(_.getReplica(config.brokerId))
>       .collect { case Some(replica) => replica }
> {code} hence rewriting the previous highwatermarks.
> # Later, becomeLeaderOrFollower calls makeLeaders and makeFollowers, which read 
> the (now partial) file through Partition::getOrCreateReplica {code}
>   val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
>   val offsetMap = checkpoint.read
>   if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
>     info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
> {code}
> We are not entirely sure whether the initial LeaderAndIsr message including only a 
> subset of partitions is critical in making this race condition manifest. But it is 
> an important detail, since it clarifies that a solution based on not letting the 
> highwatermark-checkpoint thread jump in while a LeaderAndIsr message is being 
> processed would not suffice.
> The solution we are thinking of is to force-initialize allPartitions with the 
> partitions listed in the replication-offset-checkpoint (and perhaps the 
> recovery-point-offset-checkpoint file too) when a server starts.
> Thoughts?





[GitHub] kafka pull request #1495: Minor: Respect the default value for partition arg...

2016-06-10 Thread vahidhashemian
GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/1495

Minor: Respect the default value for partition argument in 
SimpleConsumerShell

The `partition` argument is not marked as required and has a default of 
`0`, according to the tool's help message. However, if `partition` is not 
provided, the command fails with `Missing required argument "[partition]"`. 
This patch fixes the tool's list of required arguments by removing `partition` 
from it.
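
To illustrate the pattern the patch applies, here is a minimal joptsimple sketch 
(Java). The option names mirror the tool's help text; everything else is a 
hypothetical stand-in, not the actual SimpleConsumerShell code:

    import joptsimple.OptionParser;
    import joptsimple.OptionSet;
    import joptsimple.OptionSpec;

    public class PartitionArgSketch {
        public static void main(String[] args) {
            OptionParser parser = new OptionParser();
            OptionSpec<String> topicOpt = parser.accepts("topic", "The topic to consume from.")
                    .withRequiredArg().ofType(String.class);
            // Has a default, so it must not be listed among the required arguments.
            OptionSpec<Integer> partitionOpt = parser.accepts("partition", "The partition to consume from.")
                    .withRequiredArg().ofType(Integer.class).defaultsTo(0);

            OptionSet options = parser.parse(args);
            // Check only the genuinely mandatory options; before the fix, "partition"
            // was (incorrectly) part of this check even though it has a default.
            if (!options.has(topicOpt)) {
                throw new IllegalArgumentException("Missing required argument \"[topic]\"");
            }
            System.out.println("partition = " + options.valueOf(partitionOpt)); // prints 0 when omitted
        }
    }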

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka 
minor/simple_consumer_shell_update_required_args

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1495.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1495


commit 54e7fba208d83526044d78b8ae85a428cb94157e
Author: Vahid Hashemian 
Date:   2016-06-10T23:53:07Z

Minor: Respect the default value for partition argument in 
SimpleConsumerShell

The 'partition' argument is not marked as required and has a default of 0, 
according to the tool's help message.
However, if 'partition' is not provided, the command fails with 'Missing 
required argument "[partition]"'.
This patch fixes the tool's list of required arguments by removing 
'partition' from it.






[jira] [Commented] (KAFKA-3693) Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up

2016-06-10 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325510#comment-15325510
 ] 

Jun Rao commented on KAFKA-3693:


[~maysamyabandeh], thanks for confirming this. The root of the problem was 
described in https://issues.apache.org/jira/browse/KAFKA-1120.

As for your suggestion, I agree that we should be more defensive. The original 
intention for the current logic is that while a broker is down, some of the 
topic/partitions could be deleted. When the broker is restarted, the first 
LeaderAndIsrRequest tells the broker the set of topic/partitions to be kept. 
So, we probably need to think a bit more on this. 

> Race condition between highwatermark-checkpoint thread and 
> handleLeaderAndIsrRequest at broker start-up
> ---
>
> Key: KAFKA-3693
> URL: https://issues.apache.org/jira/browse/KAFKA-3693
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between the highwatermark-checkpoint thread writing 
> the replication-offset-checkpoint file and the handleLeaderAndIsrRequest thread 
> reading from it causes the highwatermark for some partitions to be reset to 0. 
> In the good case, this results in the replica truncating its entire log to 0 and 
> hence initiating a fetch of terabytes of data from the lead broker, which 
> sometimes leads to hours of downtime. In the bad cases we observed, the reset 
> offset can propagate to the recovery-point-offset-checkpoint file, making a 
> lead broker truncate the file. This seems to have the potential to lead to 
> data loss if the truncation happens at both follower and leader brokers.
> This is the particular faulty scenario manifested in our tests:
> # The broker restarts and receives LeaderAndIsr from the controller
> # The LeaderAndIsr message, however, does not contain all the partitions (probably 
> because other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates allPartitions with 
> the partitions included in the LeaderAndIsr message {code}
>   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
>     var partition = allPartitions.get((topic, partitionId))
>     if (partition == null) {
>       allPartitions.putIfNotExists((topic, partitionId),
>         new Partition(topic, partitionId, time, this))
> {code}
> # The highwatermark-checkpoint thread jumps in, taking a snapshot of the (partial) 
> set of replicas' high watermarks into the replication-offset-checkpoint file {code}
>   def checkpointHighWatermarks() {
>     val replicas = allPartitions.values.map(_.getReplica(config.brokerId))
>       .collect { case Some(replica) => replica }
> {code} hence rewriting the previous highwatermarks.
> # Later, becomeLeaderOrFollower calls makeLeaders and makeFollowers, which read 
> the (now partial) file through Partition::getOrCreateReplica {code}
>   val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
>   val offsetMap = checkpoint.read
>   if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
>     info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
> {code}
> We are not entirely sure whether the initial LeaderAndIsr message including only a 
> subset of partitions is critical in making this race condition manifest. But it is 
> an important detail, since it clarifies that a solution based on not letting the 
> highwatermark-checkpoint thread jump in while a LeaderAndIsr message is being 
> processed would not suffice.
> The solution we are thinking of is to force-initialize allPartitions with the 
> partitions listed in the replication-offset-checkpoint (and perhaps the 
> recovery-point-offset-checkpoint file too) when a server starts.
> Thoughts?





[jira] [Commented] (KAFKA-3818) Change Mirror Maker default assignment strategy to round robin

2016-06-10 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325499#comment-15325499
 ] 

Vahid Hashemian commented on KAFKA-3818:


[~hachikuji] Since _round robin_ provides a more balanced assignment, I'm curious 
why _range_ is still used as the default strategy for the consumer.

> Change Mirror Maker default assignment strategy to round robin
> --
>
> Key: KAFKA-3818
> URL: https://issues.apache.org/jira/browse/KAFKA-3818
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>
> It might make more sense to use round robin assignment by default for MM 
> since it gives a better balance between the instances, in particular when the 
> number of MM instances exceeds the typical number of partitions per topic. 
> There doesn't seem to be any need to keep range assignment since 
> copartitioning is not an issue.
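
For the new consumer this amounts to a one-line change in the consumer config handed 
to Mirror Maker; a sketch of the explicit setting (only the relevant key shown):

    # consumer.properties passed to Mirror Maker via --consumer.config
    partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor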





[jira] [Commented] (KAFKA-1120) Controller could miss a broker state change

2016-06-10 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325496#comment-15325496
 ] 

Jun Rao commented on KAFKA-1120:


A better way is probably for the controller to store the ZK version of the 
broker registration path. When a ZK watcher fires, the controller can read 
the current ZK version of each broker registration and see whether it has 
changed. If so, the controller will treat the broker as if it had failed and 
then restarted.
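
A rough sketch of that idea (hypothetical code using the plain ZooKeeper client API; 
the real controller logic lives elsewhere). It caches a property of the registration 
znode -- here the creation zxid, which changes whenever the ephemeral node is 
recreated -- and compares it when a watcher fires:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    public class BrokerRegistrationTracker {
        private final ZooKeeper zk;
        // creation zxid of the last-seen registration znode, per broker id
        private final Map<Integer, Long> lastSeenCzxid = new ConcurrentHashMap<>();

        public BrokerRegistrationTracker(ZooKeeper zk) {
            this.zk = zk;
        }

        /** True if the broker's registration znode was recreated since the last check. */
        public boolean wasBounced(int brokerId) throws KeeperException, InterruptedException {
            Stat stat = zk.exists("/brokers/ids/" + brokerId, false);
            if (stat == null) {
                lastSeenCzxid.remove(brokerId);
                return false; // broker is down; the normal failure path handles this
            }
            Long previous = lastSeenCzxid.put(brokerId, stat.getCzxid());
            // A new czxid means delete + re-create, i.e. the broker bounced while the
            // controller was busy; treat it as a failure followed by a restart.
            return previous != null && previous != stat.getCzxid();
        }
    }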

> Controller could miss a broker state change 
> 
>
> Key: KAFKA-1120
> URL: https://issues.apache.org/jira/browse/KAFKA-1120
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>
> When the controller is in the middle of processing a task (e.g., preferred 
> leader election, broker change), it holds a controller lock. During this 
> time, a broker could have de-registered and re-registered itself in ZK. After 
> the controller finishes processing the current task, it will start processing 
> the logic in the broker change listener. However, it will see no broker 
> change and therefore won't do anything to the restarted broker. This broker 
> will be in a weird state since the controller doesn't inform it to become the 
> leader of any partition. Yet, the cached metadata in other brokers could 
> still list that broker as the leader for some partitions. Client requests 
> routed to that broker will then get a TopicOrPartitionNotExistException. This 
> broker will continue to be in this bad state until it's restarted again.





[jira] [Comment Edited] (KAFKA-3791) Broken tools -- need better way to get offsets and other info

2016-06-10 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325481#comment-15325481
 ] 

Vahid Hashemian edited comment on KAFKA-3791 at 6/10/16 11:15 PM:
--

I agree that the deprecation message is not as clear as it can be, but I was 
able to successfully run the tool you mentioned by providing a consumer 
{{group}} and the {{zookeeper}} address, even though the command's help message 
has a default for {{zookeeper}}!


was (Author: vahid):
I agree that the deprecation message is not as clear as it can be, but I was 
able to successfully run the tool you mentioned by providing a consumer group 
({{--group}}) and the zookeeper address ({{--zookeeper}}), even though the 
command's help message has a default for {{--zookeeper}}!

> Broken tools -- need better way to get offsets and other info
> -
>
> Key: KAFKA-3791
> URL: https://issues.apache.org/jira/browse/KAFKA-3791
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.0.0
>Reporter: Greg Zoller
>
> Whenever I run included tools like kafka-consumer-offset-checker.sh I get 
> deprecation warnings and it doesn't work for me (offsets not returned).  
> These need to be fixed.  The suggested class in the deprecation warning is 
> not documented clearly in the docs.
> In general it would be nice to streamline and simplify the tool scripts.





[jira] [Comment Edited] (KAFKA-3791) Broken tools -- need better way to get offsets and other info

2016-06-10 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325481#comment-15325481
 ] 

Vahid Hashemian edited comment on KAFKA-3791 at 6/10/16 11:15 PM:
--

I agree that the deprecation message is not as clear as it can be, but I was 
able to successfully run the tool you mentioned by providing a consumer 
{{group}} and the {{zookeeper}} address, even though the command's help message 
specifies a default for {{zookeeper}}!


was (Author: vahid):
I agree that the deprecation message is not as clear as it can be, but I was 
able to successfully run the tool you mentioned by providing a consumer 
{{group}} and the {{zookeeper}} address, even though the command's help message 
has a default for {{zookeeper}}!

> Broken tools -- need better way to get offsets and other info
> -
>
> Key: KAFKA-3791
> URL: https://issues.apache.org/jira/browse/KAFKA-3791
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.0.0
>Reporter: Greg Zoller
>
> Whenever I run included tools like kafka-consumer-offset-checker.sh I get 
> deprecation warnings and it doesn't work for me (offsets not returned).  
> These need to be fixed.  The suggested class in the deprecation warning is 
> not documented clearly in the docs.
> In general it would be nice to streamline and simplify the tool scripts.





[jira] [Commented] (KAFKA-3791) Broken tools -- need better way to get offsets and other info

2016-06-10 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325481#comment-15325481
 ] 

Vahid Hashemian commented on KAFKA-3791:


I agree that the deprecation message is not as clear as it can be, but I was 
able to successfully run the tool you mentioned by providing a consumer group 
({{--group}}) and the zookeeper address ({{--zookeeper}}), even though the 
command's help message has a default for {{--zookeeper}}!
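
For reference, the invocation described above looks roughly like this (host, port, 
and group name are placeholders), alongside the newer tool that wraps the class the 
deprecation warning points to:

    bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group my-consumer-group
    bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group my-consumer-group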

> Broken tools -- need better way to get offsets and other info
> -
>
> Key: KAFKA-3791
> URL: https://issues.apache.org/jira/browse/KAFKA-3791
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.0.0
>Reporter: Greg Zoller
>
> Whenever I run included tools like kafka-consumer-offset-checker.sh I get 
> deprecation warnings and it doesn't work for me (offsets not returned).  
> These need to be fixed.  The suggested class in the deprecation warning is 
> not documented clearly in the docs.
> In general it would be nice to streamline and simplify the tool scripts.





[jira] [Commented] (KAFKA-3801) Provide static serialize() and deserialize() for use as method references

2016-06-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325454#comment-15325454
 ] 

Guozhang Wang commented on KAFKA-3801:
--

Before we introduced Kafka Streams, we did not consider adding static serdes, 
since for the producer / consumer a single serde object is sufficient and hence 
always required anyway. For Kafka Streams, where multiple serdes may be needed, 
creating one for each case can be cumbersome, but I thought that in practice you 
won't need too many different ones (for example, you may use JSON / Avro / Thrift 
for most cases and only need a different one, such as for primitive types, for a 
very few streams), so it still seemed okay.

If we observe common usage patterns where many serde objects need to be created 
for a single application, we can reconsider whether this optimization makes sense. 
I'm going to close this for now, but feel free to re-open it whenever you want to 
resume the discussion.

> Provide static serialize() and deserialize() for use as method references
> -
>
> Key: KAFKA-3801
> URL: https://issues.apache.org/jira/browse/KAFKA-3801
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 0.10.1.0
>
>
> While most calls to {{Serializer.serialize}} and {{Deserializer.deserialize}} 
> are abstracted away in Kafka Streams through the use of `Serdes` classes, 
> there are some instances where developers may want to call them directly. The 
> serializers and deserializers for simple types don't require any 
> configuration and could be static, but currently it's necessary to create an 
> instance to use those methods.
> I'd propose moving serialization logic into a {{static public byte[] 
> serialize(? data)}} method and deserialization logic into a {{static public ? 
> deserialize(byte[] data)}} method. The existing instance methods would simply 
> call the static versions.
> See a full example for LongSerializer and LongDeserializer here:
> https://github.com/apache/kafka/compare/trunk...jklukas:static-serde-methods?expand=1
> In Java 8, these static methods then become available for method references 
> in code like {{kstream.mapValues(LongDeserializer::deserialize)}} without the 
> user needing to create an instance of {{LongDeserializer}}.
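
A condensed sketch of the proposed shape, using LongSerializer as in the linked 
branch (this reflects the ticket's idea, not code that exists in Kafka):

    import java.nio.ByteBuffer;
    import java.util.Map;
    import org.apache.kafka.common.serialization.Serializer;

    public class StaticLongSerializer implements Serializer<Long> {

        // The proposal: serialization logic in a static method usable as a method reference.
        public static byte[] serialize(Long data) {
            return data == null ? null : ByteBuffer.allocate(8).putLong(data).array();
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public byte[] serialize(String topic, Long data) {
            return serialize(data); // existing instance method delegates to the static one
        }

        @Override
        public void close() { }
    }

With that in place, Java 8 code such as {{kstream.mapValues(StaticLongSerializer::serialize)}} 
would work without constructing a serializer instance.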





[GitHub] kafka pull request #1491: remove size and message count constraints

2016-06-10 Thread ewasserman
Github user ewasserman closed the pull request at:

https://github.com/apache/kafka/pull/1491




[jira] [Resolved] (KAFKA-3801) Provide static serialize() and deserialize() for use as method references

2016-06-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3801.
--
Resolution: Won't Fix

> Provide static serialize() and deserialize() for use as method references
> -
>
> Key: KAFKA-3801
> URL: https://issues.apache.org/jira/browse/KAFKA-3801
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 0.10.1.0
>
>
> While most calls to {{Serializer.serialize}} and {{Deserializer.deserialize}} 
> are abstracted away in Kafka Streams through the use of `Serdes` classes, 
> there are some instances where developers may want to call them directly. The 
> serializers and deserializers for simple types don't require any 
> configuration and could be static, but currently it's necessary to create an 
> instance to use those methods.
> I'd propose moving serialization logic into a {{static public byte[] 
> serialize(? data)}} method and deserialization logic into a {{static public ? 
> deserialize(byte[] data)}} method. The existing instance methods would simply 
> call the static versions.
> See a full example for LongSerializer and LongDeserializer here:
> https://github.com/apache/kafka/compare/trunk...jklukas:static-serde-methods?expand=1
> In Java 8, these static methods then become available for method references 
> in code like {{kstream.mapValues(LongDeserializer::deserialize)}} without the 
> user needing to create an instance of {{LongDeserializer}}.





[jira] [Updated] (KAFKA-3818) Change Mirror Maker default assignment strategy to round robin

2016-06-10 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-3818:
---
Status: Patch Available  (was: Open)

> Change Mirror Maker default assignment strategy to round robin
> --
>
> Key: KAFKA-3818
> URL: https://issues.apache.org/jira/browse/KAFKA-3818
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>
> It might make more sense to use round robin assignment by default for MM 
> since it gives a better balance between the instances, in particular when the 
> number of MM instances exceeds the typical number of partitions per topic. 
> There doesn't seem to be any need to keep range assignment since 
> copartitioning is not an issue.





[GitHub] kafka pull request #1491: remove size and message count constraints

2016-06-10 Thread ewasserman
GitHub user ewasserman reopened a pull request:

https://github.com/apache/kafka/pull/1491

remove size and message count constraints

KAFKA-1981 Make log compaction point configurable

Reduced scope to controlling only minimum time before compaction. Changed 
to using message time when available.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ewasserman/kafka feat-compaction-time-lag

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1491.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1491


commit daf2b9bc56fbb5bd8cbc2fbb91782944cf11774a
Author: Eric Wasserman 
Date:   2016-03-15T00:57:07Z

remove size and message count constraints






[GitHub] kafka pull request #1491: remove size and message count constraints

2016-06-10 Thread ewasserman
Github user ewasserman closed the pull request at:

https://github.com/apache/kafka/pull/1491




[jira] [Commented] (KAFKA-3818) Change Mirror Maker default assignment strategy to round robin

2016-06-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325441#comment-15325441
 ] 

ASF GitHub Bot commented on KAFKA-3818:
---

GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/1493

KAFKA-3818: Change Mirror Maker default assignment strategy to round robin

Update Mirror Maker to use round robin assignment by default since it gives 
a better balance between the instances, in particular when the number of Mirror 
Maker instances exceeds the typical number of partitions per topic. There 
doesn't seem to be any need to keep range assignment since co-partitioning is 
not an issue.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3818

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1493.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1493


commit 24884d4ab13294d9e557e0698e9dafb672155208
Author: Vahid Hashemian 
Date:   2016-06-10T22:29:06Z

KAFKA-3818: Change Mirror Maker default assignment strategy to round robin

It might make more sense to use round robin assignment by default for 
Mirror Maker since it gives a better balance between the instances,
in particular when the number of Mirror Maker instances exceeds the typical 
number of partitions per topic.
There doesn't seem to be any need to keep range assignment since 
copartitioning is not an issue.




> Change Mirror Maker default assignment strategy to round robin
> --
>
> Key: KAFKA-3818
> URL: https://issues.apache.org/jira/browse/KAFKA-3818
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>
> It might make more sense to use round robin assignment by default for MM 
> since it gives a better balance between the instances, in particular when the 
> number of MM instances exceeds the typical number of partitions per topic. 
> There doesn't seem to be any need to keep range assignment since 
> copartitioning is not an issue.





[GitHub] kafka pull request #1493: KAFKA-3818: Change Mirror Maker default assignment...

2016-06-10 Thread vahidhashemian
GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/1493

KAFKA-3818: Change Mirror Maker default assignment strategy to round robin

Update Mirror Maker to use round robin assignment by default since it gives 
a better balance between the instances, in particular when the number of Mirror 
Maker instances exceeds the typical number of partitions per topic. There 
doesn't seem to be any need to keep range assignment since co-partitioning is 
not an issue.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3818

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1493.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1493


commit 24884d4ab13294d9e557e0698e9dafb672155208
Author: Vahid Hashemian 
Date:   2016-06-10T22:29:06Z

KAFKA-3818: Change Mirror Maker default assignment strategy to round robin

It might make more sense to use round robin assignment by default for 
Mirror Maker since it gives a better balance between the instances,
in particular when the number of Mirror Maker instances exceeds the typical 
number of partitions per topic.
There doesn't seem to be any need to keep range assignment since 
copartitioning is not an issue.






[jira] [Commented] (KAFKA-3818) Change Mirror Maker default assignment strategy to round robin

2016-06-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325436#comment-15325436
 ] 

ASF GitHub Bot commented on KAFKA-3818:
---

Github user vahidhashemian closed the pull request at:

https://github.com/apache/kafka/pull/1492


> Change Mirror Maker default assignment strategy to round robin
> --
>
> Key: KAFKA-3818
> URL: https://issues.apache.org/jira/browse/KAFKA-3818
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>
> It might make more sense to use round robin assignment by default for MM 
> since it gives a better balance between the instances, in particular when the 
> number of MM instances exceeds the typical number of partitions per topic. 
> There doesn't seem to be any need to keep range assignment since 
> copartitioning is not an issue.





[GitHub] kafka pull request #1492: KAFKA-3818: Change Mirror Maker default assignment...

2016-06-10 Thread vahidhashemian
Github user vahidhashemian closed the pull request at:

https://github.com/apache/kafka/pull/1492




Build failed in Jenkins: kafka-trunk-jdk7 #1356

2016-06-10 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-3817: handle null keys in KTableRepartitionMap

--
[...truncated 6269 lines...]

kafka.log.LogTest > testParseTopicPartitionNameForNull PASSED

kafka.log.LogTest > testAppendAndReadWithNonSequentialOffsets STARTED

kafka.log.LogTest > testAppendAndReadWithNonSequentialOffsets PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingSeparator STARTED

kafka.log.LogTest > testParseTopicPartitionNameForMissingSeparator PASSED

kafka.log.LogTest > testCorruptIndexRebuild STARTED

kafka.log.LogTest > testCorruptIndexRebuild PASSED

kafka.log.LogTest > testBogusIndexSegmentsAreRemoved STARTED

kafka.log.LogTest > testBogusIndexSegmentsAreRemoved PASSED

kafka.log.LogTest > testCompressedMessages STARTED

kafka.log.LogTest > testCompressedMessages PASSED

kafka.log.LogTest > testAppendMessageWithNullPayload STARTED

kafka.log.LogTest > testAppendMessageWithNullPayload PASSED

kafka.log.LogTest > testCorruptLog STARTED

kafka.log.LogTest > testCorruptLog PASSED

kafka.log.LogTest > testLogRecoversToCorrectOffset STARTED

kafka.log.LogTest > testLogRecoversToCorrectOffset PASSED

kafka.log.LogTest > testReopenThenTruncate STARTED

kafka.log.LogTest > testReopenThenTruncate PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingPartition STARTED

kafka.log.LogTest > testParseTopicPartitionNameForMissingPartition PASSED

kafka.log.LogTest > testParseTopicPartitionNameForEmptyName STARTED

kafka.log.LogTest > testParseTopicPartitionNameForEmptyName PASSED

kafka.log.LogTest > testOpenDeletesObsoleteFiles STARTED

kafka.log.LogTest > testOpenDeletesObsoleteFiles PASSED

kafka.log.LogTest > testSizeBasedLogRoll STARTED

kafka.log.LogTest > testSizeBasedLogRoll PASSED

kafka.log.LogTest > testTimeBasedLogRollJitter STARTED

kafka.log.LogTest > testTimeBasedLogRollJitter PASSED

kafka.log.LogTest > testParseTopicPartitionName STARTED

kafka.log.LogTest > testParseTopicPartitionName PASSED

kafka.log.LogTest > testTruncateTo STARTED

kafka.log.LogTest > testTruncateTo PASSED

kafka.log.LogTest > testCleanShutdownFile STARTED

kafka.log.LogTest > testCleanShutdownFile PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[0] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[0] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[1] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[1] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[2] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[2] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[3] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[3] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[4] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[4] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[5] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[5] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[6] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[6] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[7] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[7] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[8] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[8] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[9] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[9] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[10] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[10] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[11] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[11] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[12] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[12] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[13] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[13] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[14] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[14] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[15] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[15] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[16] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[16] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[17] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[17] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[18] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[18] PASSED


[GitHub] kafka pull request #1492: Change Mirror Maker default assignment strategy to...

2016-06-10 Thread vahidhashemian
GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/1492

Change Mirror Maker default assignment strategy to round robin

Update Mirror Maker to use round robin assignment by default since it 
gives a better balance between the instances, in particular when the number of 
Mirror Maker instances exceeds the typical number of partitions per topic. 
There doesn't seem to be any need to keep range assignment since 
co-partitioning is not an issue.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3818

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1492.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1492








Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-06-10 Thread Jun Rao
Rajini,

The new proposal sounds good to me too. My only question is what happens to
the existing quotas on client-ids. Do we just treat each of them as the quota
for that client-id under the ANONYMOUS user name?

Thanks,

Jun
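
For context, client-id quota overrides are already configured roughly like this 
(kafka-configs.sh, per KIP-21); the second command is purely illustrative of how a 
user-level quota under the proposal might look, since that syntax did not exist yet 
and its shape was exactly what this thread was deciding:

    # existing: per-client-id quota override (placeholder names and values)
    bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
      --entity-type clients --entity-name clientA \
      --add-config 'producer_byte_rate=1048576,consumer_byte_rate=2097152'

    # hypothetical: per-user quota as discussed in KIP-55
    bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
      --entity-type users --entity-name user1 \
      --add-config 'producer_byte_rate=10485760,consumer_byte_rate=20971520'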

On Fri, Jun 10, 2016 at 2:43 PM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Jay,
>
> Thank you for the quick feedback. It shouldn't be too hard since I had a PR
> earlier along these lines anyway.
>
> Jun, are you ok with this approach? If everyone agrees, I will close this
> vote, update the KIP and give some more time for discussions.
>
>
> On Fri, Jun 10, 2016 at 10:27 PM, Jay Kreps  wrote:
>
> > This sounds a lot better to me--hopefully it isn't too much harder! I do
> > think if it is possible to do this directly that will be better for users
> > than having an intermediate step since we always have to work through
> > migrating people who have setup quotas already from the old way to the
> new
> > way.
> >
> > -Jay
> >
> > On Fri, Jun 10, 2016 at 2:12 PM, Rajini Sivaram <
> > rajinisiva...@googlemail.com> wrote:
> >
> > > I do think client-id is a valid and useful grouping for quotas even in
> > > secure clusters - but only if clientA of user1 is treated as a
> different
> > > client-id from clientA of user2. Grouping of clients of a user enables
> > > users to allocate their quota effectively to their clients (eg.
> guarantee
> > > that critical event processing clients are not throttled by auditing
> > > clients). When the KIP was down-sized to support only user-based
> quotas,
> > I
> > > was hoping that we could extend it at a later time to enable
> hierarchical
> > > quotas. But I understand that it can be confusing to switch the
> semantics
> > > of quotas based on modes set in the brokers. So let me try once again
> to
> > > promote the original KIP-55. At the time, I did have a flag to revert
> to
> > > the existing client-id behavior to maintain compatibility. But perhaps
> > that
> > > is not necessary.
> > >
> > > How does this sound?
> > >
> > >- Quotas may be configured for users. Sub-quotas may be configured
> for
> > >client-ids of a user. Quotas may also be configured for client-ids
> of
> > > users
> > >with unlimited quota (Long.MaxValue).
> > >- Users who don't have a quota override are allocated a configurable
> > >default quota.
> > >- Client-ids without a sub-quota override share the remainder of the
> > >user quota if the user has a quota limit. Default quotas may be
> > defined
> > > for
> > >clients of users with unlimited quota.
> > >- For an insecure or single-user secure cluster, the existing
> > client-id
> > >based quota semantics can be achieved by configuring unlimited quota
> > for
> > >the user and sub-quota configuration for client-id that matches the
> > > current
> > >client-id quota configuration.
> > >- For a cluster mixes both secure and insecure access, client-id
> > quotas
> > >can be set for unauthenticated clients (unlimited quota for
> ANONYMOUS,
> > >quotas for client-ids) and user quotas can be set for authenticated
> > > users.
> > >- In a multi-user cluster, it is currently possible to define quotas
> > for
> > >client-ids that span multiple users. This will no longer be
> supported.
> > >
> > >
> > >
> > >
> > > On Fri, Jun 10, 2016 at 6:43 PM, Gwen Shapira 
> wrote:
> > >
> > > > I am not crazy about modes either. An earlier proposal supported both
> > > > client-ids and users at the same time, and it made more sense to me.
> I
> > > > believe it was dropped without proper discussion (Basically, Jun
> > > > mentioned it is complex and Rajini agreed to drop it). We should
> > > > probably rethink the complexity of supporting both vs the limitations
> > > > of "modes".
> > > >
> > > > As you said, we will have secure clients authenticating with users
> and
> > > > insecure clients authenticating with client-ids at the same time.
> > > >
> > > > On Fri, Jun 10, 2016 at 7:19 PM, Jay Kreps  wrote:
> > > > > Hey Rajini,
> > > > >
> > > > > 1. That makes sense to me. Is that reflected in the documentation
> > > > anywhere
> > > > > (I couldn't really find it)? Is there a way to discover that
> > > definition?
> > > > We
> > > > > do way better when we right this stuff down so it has an official
> > > > > definition users and developers can work off of...
> > > > > 2. If client id is a thing that makes sense even when you have
> users,
> > > why
> > > > > would you not want to quota on it?
> > > > >
> > > > > I am not wild about these "modes" where you boot the cluster in
> mode
> > X
> > > > and
> > > > > it behaves in one way and in mode Y and it behaves in another. It
> is
> > > > > complex then for users who expect to be able to set quotas but then
> > > have
> > > > to
> > > > > be able to get access to the filesystem of the kafka nodes to
> > discover
> > > > what
> > > > > mode the cluster is in to 

[jira] [Commented] (KAFKA-2394) Use RollingFileAppender by default in log4j.properties

2016-06-10 Thread Dustin Cote (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325356#comment-15325356
 ] 

Dustin Cote commented on KAFKA-2394:


[~ewencp] makes sense, I went ahead and updated the docs in the PR. There 
wasn't a section on upgrading to 0.11, so I created one assuming that'll be the 
next non-fixpack release to come out. Since this is a potentially incompatible 
change, I've set the target to 0.11.

> Use RollingFileAppender by default in log4j.properties
> --
>
> Key: KAFKA-2394
> URL: https://issues.apache.org/jira/browse/KAFKA-2394
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Dustin Cote
>Priority: Minor
>  Labels: newbie
> Fix For: 0.11.0.0
>
> Attachments: log4j.properties.patch
>
>
> The default log4j.properties bundled with Kafka uses ConsoleAppender and 
> DailyRollingFileAppender, which offer no protection to users from spammy 
> logging. In extreme cases (such as when issues like KAFKA-1461 are 
> encountered), the logs can exhaust the local disk space. This could be a 
> problem for Kafka adoption since new users are less likely to adjust the 
> logging properties themselves, and are more likely to have configuration 
> problems which result in log spam. 
> To fix this, we can use RollingFileAppender, which offers two settings for 
> controlling the maximum space that log files will use.
> maxBackupIndex: how many backup files to retain
> maxFileSize: the max size of each log file
> One question is whether this change is a compatibility concern? The backup 
> strategy and filenames used by RollingFileAppender are different from those 
> used by DailyRollingFileAppender, so any tools which depend on the old format 
> will break. If we think this is a serious problem, one solution would be to 
> provide two versions of log4j.properties and add a flag to enable the new 
> one. Another solution would be to include the RollingFileAppender 
> configuration in the default log4j.properties, but commented out.
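
A sketch of what the RollingFileAppender variant of the bundled log4j.properties 
could look like (appender name and values are examples):

    log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
    log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
    log4j.appender.kafkaAppender.MaxFileSize=100MB
    log4j.appender.kafkaAppender.MaxBackupIndex=10
    log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n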





[jira] [Updated] (KAFKA-2394) Use RollingFileAppender by default in log4j.properties

2016-06-10 Thread Dustin Cote (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dustin Cote updated KAFKA-2394:
---
Fix Version/s: 0.11.0.0

> Use RollingFileAppender by default in log4j.properties
> --
>
> Key: KAFKA-2394
> URL: https://issues.apache.org/jira/browse/KAFKA-2394
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Dustin Cote
>Priority: Minor
>  Labels: newbie
> Fix For: 0.11.0.0
>
> Attachments: log4j.properties.patch
>
>
> The default log4j.properties bundled with Kafka uses ConsoleAppender and 
> DailyRollingFileAppender, which offer no protection to users from spammy 
> logging. In extreme cases (such as when issues like KAFKA-1461 are 
> encountered), the logs can exhaust the local disk space. This could be a 
> problem for Kafka adoption since new users are less likely to adjust the 
> logging properties themselves, and are more likely to have configuration 
> problems which result in log spam. 
> To fix this, we can use RollingFileAppender, which offers two settings for 
> controlling the maximum space that log files will use.
> maxBackupIndex: how many backup files to retain
> maxFileSize: the max size of each log file
> One question is whether this change is a compatibility concern? The backup 
> strategy and filenames used by RollingFileAppender are different from those 
> used by DailyRollingFileAppender, so any tools which depend on the old format 
> will break. If we think this is a serious problem, one solution would be to 
> provide two versions of log4j.properties and add a flag to enable the new 
> one. Another solution would be to include the RollingFileAppender 
> configuration in the default log4j.properties, but commented out.





Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-06-10 Thread Rajini Sivaram
Jay,

Thank you for the quick feedback. It shouldn't be too hard since I had a PR
earlier along these lines anyway.

Jun, are you ok with this approach? If everyone agrees, I will close this
vote, update the KIP and give some more time for discussions.


On Fri, Jun 10, 2016 at 10:27 PM, Jay Kreps  wrote:

> This sounds a lot better to me--hopefully it isn't too much harder! I do
> think if it is possible to do this directly that will be better for users
> than having an intermediate step since we always have to work through
> migrating people who have setup quotas already from the old way to the new
> way.
>
> -Jay
>
> On Fri, Jun 10, 2016 at 2:12 PM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
> > I do think client-id is a valid and useful grouping for quotas even in
> > secure clusters - but only if clientA of user1 is treated as a different
> > client-id from clientA of user2. Grouping of clients of a user enables
> > users to allocate their quota effectively to their clients (eg. guarantee
> > that critical event processing clients are not throttled by auditing
> > clients). When the KIP was down-sized to support only user-based quotas,
> I
> > was hoping that we could extend it at a later time to enable hierarchical
> > quotas. But I understand that it can be confusing to switch the semantics
> > of quotas based on modes set in the brokers. So let me try once again to
> > promote the original KIP-55. At the time, I did have a flag to revert to
> > the existing client-id behavior to maintain compatibility. But perhaps
> that
> > is not necessary.
> >
> > How does this sound?
> >
> >- Quotas may be configured for users. Sub-quotas may be configured for
> >client-ids of a user. Quotas may also be configured for client-ids of
> > users
> >with unlimited quota (Long.MaxValue).
> >- Users who don't have a quota override are allocated a configurable
> >default quota.
> >- Client-ids without a sub-quota override share the remainder of the
> >user quota if the user has a quota limit. Default quotas may be
> defined
> > for
> >clients of users with unlimited quota.
> >- For an insecure or single-user secure cluster, the existing
> client-id
> >based quota semantics can be achieved by configuring unlimited quota
> for
> >the user and sub-quota configuration for client-id that matches the
> > current
> >client-id quota configuration.
> >- For a cluster mixes both secure and insecure access, client-id
> quotas
> >can be set for unauthenticated clients (unlimited quota for ANONYMOUS,
> >quotas for client-ids) and user quotas can be set for authenticated
> > users.
> >- In a multi-user cluster, it is currently possible to define quotas
> for
> >client-ids that span multiple users. This will no longer be supported.
> >
> >
> >
> >
> > On Fri, Jun 10, 2016 at 6:43 PM, Gwen Shapira  wrote:
> >
> > > I am not crazy about modes either. An earlier proposal supported both
> > > client-ids and users at the same time, and it made more sense to me. I
> > > believe it was dropped without proper discussion (Basically, Jun
> > > mentioned it is complex and Rajini agreed to drop it). We should
> > > probably rethink the complexity of supporting both vs the limitations
> > > of "modes".
> > >
> > > As you said, we will have secure clients authenticating with users and
> > > insecure clients authenticating with client-ids at the same time.
> > >
> > > On Fri, Jun 10, 2016 at 7:19 PM, Jay Kreps  wrote:
> > > > Hey Rajini,
> > > >
> > > > 1. That makes sense to me. Is that reflected in the documentation
> > > anywhere
> > > > (I couldn't really find it)? Is there a way to discover that
> > definition?
> > > We
> > > > do way better when we right this stuff down so it has an official
> > > > definition users and developers can work off of...
> > > > 2. If client id is a thing that makes sense even when you have users,
> > why
> > > > would you not want to quota on it?
> > > >
> > > > I am not wild about these "modes" where you boot the cluster in mode
> X
> > > and
> > > > it behaves in one way and in mode Y and it behaves in another. It is
> > > > complex then for users who expect to be able to set quotas but then
> > have
> > > to
> > > > be able to get access to the filesystem of the kafka nodes to
> discover
> > > what
> > > > mode the cluster is in to know which kind of quota is applicable.
> > > >
> > > > I guess there are two ways to think about a feature: one is the
> > increment
> > > > from where we are, and the other is the resulting state after that
> > > > increment is taken. What I am asking is not "is this a low cost step
> > from
> > > > where we are?" which everyone can agree that it is, but rather "does
> > this
> > > > make sense as an end state--i.e. if you were starting fresh with
> > neither
> > > > users nor client ids nor quotas would you end up with this?".
> > > >
> > > 

Jenkins build is back to normal : kafka-0.10.0-jdk7 #123

2016-06-10 Thread Apache Jenkins Server
See 



[GitHub] kafka pull request #1491: remove size and message count constraints

2016-06-10 Thread ewasserman
GitHub user ewasserman opened a pull request:

https://github.com/apache/kafka/pull/1491

remove size and message count constraints

KAFKA-1981 Make log compaction point configurable

Reduced scope to controlling only minimum time before compaction. Changed 
to using message time when available.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ewasserman/kafka feat-compaction-time-lag

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1491.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1491








Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-06-10 Thread Jay Kreps
This sounds a lot better to me--hopefully it isn't too much harder! I do
think if it is possible to do this directly that will be better for users
than having an intermediate step since we always have to work through
migrating people who have setup quotas already from the old way to the new
way.

-Jay

On Fri, Jun 10, 2016 at 2:12 PM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> I do think client-id is a valid and useful grouping for quotas even in
> secure clusters - but only if clientA of user1 is treated as a different
> client-id from clientA of user2. Grouping of clients of a user enables
> users to allocate their quota effectively to their clients (eg. guarantee
> that critical event processing clients are not throttled by auditing
> clients). When the KIP was down-sized to support only user-based quotas, I
> was hoping that we could extend it at a later time to enable hierarchical
> quotas. But I understand that it can be confusing to switch the semantics
> of quotas based on modes set in the brokers. So let me try once again to
> promote the original KIP-55. At the time, I did have a flag to revert to
> the existing client-id behavior to maintain compatibility. But perhaps that
> is not necessary.
>
> How does this sound?
>
>- Quotas may be configured for users. Sub-quotas may be configured for
>client-ids of a user. Quotas may also be configured for client-ids of
> users
>with unlimited quota (Long.MaxValue).
>- Users who don't have a quota override are allocated a configurable
>default quota.
>- Client-ids without a sub-quota override share the remainder of the
>user quota if the user has a quota limit. Default quotas may be defined
> for
>clients of users with unlimited quota.
>- For an insecure or single-user secure cluster, the existing client-id
>based quota semantics can be achieved by configuring unlimited quota for
>the user and sub-quota configuration for client-id that matches the
> current
>client-id quota configuration.
>- For a cluster mixes both secure and insecure access, client-id quotas
>can be set for unauthenticated clients (unlimited quota for ANONYMOUS,
>quotas for client-ids) and user quotas can be set for authenticated
> users.
>- In a multi-user cluster, it is currently possible to define quotas for
>client-ids that span multiple users. This will no longer be supported.
>
>
>
>
> On Fri, Jun 10, 2016 at 6:43 PM, Gwen Shapira  wrote:
>
> > I am not crazy about modes either. An earlier proposal supported both
> > client-ids and users at the same time, and it made more sense to me. I
> > believe it was dropped without proper discussion (Basically, Jun
> > mentioned it is complex and Rajini agreed to drop it). We should
> > probably rethink the complexity of supporting both vs the limitations
> > of "modes".
> >
> > As you said, we will have secure clients authenticating with users and
> > insecure clients authenticating with client-ids at the same time.
> >
> > On Fri, Jun 10, 2016 at 7:19 PM, Jay Kreps  wrote:
> > > Hey Rajini,
> > >
> > > 1. That makes sense to me. Is that reflected in the documentation
> > anywhere
> > > (I couldn't really find it)? Is there a way to discover that
> definition?
> > We
> > > do way better when we write this stuff down so it has an official
> > > definition users and developers can work off of...
> > > 2. If client id is a thing that makes sense even when you have users,
> why
> > > would you not want to quota on it?
> > >
> > > I am not wild about these "modes" where you boot the cluster in mode X
> > and
> > > it behaves in one way and in mode Y and it behaves in another. It is
> > > complex then for users who expect to be able to set quotas but then
> have
> > to
> > > be able to get access to the filesystem of the kafka nodes to discover
> > what
> > > mode the cluster is in to know which kind of quota is applicable.
> > >
> > > I guess there are two ways to think about a feature: one is the
> increment
> > > from where we are, and the other is the resulting state after that
> > > increment is taken. What I am asking is not "is this a low cost step
> from
> > > where we are?" which everyone can agree that it is, but rather "does
> this
> > > make sense as an end state--i.e. if you were starting fresh with
> neither
> > > users nor client ids nor quotas would you end up with this?".
> > >
> > > In terms of use cases, I think that we support mixing secure and
> insecure
> > > access on a single cluster so presumably in that case you would want to
> > be
> > > able to quota insecure users based on client id and secure users based
> on
> > > user, right? Likewise, as you said, client id is a valid grouping even
> in
> > > the presence of users, so it might be the case that several apps that
> are
> > > all part of the same system might access Kafka under a single user, but
> > you
> > > might have different quotas for these 

[jira] [Assigned] (KAFKA-2857) ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when describing a non-existent group before the offset topic is created

2016-06-10 Thread Ishita Mandhan (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ishita Mandhan reassigned KAFKA-2857:
-

Assignee: Ishita Mandhan

> ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when 
> describing a non-existent group before the offset topic is created
> -
>
> Key: KAFKA-2857
> URL: https://issues.apache.org/jira/browse/KAFKA-2857
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Ismael Juma
>Assignee: Ishita Mandhan
>Priority: Minor
>
> If we describe a non-existing group before the offset topic is created, like 
> the following:
> {code}
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer 
> --describe --group 
> {code}
> We get the following error:
> {code}
> Error while executing consumer group command The group coordinator is not 
> available.
> org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The 
> group coordinator is not available.
> {code}
> The exception is thrown in the `adminClient.describeConsumerGroup` call. We 
> can't interpret this exception as meaning that the group doesn't exist 
> because it could also be thrown if all replicas for an offset topic partition 
> are down (as explained by Jun).
> Jun also suggested that we should distinguish if a coordinator is not 
> available from the case where a coordinator doesn't exist.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-06-10 Thread Rajini Sivaram
I do think client-id is a valid and useful grouping for quotas even in
secure clusters - but only if clientA of user1 is treated as a different
client-id from clientA of user2. Grouping of clients of a user enables
users to allocate their quota effectively to their clients (eg. guarantee
that critical event processing clients are not throttled by auditing
clients). When the KIP was down-sized to support only user-based quotas, I
was hoping that we could extend it at a later time to enable hierarchical
quotas. But I understand that it can be confusing to switch the semantics
of quotas based on modes set in the brokers. So let me try once again to
promote the original KIP-55. At the time, I did have a flag to revert to
the existing client-id behavior to maintain compatibility. But perhaps that
is not necessary.

How does this sound?

   - Quotas may be configured for users. Sub-quotas may be configured for
   client-ids of a user. Quotas may also be configured for client-ids of users
   with unlimited quota (Long.MaxValue).
   - Users who don't have a quota override are allocated a configurable
   default quota.
   - Client-ids without a sub-quota override share the remainder of the
   user quota if the user has a quota limit. Default quotas may be defined for
   clients of users with unlimited quota.
   - For an insecure or single-user secure cluster, the existing client-id
   based quota semantics can be achieved by configuring unlimited quota for
   the user and sub-quota configuration for client-id that matches the current
   client-id quota configuration.
   - For a cluster that mixes both secure and insecure access, client-id quotas
   can be set for unauthenticated clients (unlimited quota for ANONYMOUS,
   quotas for client-ids) and user quotas can be set for authenticated users.
   - In a multi-user cluster, it is currently possible to define quotas for
   client-ids that span multiple users. This will no longer be supported.
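
A quick worked illustration of the semantics above (all numbers are purely
illustrative): suppose user1 has a 10 MB/s quota and a 4 MB/s sub-quota for
client-id clientA. Then clientA of user1 is throttled at 4 MB/s, the other
client-ids of user1 share the remaining 6 MB/s, and a clientA belonging to
user2 is accounted against user2's quota and is unaffected.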




On Fri, Jun 10, 2016 at 6:43 PM, Gwen Shapira  wrote:

> I am not crazy about modes either. An earlier proposal supported both
> client-ids and users at the same time, and it made more sense to me. I
> believe it was dropped without proper discussion (Basically, Jun
> mentioned it is complex and Rajini agreed to drop it). We should
> probably rethink the complexity of supporting both vs the limitations
> of "modes".
>
> As you said, we will have secure clients authenticating with users and
> insecure clients authenticating with client-ids at the same time.
>
> On Fri, Jun 10, 2016 at 7:19 PM, Jay Kreps  wrote:
> > Hey Rajini,
> >
> > 1. That makes sense to me. Is that reflected in the documentation
> anywhere
> > (I couldn't really find it)? Is there a way to discover that definition?
> We
> > do way better when we write this stuff down so it has an official
> > definition users and developers can work off of...
> > 2. If client id is a thing that makes sense even when you have users, why
> > would you not want to quota on it?
> >
> > I am not wild about these "modes" where you boot the cluster in mode X
> and
> > it behaves in one way and in mode Y and it behaves in another. It is
> > complex then for users who expect to be able to set quotas but then have
> to
> > be able to get access to the filesystem of the kafka nodes to discover
> what
> > mode the cluster is in to know which kind of quota is applicable.
> >
> > I guess there are two ways to think about a feature: one is the increment
> > from where we are, and the other is the resulting state after that
> > increment is taken. What I am asking is not "is this a low cost step from
> > where we are?" which everyone can agree that it is, but rather "does this
> > make sense as an end state--i.e. if you were starting fresh with neither
> > users nor client ids nor quotas would you end up with this?".
> >
> > In terms of use cases, I think that we support mixing secure and insecure
> > access on a single cluster so presumably in that case you would want to
> be
> > able to quota insecure users based on client id and secure users based on
> > user, right? Likewise, as you said, client id is a valid grouping even in
> > the presence of users, so it might be the case that several apps that are
> > all part of the same system might access Kafka under a single user, but
> you
> > might have different quotas for these different apps. Basically if client
> > id is a valid grouping even in the presence of users (willing to debate
> > this point, btw!) then you would want to quota on it.
> >
> > -Jay
> >
> >
> >
> > On Fri, Jun 10, 2016 at 4:49 AM, Rajini Sivaram <
> > rajinisiva...@googlemail.com> wrote:
> >
> >> Jay,
> >>
> >> Thank you for the feedback.
> >>
> >> 1. I think it is good to have a single concept of identity, but multiple
> >> ways of grouping clients for different functions. Client-id is a logical
> >> grouping of clients with a meaningful name that is used in client
> metrics
> 

[jira] [Updated] (KAFKA-3753) Add approximateNumEntries() to the StateStore interface for metrics reporting

2016-06-10 Thread Jeff Klukas (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Klukas updated KAFKA-3753:
---
Summary: Add approximateNumEntries() to the StateStore interface for 
metrics reporting  (was: Add size() to the StateStore interface for metrics 
reporting)

> Add approximateNumEntries() to the StateStore interface for metrics reporting
> -
>
> Key: KAFKA-3753
> URL: https://issues.apache.org/jira/browse/KAFKA-3753
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
>Priority: Minor
>  Labels: api
> Fix For: 0.10.1.0
>
>
> As a developer building a Kafka Streams application, I'd like to have 
> visibility into what's happening with my state stores. How can I know if a 
> particular store is growing large? How can I know if a particular store is 
> frequently needing to hit disk?
> I'm interested to know if there are existing mechanisms for extracting this 
> information or if other people have thoughts on how we might approach this.
> I can't think of a way to provide metrics generically, so each state store 
> implementation would likely need to handle this separately. Given that the 
> default RocksDBStore will likely be the most-used, it would be a first target 
> for adding metrics.
> I'd be interested in knowing the total number of entries in the store, the 
> total size on disk and in memory, rates of gets and puts, and hit/miss ratio 
> for the MemoryLRUCache. Some of these numbers are likely calculable through 
> the RocksDB API, others may simply not be accessible.
> Would there be value to the wider community in having state stores register 
> metrics?
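
As a rough sketch of what the retitled proposal implies (only a couple of the
existing StateStore methods are shown here, and the final signature may well differ):

{code}
public interface StateStore {
    String name();
    void flush();
    void close();
    // ... other existing methods omitted ...

    // Proposed: a cheap, approximate entry count (e.g. backed by RocksDB's
    // size-estimate property) that can be reported as a metric without a full scan.
    long approximateNumEntries();
}
{code}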



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3801) Provide static serialize() and deserialize() for use as method references

2016-06-10 Thread Jeff Klukas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325270#comment-15325270
 ] 

Jeff Klukas commented on KAFKA-3801:


I like concise code that you get from a static method reference: 
{{mapValues(LongDeserializer::deserialize)}} as opposed to {{mapValues(bytes -> 
Serdes.Long().deserializer().deserialize(bytes))}}, but you're correct that 
it's possible to use {{Serdes}} here inline.

I'm fine to close this in deference to a more comprehensive solution down the 
line.
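
For reference, a rough sketch of the shape being proposed (the full version is in
the branch linked in the ticket; the class name and the single-argument static
method here are hypothetical, not part of the current API):

{code}
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

public class StaticLongDeserializer implements Deserializer<Long> {

    // Proposed static form, usable as a method reference, e.g.
    // kstream.mapValues(StaticLongDeserializer::deserialize)
    public static Long deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        long value = 0;
        for (byte b : data) {
            value <<= 8;
            value |= b & 0xFF;
        }
        return value;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public Long deserialize(String topic, byte[] data) {
        return deserialize(data); // instance method delegates to the static version
    }

    @Override
    public void close() { }
}
{code}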

> Provide static serialize() and deserialize() for use as method references
> -
>
> Key: KAFKA-3801
> URL: https://issues.apache.org/jira/browse/KAFKA-3801
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 0.10.1.0
>
>
> While most calls to {{Serializer.serialize}} and {{Deserializer.deserialize}} 
> are abstracted away in Kafka Streams through the use of `Serdes` classes, 
> there are some instances where developers may want to call them directly. The 
> serializers and deserializers for simple types don't require any 
> configuration and could be static, but currently it's necessary to create an 
> instance to use those methods.
> I'd propose moving serialization logic into a {{static public byte[] 
> serialize(? data)}} method and deserialization logic into a {{static public ? 
> deserialize(byte[] data)}} method. The existing instance methods would simply 
> call the static versions.
> See a full example for LongSerializer and LongDeserializer here:
> https://github.com/apache/kafka/compare/trunk...jklukas:static-serde-methods?expand=1
> In Java 8, these static methods then become available for method references 
> in code like {{kstream.mapValues(LongDeserializer::deserialize)}} without the 
> user needing to create an instance of {{LongDeserializer}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325258#comment-15325258
 ] 

Guozhang Wang commented on KAFKA-3775:
--

Hi [~kawamuray], thanks for sharing your usage scenarios; it is very helpful for 
us to make user experience improvements.

In the long run, we definitely would like to make memory management in Kafka 
Streams convenient, since 1) many users may start their applications in a 
container with strict memory limit, and 2) we want to control the case where 
task migration caused by, say failures, can cause cascading OOMs on other 
instances because of sudden increase of memory for new tasks; this is a similar 
scenario to your case, just in the reverse order: changing from multiple 
instances to fewer instances. And I agree that the static {{partition.grouper}} 
config is not best suited here. There are already some discussion in the KIP-63 
thread, which I will try to summarize in a centralized wiki.

In the near term, we can remove the continuous {{poll(0)}} just for rebalance 
once KIP-62 is adopted, which will handle the heartbeat mechanism of the 
consumer and hence streams do not need to worry about frequent polling just for 
that. After this change, the memory pressure from {{ConsumerRecord}} should be 
reduced.

Does that sound good to you?

> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which 
> consists of single KafkaStreams instance, that instance gets all partitions 
> of the target topic assigned.
> As we're using it to process topics which has huge number of partitions and 
> message traffic, it is a problem that we don't have a way of throttling the 
> maximum amount of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic which has 
> more than 10MB/sec traffic of each partition we saw that all partitions 
> assigned to the first instance and soon the app dead by OOM.
> I know that there's some workarounds considerable here. for example:
> - Start multiple instances at once so the partitions distributed evenly.
>   => Maybe works. but as Kafka Streams is a library but not an execution 
> framework, there's no predefined procedure of starting Kafka Streams apps so 
> some users might wanna take an option to start the first single instance and 
> check if it works as expected with a lesser number of partitions (I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure.
>   => Maybe works. but still have two problems IMO:
>   - Still leads traffic explosion with high throughput processing as it 
> accepts all incoming messages from hundreads of partitions.
>   - In the first place, by the distributed system principle, it's weird that 
> users don't have a way to control maximum "partitions" assigned to a single 
> shard(an instance of KafkaStreams here). Users should be allowed to provide 
> the maximum amount of partitions that is considered as possible to be 
> processed with single instance(or host).
> Here, I'd like to introduce a new configuration parameter 
> {{max.tasks.assigned}}, which limits the number of tasks(a notion of 
> partition) assigned to the processId(which is the notion of single 
> KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor(TaskAssignor) to 
> tolerate the incomplete assignment. That is, Kafka Streams should continue 
> working for the part of partitions even there are some partitions left 
> unassigned, in order to satisfy this: "user may want to take an option to 
> start the first single instance and check if it works as expected with 
> a lesser number of partitions (I want :p)".
> I've implemented the rough POC for this. PTAL and if it make sense I will 
> continue sophisticating it.
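
A minimal sketch of how the proposed knob might be set (the "max.tasks.assigned"
key is the one proposed in this ticket and is not part of released Kafka Streams;
the other values are illustrative):

{code}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "capped-instance-demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Hypothetical: cap the number of tasks this single instance will accept,
// leaving the remaining partitions unassigned until more instances join.
props.put("max.tasks.assigned", "32");
{code}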



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3817) KTableRepartitionMap should handle null inputs

2016-06-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325216#comment-15325216
 ] 

ASF GitHub Bot commented on KAFKA-3817:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1488


> KTableRepartitionMap should handle null inputs
> --
>
> Key: KAFKA-3817
> URL: https://issues.apache.org/jira/browse/KAFKA-3817
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
> Fix For: 0.10.0.1
>
>
> When calling {{KTable.groupBy}} on the result of a KTable-KTable join, NPEs 
> are raised:
> {{org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$
> > KTableMapProcessor.process(KTableRepartitionMap.java:88)}}
> The root cause is that the join is expected to emit null values when no match 
> is found, but KTableRepartitionMap is not set up to handle this case.
> On the users email list, [~guozhang] described a plan of action:
> I think this is actually a bug in KTableRepartitionMap
> that it actually should expect null grouped keys; this would be a
> straight-forward fix for this operator, but I can make a pass over all the
> repartition operators just to make sure they are all gracefully handling
> null keys.
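
For context, a minimal sketch of the triggering pattern (assumed 0.10.0 API shape;
topic names and the join logic are illustrative). The inner join emits null values
for non-matching or deleted keys, and those nulls are what reach
KTableRepartitionMap through the groupBy selector:

{code}
KTable<String, String> left  = builder.table(Serdes.String(), Serdes.String(), "left-topic");
KTable<String, String> right = builder.table(Serdes.String(), Serdes.String(), "right-topic");

// Non-matching or deleted keys propagate downstream as null values.
KTable<String, String> joined = left.join(right, (l, r) -> l + "|" + r);

// The selector is written defensively here; the fix described above is for the
// repartition operator itself to tolerate these nulls gracefully.
KGroupedTable<String, String> regrouped = joined.groupBy(
    (key, value) -> KeyValue.pair(key, value == null ? null : value.split("\\|")[0]));
{code}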



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3817) KTableRepartitionMap should handle null inputs

2016-06-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3817.
--
Resolution: Fixed

Issue resolved by pull request 1488
[https://github.com/apache/kafka/pull/1488]

> KTableRepartitionMap should handle null inputs
> --
>
> Key: KAFKA-3817
> URL: https://issues.apache.org/jira/browse/KAFKA-3817
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
> Fix For: 0.10.0.1
>
>
> When calling {{KTable.groupBy}} on the result of a KTable-KTable join, NPEs 
> are raised:
> {{org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$
> > KTableMapProcessor.process(KTableRepartitionMap.java:88)}}
> The root cause is that the join is expected to emit null values when no match 
> is found, but KTableRepartitionMap is not set up to handle this case.
> On the users email list, [~guozhang] described a plan of action:
> I think this is actually a bug in KTableRepartitionMap
> that it actually should expect null grouped keys; this would be a
> straight-forward fix for this operator, but I can make a pass over all the
> repartition operators just to make sure they are all gracefully handling
> null keys.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1488: KAFKA-3817: handle null keys in KTableRepartitionM...

2016-06-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1488


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3801) Provide static serialize() and deserialize() for use as method references

2016-06-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325209#comment-15325209
 ] 

Guozhang Wang commented on KAFKA-3801:
--

I think a general question here, that we are considering, is whether we could 
make it more flexible for users to apply some stateless processing operators on 
the raw key-value pair in byte arrays before deserializing (today we ALWAYS 
deserialize first, partially because we need to extract the timestamp), so that 
for example in your case you can {{branch}} the stream to two streams for 
checkpoint / data only. But this may need to be done as a more comprehensive 
story.

As for your case specifically, for now you can still just call 
{{Serdes.Long().serializer().serialize()}} in mapValues.
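
A minimal sketch of that workaround (builder is assumed to be a KStreamBuilder and
the topic name is illustrative; the two-argument serialize() is the existing
instance method, which is why a topic string has to be passed explicitly):

{code}
KStream<String, byte[]> rawValues =
    builder.stream(Serdes.String(), Serdes.Long(), "counts-topic")
           .mapValues(v -> Serdes.Long().serializer().serialize("counts-topic", v));
{code}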

> Provide static serialize() and deserialize() for use as method references
> -
>
> Key: KAFKA-3801
> URL: https://issues.apache.org/jira/browse/KAFKA-3801
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 0.10.1.0
>
>
> While most calls to {{Serializer.serialize}} and {{Deserializer.deserialize}} 
> are abstracted away in Kafka Streams through the use of `Serdes` classes, 
> there are some instances where developers may want to call them directly. The 
> serializers and deserializers for simple types don't require any 
> configuration and could be static, but currently it's necessary to create an 
> instance to use those methods.
> I'd propose moving serialization logic into a {{static public byte[] 
> serialize(? data)}} method and deserialization logic into a {{static public ? 
> deserialize(byte[] data)}} method. The existing instance methods would simply 
> call the static versions.
> See a full example for LongSerializer and LongDeserializer here:
> https://github.com/apache/kafka/compare/trunk...jklukas:static-serde-methods?expand=1
> In Java 8, these static methods then become available for method references 
> in code like {{kstream.mapValues(LongDeserializer::deserialize)}} without the 
> user needing to create an instance of {{LongDeserializer}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2857) ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when describing a non-existent group before the offset topic is created

2016-06-10 Thread Ishita Mandhan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325152#comment-15325152
 ] 

Ishita Mandhan commented on KAFKA-2857:
---

Working on this with [~vahid] and we aren't sure what the part about all 
replicas for an offset topic partition being down means. If all 
__consumer_offsets partitions are down, wouldn't all the brokers be down as 
well (meaning that all brokers keep the same copy of __consumer_offsets)?

> ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when 
> describing a non-existent group before the offset topic is created
> -
>
> Key: KAFKA-2857
> URL: https://issues.apache.org/jira/browse/KAFKA-2857
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Ismael Juma
>Priority: Minor
>
> If we describe a non-existing group before the offset topic is created, like 
> the following:
> {code}
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer 
> --describe --group 
> {code}
> We get the following error:
> {code}
> Error while executing consumer group command The group coordinator is not 
> available.
> org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The 
> group coordinator is not available.
> {code}
> The exception is thrown in the `adminClient.describeConsumerGroup` call. We 
> can't interpret this exception as meaning that the group doesn't exist 
> because it could also be thrown if all replicas for an offset topic partition 
> are down (as explained by Jun).
> Jun also suggested that we should distinguish if a coordinator is not 
> available from the case where a coordinator doesn't exist.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics

2016-06-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325148#comment-15325148
 ] 

ASF GitHub Bot commented on KAFKA-3769:
---

GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/1490

KAFKA-3769: Optimize metrics recording overhead



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka K3769-optimize-metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1490


commit 68c889e1cba59280e4c1a37007efcdec6c784878
Author: Guozhang Wang 
Date:   2016-06-10T17:50:29Z

reduce time.milliseconds

commit ad5403c919fe6ee9e5c5e7e3d9c9f2534a5a3717
Author: Guozhang Wang 
Date:   2016-06-10T19:38:05Z

use milliseconds instead of nanoseconds for state store metrics




> KStream job spending 60% of time writing metrics
> 
>
> Key: KAFKA-3769
> URL: https://issues.apache.org/jira/browse/KAFKA-3769
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Critical
>
> I've been profiling a complex streams job, and found two major hotspots when 
> writing metrics, which take up about 60% of the CPU time of the job. (!) A PR 
> is attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1490: KAFKA-3769: Optimize metrics recording overhead

2016-06-10 Thread guozhangwang
GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/1490

KAFKA-3769: Optimize metrics recording overhead



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka K3769-optimize-metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1490


commit 68c889e1cba59280e4c1a37007efcdec6c784878
Author: Guozhang Wang 
Date:   2016-06-10T17:50:29Z

reduce time.milliseconds

commit ad5403c919fe6ee9e5c5e7e3d9c9f2534a5a3717
Author: Guozhang Wang 
Date:   2016-06-10T19:38:05Z

use milliseconds instead of nanoseconds for state store metrics




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-3823) QuickStart documentation is still showing MirrorMakers supports more than one consumer.config

2016-06-10 Thread Chak Lee (JIRA)
Chak Lee created KAFKA-3823:
---

 Summary: QuickStart documentation is still showing MirrorMakers 
supports more than one consumer.config
 Key: KAFKA-3823
 URL: https://issues.apache.org/jira/browse/KAFKA-3823
 Project: Kafka
  Issue Type: Bug
  Components: website
Affects Versions: 0.9.0.0
Reporter: Chak Lee
Priority: Minor


On the official QuickStart documentation, the MirrorMaker section is still 
showing the following example:

{code}
bin/kafka-mirror-maker.sh
   --consumer.config consumer-1.properties --consumer.config 
consumer-2.properties
   --producer.config producer.properties --whitelist my-topic
{code}

However, the support for this is already dropped in KAFKA-1650.  If you  tried 
to run the above script, you will get the following error:

{code}
[2016-06-10 18:35:11,201] ERROR Exception when starting mirror maker. 
(kafka.tools.MirrorMaker$)
joptsimple.MultipleArgumentsForOptionException: Found multiple arguments for 
option consumer.config, but you asked for only one
{code}

Please update the website's QuickStart section for MirrorMaker. Thanks.
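
For comparison, an invocation of the shape the updated docs would presumably show
(file names are illustrative), passing a single consumer config:

{code}
bin/kafka-mirror-maker.sh
   --consumer.config consumer.properties
   --producer.config producer.properties --whitelist my-topic
{code}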



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] scalability limits in the coordinator

2016-06-10 Thread Jason Gustafson
Hey Becket,

My suggestion was pretty far from a completely thought-out proposal, but
the advantages of having your MM cluster maintain subscriptions/assignments
in its own topic are the following:

1. It solves the immediate problem of the size of the group metadata.
2. It distributes the subscription/assignment data among the brokers, so
the coordinator is not a bottleneck.
3. Subscriptions are fixed at startup, so maybe you only need to write them
once, which saves some network overhead on each rebalance. The leader is
currently sticky as well, so perhaps you could cache them and save lookup
overhead, which probably doesn't have much of an impact on memory since
subscriptions are typically the same for all consumers (except during
rolling update).

The disadvantage is that it's clearly more complex than letting the
coordinator do all the work. But it seems at this point like the other
solutions are more like workarounds.
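
As a rough illustration of the pointer idea (names are made up, this is not an
actual Kafka class): instead of shipping the full assignment in the SyncGroup
response, the leader would write assignments to a topic and each member would
receive only a small reference to fetch afterwards, something like:

{code}
public final class AssignmentPointer {
    public final String topic;    // e.g. an internal assignments topic
    public final int partition;
    public final long offset;     // offset of the assignment record for this member

    public AssignmentPointer(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}
{code}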

-Jason




On Thu, Jun 9, 2016 at 10:52 PM, Becket Qin  wrote:

> Hi Jason,
>
> I am trying to understand the gain of saving the assignment and metadata in
> a topic and return the offsets to the consumers. This obviously saves
> memory footprint as we agreed before. But does it save network bandwidth?
> The consumers still need to read the same amount of data from the
> coordinator broker, right?
>
> In terms of the current proposal, Onur and I discussed offline. It looks
> that having a separate "offset.replica.fetch.max.bytes" does not buy us
> much. Users are likely setting it to be the same value as the max message
> size of the __consumer_offsets topic. The reason is this configuration is
> mainly for consumer-side memory management, so users would not want to set
> it to be bigger than necessary.
>
> The solution we discussed is what Jun suggested, i.e. look at the size of
> the first message returned. If the size of the message is greater than the
> fetch max bytes, we always return the first message even it is bigger than
> max fetch size. Otherwise we only return up to fetch max bytes. We only do
> this for __consumer_offsets topic so no user topic will be impacted.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Jun 9, 2016 at 2:40 PM, Jason Gustafson 
> wrote:
>
> > Hi Onur,
> >
> > I didn't have a specific proposal in mind, I was just thinking
> analogously
> > with how Connect ensures task configurations are propagated to tasks
> > consistently when it rebalances the cluster. The high level concept is to
> > take the assignment data out of the rebalance protocol itself and replace
> > it with a pointer. For example, the pointer can be a (topic, partition,
> > offset) tuple which can be fetched separately by the consumer instance
> > after the rebalance completes. Then the leader would generate the
> > assignments, write them to Kafka, and send the pointers to the rest of
> the
> > group in the SyncGroup. This is more or less how Connect works.
> >
> > -Jason
> >
> > On Thu, Jun 9, 2016 at 11:10 AM, Onur Karaman
> >  > > wrote:
> >
> > > I think the value of adding a "offsets.replica.fetch.max.bytes" config
> is
> > > that we don't break/change the meaning of "replica.fetch.max.bytes".
> > >
> > > We can also set "offsets.replica.fetch.max.bytes" to be a value safely
> > > larger than what we expect to ever allow the __consumer_offsets topic
> max
> > > message size to be without doing the larger change of bumping up the
> > global
> > > "replica.fetch.max.bytes".
> > >
> > > On Thu, Jun 9, 2016 at 10:40 AM, Becket Qin 
> > wrote:
> > >
> > > > I think taking bigger one of the fetch size and message size limit is
> > > > probably good enough. If we have a separate
> > > > "offset.replica.fetch.max.bytes", I guess the value will always be
> set
> > to
> > > > max message size of the __consumer_offsets topic, which does not seem
> > to
> > > > have much value.
> > > >
> > > > On Thu, Jun 9, 2016 at 3:15 AM, Onur Karaman
> > >  > > > >
> > > > wrote:
> > > >
> > > > > Maybe another approach can be to add a new
> > > > > "offsets.replica.fetch.max.bytes" config on the brokers.
> > > > >
> > > > > On Thu, Jun 9, 2016 at 3:03 AM, Onur Karaman <
> okara...@linkedin.com>
> > > > > wrote:
> > > > >
> > > > > > I made a PR with a tweak to Jun's/Becket's proposal:
> > > > > > https://github.com/apache/kafka/pull/1484
> > > > > >
> > > > > > It just tweaks the fetch behavior specifically for replicas
> > fetching
> > > > from
> > > > > > the __consumer_offsets topic when the fetcher's
> > > > "replica.fetch.max.bytes"
> > > > > > is less than the __consumer_offset leader's "message.max.bytes"
> to
> > > take
> > > > > the
> > > > > > max of the two.
> > > > > >
> > > > > > I'm honestly not that happy with this solution, as I'd rather not
> > > > change
> > > > > > the "replica.fetch.max.bytes" config from being a limit to a
> > > > > > recommendation. I'd 

Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-06-10 Thread Gwen Shapira
I am not crazy about modes either. An earlier proposal supported both
client-ids and users at the same time, and it made more sense to me. I
believe it was dropped without proper discussion (Basically, Jun
mentioned it is complex and Rajini agreed to drop it). We should
probably rethink the complexity of supporting both vs the limitations
of "modes".

As you said, we will have secure clients authenticating with users and
insecure clients authenticating with client-ids at the same time.

On Fri, Jun 10, 2016 at 7:19 PM, Jay Kreps  wrote:
> Hey Rajini,
>
> 1. That makes sense to me. Is that reflected in the documentation anywhere
> (I couldn't really find it)? Is there a way to discover that definition? We
> do way better when we write this stuff down so it has an official
> definition users and developers can work off of...
> 2. If client id is a thing that makes sense even when you have users, why
> would you not want to quota on it?
>
> I am not wild about these "modes" where you boot the cluster in mode X and
> it behaves in one way and in mode Y and it behaves in another. It is
> complex then for users who expect to be able to set quotas but then have to
> be able to get access to the filesystem of the kafka nodes to discover what
> mode the cluster is in to know which kind of quota is applicable.
>
> I guess there are two ways to think about a feature: one is the increment
> from where we are, and the other is the resulting state after that
> increment is taken. What I am asking is not "is this a low cost step from
> where we are?" which everyone can agree that it is, but rather "does this
> make sense as an end state--i.e. if you were starting fresh with neither
> users nor client ids nor quotas would you end up with this?".
>
> In terms of use cases, I think that we support mixing secure and insecure
> access on a single cluster so presumably in that case you would want to be
> able to quota insecure users based on client id and secure users based on
> user, right? Likewise, as you said, client id is a valid grouping even in
> the presence of users, so it might be the case that several apps that are
> all part of the same system might access Kafka under a single user, but you
> might have different quotas for these different apps. Basically if client
> id is a valid grouping even in the presence of users (willing to debate
> this point, btw!) then you would want to quota on it.
>
> -Jay
>
>
>
> On Fri, Jun 10, 2016 at 4:49 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
>> Jay,
>>
>> Thank you for the feedback.
>>
>> 1. I think it is good to have a single concept of identity, but multiple
>> ways of grouping clients for different functions. Client-id is a logical
>> grouping of clients with a meaningful name that is used in client metrics
>> and logs. User principal is an authenticated user or a grouping of
>> unauthenticated users chosen by the broker and is used for ACLs. In my
>> view, in a multi-user system, there is a hierarchy - user owns zero or more
>> clients. (principal, client-id) defines a safe group, but the shorter
>> unsafe client-id is sufficient in client metrics and logs.
>>
>>
>> 2. KIP-55 was initially written to support hierarchical quotas (quotas for
>> user1-clientA, user2-clientA etc), but Jun's view was that the complexity
>> was not justified since there is no clear requirement for this. The
>> cut-down version of the KIP is clearly a lot simpler. But I think your
>> suggestion is to enable non-hierarchical user quotas and client-id quotas
>> at the same time. Basically treat users and client-ids as distinct entities
>> like topics and allow quotas to be applied to each of these entities. I
>> agree that we want to support quotas simultaneously on different entities
>> like topics and users. I am less convinced of client-id and user being
>>
>> distinct entities that require separate quotas at the same time. And
>> treating client-id and user as distinct entities makes it harder to
>> implement hierarchical quotas in future.
>>
>>
>> A single user system needs only client-id quotas, and multi-tenant system
>> cannot use client-id quotas since we need to guarantee that one tenant's
>> quotas can never be used by another tenant. I suppose a multi-user cluster
>> where users trust each other could apply separate quotas for both clients
>> and users, but I am not sure if there is a usecase that can't be satisfied
>> with just client-id based quotas for this case. Do you have a usecase in
>> mind where you want to apply separate quotas for clients and users
>> simultaneously?
>>
>>
>>
>>
>> On Thu, Jun 9, 2016 at 9:40 PM, Jay Kreps  wrote:
>>
>> > Super sorry to come in late on this one. Rajini, I had two quick
>> questions
>> > I think we should be able to answer:
>> >
>> >1. Do client ids make sense in a world which has users? If not should
>> we
>> >unify them the way Hadoop did (without auth the user is a kind of 

Re: [DISCUSS] KIP-4 Create Topic Schema

2016-06-10 Thread Gwen Shapira
Thank you for the clear proposal, Grant!

I like the request/response objects and the timeout semantics. Two comments:

1. The replica assignment protocol takes [replicas]; there is the
implicit assumption that the first replica is the leader. This matches
current behavior elsewhere, but let's document it explicitly (a small
illustrative request is sketched after these two comments).

2. I like the timeout, but want to clarify why, since it may not be
obvious to everyone:
Currently, the response is sent when the controller has sent the
"update metadata" request to the brokers involved with the new topic.
It is a rather weak guarantee, but if clients decide to poll the
brokers for updates, it does reduce the time spent polling.
More importantly, this behavior is a net improvement on the current state
(completely async and ZK dependent) and when we do have a way to get
"ack" from replicas, we will be able to add the new behavior without
changing the protocol (just the semantics of waiting).
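
To make the leader convention in point 1 concrete, here is a purely illustrative
instance of the request described below (all values are made up; partitions and
replication_factor are omitted because the explicit assignment takes precedence):

    create_topic_requests =>
        topic => "my-topic"
        replica_assignment =>
            partition_id => 0
            replicas => [1, 2, 3]    (broker 1, listed first, is the intended leader)
    timeout => 30000                 (block up to 30s for complete metadata on the controller)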

Gwen

On Fri, Jun 10, 2016 at 7:21 PM, Grant Henke  wrote:
> Now that Kafka 0.10 has been released I would like to start work on the new
> protocol messages and client implementation for KIP-4. In order to break up
> the discussion and feedback I would like to continue breaking up the
> content in to smaller pieces.
>
> This discussion thread is for the CreateTopic request/response and server
> side implementation. Details for this implementation can be read here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-CreateTopicRequest
>
> I have included the exact content below for clarity:
>
>> Create Topic Request (KAFKA-2945
>> )
>>
>>
>> CreateTopic Request (Version: 0) => [create_topic_requests] timeout
>>   create_topic_requests => topic partitions replication_factor 
>> [replica_assignment] [configs]
>> topic => STRING
>> partitions => INT32
>> replication_factor => INT32
>> replica_assignment => partition_id [replicas]
>>   partition_id => INT32
>>   replicas => INT32
>> configs => config_key config_value
>>   config_key => STRING
>>   config_value => STRING
>>   timeout => INT32
>>
>> CreateTopicRequest is a batch request to initiate topic creation with
>> either predefined or automatic replica assignment and optionally topic
>> configuration.
>>
>> Request semantics:
>>
>>1. Must be sent to the controller broker
>>2. Multiple instructions for the same topic in one request will be
>>silently ignored, only the last from the list will be executed.
>>   - This is because the list of topics is modeled server side as a
>>   map with TopicName as the key
>>3. The principle must be authorized to the "Create" Operation on the
>>"Cluster" resource to create topics.
>>   - Unauthorized requests will receive a ClusterAuthorizationException
>>4.
>>
>>Only one from ReplicaAssignment or (Partitions + ReplicationFactor), can
>>be defined in one instruction. If both parameters are specified -
>>ReplicaAssignment takes precedence.
>>- In the case ReplicaAssignment is defined number of partitions and
>>   replicas will be calculated from the supplied ReplicaAssignment.
>>   - In the case of defined (Partitions + ReplicationFactor) replica
>>   assignment will be automatically generated by the server.
>>   - One or the other must be defined. The existing broker side auto
>>   create defaults will not be used
>>   (default.replication.factor, num.partitions). The client 
>> implementation can
>>   have defaults for these options when generating the messages.
>>5. Setting a timeout > 0 will allow the request to block until the
>>topic metadata is "complete" on the controller node.
>>   - Complete means the topic metadata has been completely populated
>>   (leaders, replicas, ISRs)
>>   - If a timeout error occurs, the topic could still be created
>>   successfully at a later time. Its up to the client to query for the 
>> state
>>   at that point.
>>6. The request is not transactional.
>>   1. If an error occurs on one topic, the other could still be
>>   created.
>>   2. Errors are reported independently.
>>
>> QA:
>>
>>- Why is CreateTopicRequest a batch request?
>>   - Scenarios where tools or admins want to create many topics should
>>   be able to do so with fewer requests
>>   - Example: MirrorMaker may want to create the topics downstream
>>- What happens if some topics error immediately? Will it
>>return immediately?
>>   - The request will block until all topics have either been created,
>>   errors, or the timeout has been hit
>>   - There is no "short circuiting" where 1 error stops the other
>>   topics from being created
>>   - Why implement "partial blocking" instead of fully async or fully
>>consistent?
>>

[jira] [Updated] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor updated KAFKA-3811:
--
Attachment: screenshot-latency.png

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
> Attachments: Muon-Snapshot.zip, Muon-latency.zip, screenshot-1.png, 
> screenshot-latency.png
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.
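
A minimal sketch of how such a knob might be surfaced (the "metrics.recording.level"
key and the level0 value follow the proposal above and are hypothetical, not a
released config):

{code}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-levels-demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Hypothetical: record only the cheap thread-level (level0) sensors by default;
// per-processor / per-store (level1) sensors would have to be enabled explicitly.
props.put("metrics.recording.level", "level0");
{code}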



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324869#comment-15324869
 ] 

Greg Fodor commented on KAFKA-3811:
---

Also, I've attached a screenshot + snapshot of a second run where I started 
sending data deeper in the pipeline which started to cause the latency metrics 
to take up a few % of time since we're using state stores. To me I guess a lot 
of this looks like lock contention mostly.



> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
> Attachments: Muon-Snapshot.zip, Muon-latency.zip, screenshot-1.png, 
> screenshot-latency.png
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor updated KAFKA-3811:
--
Attachment: Muon-latency.zip

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
> Attachments: Muon-Snapshot.zip, Muon-latency.zip, screenshot-1.png
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324847#comment-15324847
 ] 

Greg Fodor commented on KAFKA-3811:
---

I've also attached a screenshot of YourKit of the relevant call stacks

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
> Attachments: Muon-Snapshot.zip, screenshot-1.png
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor updated KAFKA-3811:
--
Attachment: screenshot-1.png

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
> Attachments: Muon-Snapshot.zip, screenshot-1.png
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor updated KAFKA-3811:
--
Attachment: Muon-Snapshot.zip

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
> Attachments: Muon-Snapshot.zip
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3822) Kafka Consumer close() hangs indefinitely if Kafka Broker shutdown while connected

2016-06-10 Thread Alexander Cook (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Cook updated KAFKA-3822:
--
Description: 
I am using the KafkaConsumer java client to consume messages. My application 
shuts down smoothly if I am connected to a Kafka broker, or if I never succeed 
at connecting to a Kafka broker, but if the broker is shut down while my 
consumer is connected to it, consumer.close() hangs indefinitely. 

Here is how I reproduce it: 

1. Start 0.9.0.1 Kafka Broker
2. Start consumer application and consume messages
3. Stop 0.9.0.1 Kafka Broker (ctrl-c or stop script)
4. Try to stop application...hangs at consumer.close() indefinitely. 

I also see this same behavior using 0.10 broker and client. 

This is my first bug reported to Kafka, so please let me know if I should be 
following a different format. Thanks! 

  was:
I am using the KafkaConsumer java client to consume messages. My application 
shuts down smoothly if I am connected to a Kafka broker, or if I never succeed 
at connecting to a Kafka broker, but if the broker is shut down while my 
consumer is connected to it, consumer.close() hangs indefinitely. 

Here is how I reproduce it: 

1. Start 0.9.0.1 Kafka Broker
2. Start consumer application and consume messages
3. Stop 0.9.0.1 Kafka Broker (ctrl-c or stop script)
4. Try to stop application...hangs at consumer.close() indefinitely. 

I am going to try this out on 0.10 to see if the same thing happens. 

This is my first bug reported to Kafka, so please let me know if I should be 
following a different format. Thanks! 


> Kafka Consumer close() hangs indefinitely if Kafka Broker shutdown while 
> connected
> --
>
> Key: KAFKA-3822
> URL: https://issues.apache.org/jira/browse/KAFKA-3822
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
> Environment: x86 Red Hat 6 (1 broker running zookeeper locally, 
> client running on a separate server)
>Reporter: Alexander Cook
>
> I am using the KafkaConsumer java client to consume messages. My application 
> shuts down smoothly if I am connected to a Kafka broker, or if I never 
> succeed at connecting to a Kafka broker, but if the broker is shut down while 
> my consumer is connected to it, consumer.close() hangs indefinitely. 
> Here is how I reproduce it: 
> 1. Start 0.9.0.1 Kafka Broker
> 2. Start consumer application and consume messages
> 3. Stop 0.9.0.1 Kafka Broker (ctrl-c or stop script)
> 4. Try to stop application...hangs at consumer.close() indefinitely. 
> I also see this same behavior using 0.10 broker and client. 
> This is my first bug reported to Kafka, so please let me know if I should be 
> following a different format. Thanks! 
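
A minimal repro sketch (standard consumer API for 0.9/0.10; the topic, group id and
the shutdownRequested flag are illustrative application details):

{code}
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "close-hang-repro");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
try {
    while (!shutdownRequested) {   // flag flipped by the application's shutdown logic
        consumer.poll(1000);       // broker is stopped (ctrl-c) at some point during this loop
    }
} finally {
    consumer.close();              // this is the call reported to hang indefinitely
}
{code}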



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324838#comment-15324838
 ] 

Greg Fodor commented on KAFKA-3811:
---

Hey [~aartigupta], I attached a YourKit profiler to one of our jobs running dark 
against production data. The job has 200-300 topic-partition pairs, generally 
discards most messages early in the pipeline, and was processing a few thousand 
tps from the top-level topics.
came up we implemented changes to reduce the amount of data running through the 
system (discarding it earlier) so we didn't have to worry about this 
performance problem. In my tests a majority of the CPU time of the job was 
spent inside of the code walking and emitting to the Sensors for the 
per-message process metrics and the per-k/v read/write latency metrics. I also 
found 6-7% of the time was spent in the fetcher metrics which was addressed 
here: https://github.com/apache/kafka/pull/1464. 

Good news: I managed to find the snapshot data :) I will attach it here. The 
majority of the time is *not* the milliseconds() call but the actual 
(synchronized?) walk of Sensors in Sensor.record.

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-48 Support for delegation tokens as an authentication mechanism

2016-06-10 Thread Gwen Shapira
From what I can see, the remaining questions are:

1. Who / how are tokens renewed? By original requester only? or using
Kerberos auth only?
2. Are tokens stored on each broker or in ZK?
3. How are tokens invalidated / expired?
4. Which encryption algorithm is used?
5. What is the impersonation proposal (it wasn't in the KIP but was
discussed in this thread)?
6. Do we need new ACLs, if so - for what actions?

Gwen

On Thu, Jun 9, 2016 at 7:48 PM, Harsha  wrote:
> Jun & Ismael,
>  Unfortunately I couldn't attend the KIP meeting
>  when delegation tokens discussed. Appreciate if
>  you can update the thread if you have any
>  further questions.
> Thanks,
> Harsha
>
> On Tue, May 24, 2016, at 11:32 AM, Liquan Pei wrote:
>> It seems that the links to images in the KIP are broken.
>>
>> Liquan
>>
>> On Tue, May 24, 2016 at 9:33 AM, parth brahmbhatt <
>> brahmbhatt.pa...@gmail.com> wrote:
>>
>> > 110. What does getDelegationTokenAs mean?
>> > In the current proposal we only allow a user to get delegation token for
>> > the identity that it authenticated as using another mechanism, i.e. A user
>> > that authenticate using a keytab for principal us...@example.com will get
>> > delegation tokens for that user only. In future I think we will have to
>> > extend support such that we allow some set of users (
>> > kafka-rest-u...@example.com, storm-nim...@example.com) to acquire
>> > delegation tokens on behalf of other users whose identity they have
>> > verified independently.  Kafka brokers will have ACLs to control which
>> > users are allowed to impersonate other users and get tokens on behalf of
>> > them. Overall Impersonation is a whole different problem in my opinion and
>> > I think we can tackle it in separate KIP.
>> >
>> > 111. What's the typical rate of getting and renewing delegation tokens?
>> > Typically this should be very very low, 1 request per minute is a
>> > relatively high estimate. However it depends on the token expiration. I am
>> > less worried about the extra load it puts on controller vs the added
>> > complexity and the value it offers.
>> >
>> > Thanks
>> > Parth
>> >
>> >
>> >
>> > On Tue, May 24, 2016 at 7:30 AM, Ismael Juma  wrote:
>> >
>> > > Thanks Rajini. It would probably require a separate KIP as it will
>> > > introduce user visible changes. We could also update KIP-48 to have this
>> > > information, but it seems cleaner to do it separately. We can discuss
>> > that
>> > > in the KIP call today.
>> > >
>> > > Ismael
>> > >
>> > > On Tue, May 24, 2016 at 3:19 PM, Rajini Sivaram <
>> > > rajinisiva...@googlemail.com> wrote:
>> > >
>> > > > Ismael,
>> > > >
>> > > > I have created a JIRA (
>> > https://issues.apache.org/jira/browse/KAFKA-3751)
>> > > > for adding SCRAM as a SASL mechanism. Would that need another KIP? If
>> > > > KIP-48 will use this mechanism, can this just be a JIRA that gets
>> > > reviewed
>> > > > when the PR is ready?
>> > > >
>> > > > Thank you,
>> > > >
>> > > > Rajini
>> > > >
>> > > > On Tue, May 24, 2016 at 2:46 PM, Ismael Juma 
>> > wrote:
>> > > >
>> > > > > Thanks Rajini, SCRAM seems like a good candidate.
>> > > > >
>> > > > > Gwen had independently mentioned this as a SASL mechanism that might
>> > be
>> > > > > useful for Kafka and I have been meaning to check it in more detail.
>> > > Good
>> > > > > to know that you are willing to contribute an implementation. Maybe
>> > we
>> > > > > should file a separate JIRA for this?
>> > > > >
>> > > > > Ismael
>> > > > >
>> > > > > On Tue, May 24, 2016 at 2:12 PM, Rajini Sivaram <
>> > > > > rajinisiva...@googlemail.com> wrote:
>> > > > >
>> > > > > > SCRAM (Salted Challenge Response Authentication Mechanism) is a
>> > > better
>> > > > > > mechanism than Digest-MD5. Java doesn't come with a built-in SCRAM
>> > > > > > SaslServer or SaslClient, but I will be happy to add support in
>> > Kafka
>> > > > > since
>> > > > > > it would be a useful mechanism to support anyway.
>> > > > > > https://tools.ietf.org/html/rfc7677 describes the protocol for
>> > > > > > SCRAM-SHA-256.
>> > > > > >
>> > > > > > On Tue, May 24, 2016 at 2:37 AM, Jun Rao  wrote:
>> > > > > >
>> > > > > > > Parth,
>> > > > > > >
>> > > > > > > Thanks for the explanation. A couple of more questions.
>> > > > > > >
>> > > > > > > 110. What does getDelegationTokenAs mean?
>> > > > > > >
>> > > > > > > 111. What's the typical rate of getting and renewing delegation
>> > > > tokens?
>> > > > > > > That may have an impact on whether they should be directed to the
>> > > > > > > controller.
>> > > > > > >
>> > > > > > > Jun
>> > > > > > >
>> > > > > > > On Mon, May 23, 2016 at 1:19 PM, parth brahmbhatt <
>> > > > > > > brahmbhatt.pa...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Hi Jun,
>> > > > > > > >
>> > > > > > > > Thanks for reviewing.
>> > > > > > > >
>> > > > > > > 

[jira] [Commented] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread aarti gupta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324760#comment-15324760
 ] 

aarti gupta commented on KAFKA-3811:


>>Make the metrics lower overhead (this is an issue in the producer too).
[~jkreps], can you share details on how these measurements were done?

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread aarti gupta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324755#comment-15324755
 ] 

aarti gupta commented on KAFKA-3811:



[~gfodor] Can you please share an outline of the profiling and analysis you did 
around metrics overhead?

I just ran the 'WordProcessorDemo' with actual data (continuously being 
published) on the input stream and profiled the streams example using both Java 
Mission Control Flight Recorder and the YourKit profiler (evaluation version), 
but I only see about a 5% CPU overhead for the entire process. How are you 
isolating the time taken to stamp metrics? 

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3803) JsonConverter deserialized Struct containing bytes field does not return true for equals()

2016-06-10 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324743#comment-15324743
 ] 

Jay Kreps commented on KAFKA-3803:
--

FWIW, past experience with giving users ByteBuffer has been pretty rough. 
ByteBuffer is confusing enough for us, and apparently even more confusing to 
Java developers who have never seen it before.
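
For reference, the equality gap described in this issue falls straight out of Java array semantics; a minimal sketch (the values are arbitrary):

{code}
import java.nio.ByteBuffer;
import java.util.Arrays;

public class BytesEqualityDemo {
    public static void main(String[] args) {
        byte[] a = {1, 2, 3};
        byte[] b = {1, 2, 3};

        System.out.println(a.equals(b));           // false: arrays only have reference equality
        System.out.println(Arrays.equals(a, b));   // true: element-wise comparison
        // Wrapping in ByteBuffer gives content-based equals, which is why it is
        // floated as a possible default, at the cost of a less familiar type.
        System.out.println(ByteBuffer.wrap(a).equals(ByteBuffer.wrap(b)));  // true
    }
}
{code}

It also illustrates the asymmetry concern: a Struct holding a byte[] and one holding an equivalent ByteBuffer would still not compare equal.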

> JsonConverter deserialized Struct containing bytes field does not return true 
> for equals()
> --
>
> Key: KAFKA-3803
> URL: https://issues.apache.org/jira/browse/KAFKA-3803
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Ewen Cheslack-Postava
>
> The problem is that byte[] comparisons will return false for equality, so 
> even if the two are effectively equal, Struct.equals will not return true.
> It's unclear if this should be fixed or not. Equality wouldn't work for map 
> or array types containing bytes either. However, on possibility is making 
> ByteBuffer the default instead to alleviate this, although then we may end up 
> with asymmetry in equality.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[DISCUSS] KIP-4 Create Topic Schema

2016-06-10 Thread Grant Henke
Now that Kafka 0.10 has been released I would like to start work on the new
protocol messages and client implementation for KIP-4. To keep the discussion
and feedback manageable, I would like to continue breaking the content into
smaller pieces.

This discussion thread is for the CreateTopic request/response and server
side implementation. Details for this implementation can be read here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-CreateTopicRequest

I have included the exact content below for clarity:

> Create Topic Request (KAFKA-2945)
>
>
> CreateTopic Request (Version: 0) => [create_topic_requests] timeout
>   create_topic_requests => topic partitions replication_factor 
> [replica_assignment] [configs]
> topic => STRING
> partitions => INT32
> replication_factor => INT32
> replica_assignment => partition_id [replicas]
>   partition_id => INT32
>   replicas => INT32
> configs => config_key config_value
>   config_key => STRING
>   config_value => STRING
>   timeout => INT32
>
> CreateTopicRequest is a batch request to initiate topic creation with
> either predefined or automatic replica assignment and optionally topic
> configuration.
>
> Request semantics:
>
>1. Must be sent to the controller broker
>2. Multiple instructions for the same topic in one request will be
>    silently ignored; only the last one in the list will be executed.
>   - This is because the list of topics is modeled server side as a
>   map with TopicName as the key
>    3. The principal must be authorized to the "Create" Operation on the
>"Cluster" resource to create topics.
>   - Unauthorized requests will receive a ClusterAuthorizationException
>4.
>
>    Only one of ReplicaAssignment or (Partitions + ReplicationFactor) can
>    be defined in one instruction. If both parameters are specified,
>    ReplicaAssignment takes precedence.
>    - In the case where ReplicaAssignment is defined, the number of partitions
>   and replicas will be calculated from the supplied ReplicaAssignment.
>   - In the case where (Partitions + ReplicationFactor) is defined, the replica
>   assignment will be automatically generated by the server.
>   - One or the other must be defined. The existing broker side auto
>   create defaults will not be used
>   (default.replication.factor, num.partitions). The client implementation 
> can
>   have defaults for these options when generating the messages.
>5. Setting a timeout > 0 will allow the request to block until the
>topic metadata is "complete" on the controller node.
>   - Complete means the topic metadata has been completely populated
>   (leaders, replicas, ISRs)
>   - If a timeout error occurs, the topic could still be created
>   successfully at a later time. It's up to the client to query for the state
>   at that point.
>6. The request is not transactional.
>   1. If an error occurs on one topic, the other could still be
>   created.
>   2. Errors are reported independently.
>
> QA:
>
>- Why is CreateTopicRequest a batch request?
>   - Scenarios where tools or admins want to create many topics should
>   be able to do so with fewer requests
>   - Example: MirrorMaker may want to create the topics downstream
>- What happens if some topics error immediately? Will it
>return immediately?
>   - The request will block until all topics have either been created,
>   errors, or the timeout has been hit
>   - There is no "short circuiting" where 1 error stops the other
>   topics from being created
>   - Why implement "partial blocking" instead of fully async or fully
>consistent?
>   - See Cluster Consistent Blocking below
>- Why require the request to go to the controller?
>   - The controller is responsible for the cluster metadata and
>   its propagation
>   - See Request Forwarding below
>
> Create Topic Response
>
>
> CreateTopic Response (Version: 0) => [topic_error_codes]
>   topic_error_codes => topic error_code
> topic => STRING
> error_code => INT16
>
> CreateTopicResponse contains a map between topic and topic creation
> result error code (see New Protocol Errors).
>

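For readers skimming the schema above, here is a purely illustrative sketch of
what one batch might carry, written as plain Java values rather than the real
request classes (none of these types exist in the codebase; they only mirror the
proposed fields): one topic using automatic assignment plus a config override,
and one topic using an explicit replica assignment.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class CreateTopicRequestSketch {
    // Mirrors: create_topic_requests => topic partitions replication_factor
    //                                   [replica_assignment] [configs]
    static final class TopicArg {
        final String topic;
        final int partitions;                                 // -1 when replicaAssignment is used
        final int replicationFactor;                          // -1 when replicaAssignment is used
        final Map<Integer, List<Integer>> replicaAssignment;  // partition_id -> replica broker ids
        final Map<String, String> configs;

        TopicArg(String topic, int partitions, int replicationFactor,
                 Map<Integer, List<Integer>> replicaAssignment, Map<String, String> configs) {
            this.topic = topic;
            this.partitions = partitions;
            this.replicationFactor = replicationFactor;
            this.replicaAssignment = replicaAssignment;
            this.configs = configs;
        }
    }

    public static void main(String[] args) {
        // Automatic assignment: partitions + replication factor, no explicit replicas.
        TopicArg auto = new TopicArg("clicks", 8, 3,
                Collections.<Integer, List<Integer>>emptyMap(),
                Collections.singletonMap("retention.ms", "86400000"));
        // Explicit assignment: replica_assignment takes precedence, so the counts are unused.
        TopicArg manual = new TopicArg("audit", -1, -1,
                Collections.singletonMap(0, Arrays.asList(1, 2, 3)),
                Collections.<String, String>emptyMap());
        List<TopicArg> batch = Arrays.asList(auto, manual);  // the request is a batch
        int timeoutMs = 30000;  // timeout > 0: block until metadata is "complete" on the controller
        System.out.println("CreateTopic batch of " + batch.size() + " with timeout " + timeoutMs + "ms");
    }
}
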
A sample PR is on GitHub (https://github.com/apache/kafka/pull/1489), though
it could change drastically based on the feedback here.

[jira] [Commented] (KAFKA-2945) CreateTopic - protocol and server side implementation

2016-06-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324728#comment-15324728
 ] 

ASF GitHub Bot commented on KAFKA-2945:
---

GitHub user granthenke opened a pull request:

https://github.com/apache/kafka/pull/1489

KAFKA-2945: CreateTopic - protocol and server side implementation



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/granthenke/kafka create-wire-new

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1489.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1489


commit 8411a89ef97916acb233f350493c2a318a4f8118
Author: Grant Henke 
Date:   2016-06-09T21:34:14Z

KAFKA-2945: CreateTopic - protocol and server side implementation




> CreateTopic - protocol and server side implementation
> -
>
> Key: KAFKA-2945
> URL: https://issues.apache.org/jira/browse/KAFKA-2945
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Grant Henke
> Fix For: 0.10.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #626: KAFKA-2945: CreateTopic - protocol and server side ...

2016-06-10 Thread granthenke
Github user granthenke closed the pull request at:

https://github.com/apache/kafka/pull/626


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-2945) CreateTopic - protocol and server side implementation

2016-06-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324731#comment-15324731
 ] 

ASF GitHub Bot commented on KAFKA-2945:
---

Github user granthenke closed the pull request at:

https://github.com/apache/kafka/pull/626


> CreateTopic - protocol and server side implementation
> -
>
> Key: KAFKA-2945
> URL: https://issues.apache.org/jira/browse/KAFKA-2945
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Grant Henke
> Fix For: 0.10.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1489: KAFKA-2945: CreateTopic - protocol and server side...

2016-06-10 Thread granthenke
GitHub user granthenke opened a pull request:

https://github.com/apache/kafka/pull/1489

KAFKA-2945: CreateTopic - protocol and server side implementation



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/granthenke/kafka create-wire-new

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1489.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1489


commit 8411a89ef97916acb233f350493c2a318a4f8118
Author: Grant Henke 
Date:   2016-06-09T21:34:14Z

KAFKA-2945: CreateTopic - protocol and server side implementation




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-06-10 Thread Jay Kreps
Hey Rajini,

1. That makes sense to me. Is that reflected in the documentation anywhere
(I couldn't really find it)? Is there a way to discover that definition? We
do way better when we write this stuff down so it has an official
definition users and developers can work off of...
2. If client id is a thing that makes sense even when you have users, why
would you not want to quota on it?

I am not wild about these "modes" where you boot the cluster in mode X and
it behaves one way, and in mode Y it behaves another. It is then complex for
users who expect to be able to set quotas but have to get access to the
filesystem of the Kafka nodes to discover which mode the cluster is in and
hence which kind of quota is applicable.

I guess there are two ways to think about a feature: one is the increment
from where we are, and the other is the resulting state after that
increment is taken. What I am asking is not "is this a low cost step from
where we are?" which everyone can agree that it is, but rather "does this
make sense as an end state--i.e. if you were starting fresh with neither
users nor client ids nor quotas would you end up with this?".

In terms of use cases, I think that we support mixing secure and insecure
access on a single cluster so presumably in that case you would want to be
able to quota insecure users based on client id and secure users based on
user, right? Likewise, as you said, client id is a valid grouping even in
the presence of users, so it might be the case that several apps that are
all part of the same system might access Kafka under a single user, but you
might have different quotas for these different apps. Basically if client
id is a valid grouping even in the presence of users (willing to debate
this point, btw!) then you would want to quota on it.

-Jay



On Fri, Jun 10, 2016 at 4:49 AM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Jay,
>
> Thank you for the feedback.
>
> 1. I think it is good to have a single concept of identity, but multiple
> ways of grouping clients for different functions. Client-id is a logical
> grouping of clients with a meaningful name that is used in client metrics
> and logs. User principal is an authenticated user or a grouping of
> unauthenticated users chosen by the broker and is used for ACLs. In my
> view, in a multi-user system, there is a hierarchy - user owns zero or more
> clients. (principal, client-id) defines a safe group, but the shorter
> unsafe client-id is sufficient in client metrics and logs.
>
>
> 2. KIP-55 was initially written to support hierarchical quotas (quotas for
> user1-clientA, user2-clientA etc), but Jun's view was that the complexity
> was not justified since there is no clear requirement for this. The
> cut-down version of the KIP is clearly a lot simpler. But I think your
> suggestion is to enable non-hierarchical user quotas and client-id quotas
> at the same time. Basically treat users and client-ids as distinct entities
> like topics and allow quotas to be applied to each of these entities. I
> agree that we want to support quotas simultaneously on different entities
> like topics and users. I am less convinced of client-id and user being
>
> distinct entities that require separate quotas at the same time. And
> treating client-id and user as distinct entities makes it harder to
> implement hierarchical quotas in future.
>
>
> A single user system needs only client-id quotas, and multi-tenant system
> cannot use client-id quotas since we need to guarantee that one tenant's
> quotas can never be used by another tenant. I suppose a multi-user cluster
> where users trust each other could apply separate quotas for both clients
> and users, but I am not sure if there is a usecase that can't be satisfied
> with just client-id based quotas for this case. Do you have a usecase in
> mind where you want to apply separate quotas for clients and users
> simultaneously?
>
>
>
>
> On Thu, Jun 9, 2016 at 9:40 PM, Jay Kreps  wrote:
>
> > Super sorry to come in late on this one. Rajini, I had two quick
> questions
> > I think we should be able to answer:
> >
> >1. Do client ids make sense in a world which has users? If not should
> we
> >unify them the way Hadoop did (without auth the user is a kind of best
> >effort honor system identity). This came up in the discussion thread
> > but I
> >didn't really see a crisp answer. Basically, what is the definition of
> >"client id" and what is the definition of "user" and how do the
> concepts
> >relate?
> >2. If both client ids and users are sensible distinct notions and we
> >want to maintain both, why don't we just support quotas on both? If
> they
> >both make sense then you would have a reason to set quotas at both
> > levels.
> >Why have this "mode" that you set that swaps between only being able
> to
> > use
> >one and the other? I should be able to set quotas at both levels.
> Going
> > 

[jira] [Commented] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324669#comment-15324669
 ] 

Jay Kreps commented on KAFKA-3811:
--

I'm not wild about introducing these levels in an ad hoc way in Kafka Streams. 
A couple of other options:

1. Make the metrics lower overhead (this is an issue in the producer too).
2. Optimize the usage of metrics in the consumer and streams (i.e. in the 
producer we increment metrics in batch to avoid locking on each message).
3. Add a general purpose feature to the metrics library and use it across the 
producer, consumer, and streams.

For (3), here is what I am thinking: what you are describing is a bit like log4j, 
where DEBUG-level logging is cheap or free when you haven't turned it on. 
Basically what I'm imagining is a new attribute on 
org.apache.kafka.common.metrics.Sensor that is something like DEBUG/INFO, plus a 
global level that is set (and perhaps can be changed via JMX), and the locking 
and update of the sensor would only happen if the appropriate level or lower is 
active. Then we would categorize the existing metrics with these levels across 
the producer, consumer, and streams. (Arguably this should be at the metric level 
rather than the sensor level, but I'm not sure if it's possible to make that 
cheap--if so that might be better.)
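
As a rough sketch of what (3) could look like (purely hypothetical: none of these names exist in the metrics library today, and the real Sensor is untouched here), the cheap-when-disabled check might be something like:

{code}
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch only; not part of org.apache.kafka.common.metrics.
enum RecordingLevel { INFO, DEBUG }

class LeveledSensor {
    // Global level, set at startup and perhaps changeable via JMX in a real implementation.
    static final AtomicReference<RecordingLevel> GLOBAL =
            new AtomicReference<>(RecordingLevel.INFO);

    private final RecordingLevel level;

    LeveledSensor(RecordingLevel level) {
        this.level = level;
    }

    void record(double value) {
        // Cheap check first: a DEBUG sensor is skipped entirely (no locking, no stat
        // updates) unless the global level includes DEBUG.
        if (level == RecordingLevel.DEBUG && GLOBAL.get() != RecordingLevel.DEBUG) {
            return;
        }
        synchronized (this) {
            // ... walk the stats and update them, as Sensor.record does today ...
        }
    }
}
{code}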

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread aarti gupta (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aarti gupta reassigned KAFKA-3811:
--

Assignee: aarti gupta  (was: Guozhang Wang)

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-06-10 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324624#comment-15324624
 ] 

Edoardo Comar edited comment on KAFKA-3727 at 6/10/16 3:26 PM:
---

would it make sense that the missing topic/partition exception is retriable or 
not depending on the broker's topic auto-create setting? 
the broker could return a NotRetriableMissingTPException 

A new error code is an API change, so it would be a target for the 0.11 release, 
right?


was (Author: ecomar):
would it make sense that the missing topic/partition exception is retriable or 
not depending on the broker's topic auto-create setting? 
the broker could return a NotRetriableMissingTPException 

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existing topic is surprisingly 
> different/inconsistent 
> between a consumer that subscribed to the topic and one that had the 
> topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection
> The "assigned" consumer will *loop forever* - this feels a bug to me.
> sample snippet to reproduce:
> {quote}
> KafkaConsumer assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer subsKc = new KafkaConsumer<>(props2);
> List tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords crs2 = subsKc.poll(1000L); 
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords crs1 = assignKc.poll(1000L); 
>// will loop forever ! 
> print("assignKc", crs1);
> {quote}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-06-10 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324624#comment-15324624
 ] 

Edoardo Comar commented on KAFKA-3727:
--

would it make sense that the missing topic/partition exception is retriable or 
not depending on the broker's topic auto-create setting? 
the broker could return a NotRetriableMissingTPException 

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existing topic is surprisingly 
> different/inconsistent 
> between a consumer that subscribed to the topic and one that had the 
> topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection
> The "assigned" consumer will *loop forever* - this feels a bug to me.
> sample snippet to reproduce:
> {quote}
> KafkaConsumer assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer subsKc = new KafkaConsumer<>(props2);
> List tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords crs2 = subsKc.poll(1000L); 
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords crs1 = assignKc.poll(1000L); 
>// will loop forever ! 
> print("assignKc", crs1);
> {quote}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-06-10 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324450#comment-15324450
 ] 

Edoardo Comar commented on KAFKA-3727:
--

I think it would be reasonable to have the poll timeout propagated to the 
Fetcher.listOffset loop.

That would solve this problem, though not 
https://issues.apache.org/jira/browse/KAFKA-3177 straight away.

I'll work on a PR in the next few days (very busy until Wed)
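
Until a fix along those lines lands, one client-side guard (just a workaround sketch, not the change being discussed here; the generic types and naming are placeholders) is to confirm the topic exists before handing it to assign(), so the application fails fast instead of spinning in poll():

{code}
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class SafeAssign {
    static void assignIfPresent(KafkaConsumer<String, String> consumer, String topic, int partition) {
        // listTopics() fetches metadata for all topics and does not trigger auto-creation.
        Map<String, List<PartitionInfo>> existing = consumer.listTopics();
        if (!existing.containsKey(topic)) {
            throw new IllegalStateException("Topic " + topic + " does not exist; refusing to assign");
        }
        consumer.assign(Collections.singletonList(new TopicPartition(topic, partition)));
    }
}
{code}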

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existing topic is surprisingly 
> different/inconsistent 
> between a consumer that subscribed to the topic and one that had the 
> topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection
> The "assigned" consumer will *loop forever* - this feels a bug to me.
> sample snippet to reproduce:
> {quote}
> KafkaConsumer assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer subsKc = new KafkaConsumer<>(props2);
> List tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords crs2 = subsKc.poll(1000L); 
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords crs1 = assignKc.poll(1000L); 
>// will loop forever ! 
> print("assignKc", crs1);
> {quote}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-06-10 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar reassigned KAFKA-3727:


Assignee: Edoardo Comar

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existing topic is surprisingly 
> different/inconsistent 
> between a consumer that subscribed to the topic and one that had the 
> topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection
> The "assigned" consumer will *loop forever* - this feels a bug to me.
> sample snippet to reproduce:
> {quote}
> KafkaConsumer assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer subsKc = new KafkaConsumer<>(props2);
> List tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords crs2 = subsKc.poll(1000L); 
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords crs1 = assignKc.poll(1000L); 
>// will loop forever ! 
> print("assignKc", crs1);
> {quote}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-06-10 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324429#comment-15324429
 ] 

Ismael Juma commented on KAFKA-3727:


It's due to auto-topic creation as you mentioned.

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existing topic is surprisingly 
> different/inconsistent 
> between a consumer that subscribed to the topic and one that had the 
> topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection
> The "assigned" consumer will *loop forever* - this feels a bug to me.
> sample snippet to reproduce:
> {quote}
> KafkaConsumer assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer subsKc = new KafkaConsumer<>(props2);
> List tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords crs2 = subsKc.poll(1000L); 
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords crs1 = assignKc.poll(1000L); 
>// will loop forever ! 
> print("assignKc", crs1);
> {quote}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-06-10 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324424#comment-15324424
 ] 

Edoardo Comar commented on KAFKA-3727:
--

[~ijuma] [~hachikuji] ... reposting here as more on-topic than in 
https://github.com/apache/kafka/pull/1428 

Why should a missing topic or partition be a retriable exception?
I am thinking at least of the case where auto-create is turned off.

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>Priority: Critical
>
> The behavior of a consumer on poll() for a non-existing topic is surprisingly 
> different/inconsistent 
> between a consumer that subscribed to the topic and one that had the 
> topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection
> The "assigned" consumer will *loop forever* - this feels a bug to me.
> sample snippet to reproduce:
> {quote}
> KafkaConsumer assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer subsKc = new KafkaConsumer<>(props2);
> List tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords crs2 = subsKc.poll(1000L); 
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords crs1 = assignKc.poll(1000L); 
>// will loop forever ! 
> print("assignKc", crs1);
> {quote}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-06-10 Thread Rajini Sivaram
Jay,

Thank you for the feedback.

1. I think it is good to have a single concept of identity, but multiple
ways of grouping clients for different functions. Client-id is a logical
grouping of clients with a meaningful name that is used in client metrics
and logs. User principal is an authenticated user or a grouping of
unauthenticated users chosen by the broker and is used for ACLs. In my
view, in a multi-user system, there is a hierarchy - user owns zero or more
clients. (principal, client-id) defines a safe group, but the shorter
unsafe client-id is sufficient in client metrics and logs.


2. KIP-55 was initially written to support hierarchical quotas (quotas for
user1-clientA, user2-clientA etc), but Jun's view was that the complexity
was not justified since there is no clear requirement for this. The
cut-down version of the KIP is clearly a lot simpler. But I think your
suggestion is to enable non-hierarchical user quotas and client-id quotas
at the same time. Basically treat users and client-ids as distinct entities
like topics and allow quotas to be applied to each of these entities. I
agree that we want to support quotas simultaneously on different entities
like topics and users. I am less convinced of client-id and user being

distinct entities that require separate quotas at the same time. And
treating client-id and user as distinct entities makes it harder to
implement hierarchical quotas in future.


A single user system needs only client-id quotas, and multi-tenant system
cannot use client-id quotas since we need to guarantee that one tenant's
quotas can never be used by another tenant. I suppose a multi-user cluster
where users trust each other could apply separate quotas for both clients
and users, but I am not sure if there is a usecase that can't be satisfied
with just client-id based quotas for this case. Do you have a usecase in
mind where you want to apply separate quotas for clients and users
simultaneously?




On Thu, Jun 9, 2016 at 9:40 PM, Jay Kreps  wrote:

> Super sorry to come in late on this one. Rajini, I had two quick questions
> I think we should be able to answer:
>
>1. Do client ids make sense in a world which has users? If not should we
>unify them the way Hadoop did (without auth the user is a kind of best
>effort honor system identity). This came up in the discussion thread
> but I
>didn't really see a crisp answer. Basically, what is the definition of
>"client id" and what is the definition of "user" and how do the concepts
>relate?
>2. If both client ids and users are sensible distinct notions and we
>want to maintain both, why don't we just support quotas on both? If they
>both make sense then you would have a reason to set quotas at both
> levels.
>Why have this "mode" that you set that swaps between only being able to
> use
>one and the other? I should be able to set quotas at both levels. Going
>forward the model we had discussed with quotas was potentially being
> able
>to set quotas for many things independently (say at the topic level),
> and I
>don't think it would make sense to extend this mode approach to those.
>
> -Jay
>
> On Wed, Jun 8, 2016 at 12:56 PM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
> > I would like to initiate the vote for KIP-55.
> >
> > The KIP details are here: KIP-55: Secure quotas for authenticated users
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-55%3A+Secure+Quotas+for+Authenticated+Users
> > >
> > .
> >
> > The JIRA KAFKA-3492 has
> > a draft PR here: https://github.com/apache/kafka/pull/1256.
> >
> > Thank you...
> >
> > Regards,
> >
> > Rajini
> >
>



-- 
Regards,

Rajini


[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-10 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324187#comment-15324187
 ] 

Yuto Kawamura commented on KAFKA-3775:
--

Sorry for leaving this discussion for a while, and thanks for all your comments. 
I was busy with other work for the first half of this week, and just yesterday I 
finished deploying my Kafka Streams app to production. I gained some experience 
through that, so let me summarize it here.

So the initial configuration looked like this:
{code}
poll.ms=100(default)
buffered.records.per.partition=1000(default)
max.poll.records=Integer.MAX_VALUE(default)
max.partition.fetch.bytes=1MiB(default)
num.stream.threads=1
{code}

First I took a heap dump of the Kafka Streams process just before it died. I 
found 918772 instances of {{ConsumerRecord}} (most of them presumably already 
garbage, since I took the heap dump with the -F switch), which together with 
their referenced byte arrays consumed more than 500MB of heap at that moment. 
There was no other significant heap usage by objects unrelated to 
ConsumerRecord, so I'm sure this was causing the OOM of my app.

So I tried several configuration adjustments to avoid OOM. Here's what I 
tried:

- Decrease {{buffered.records.per.partition}} from 1000(default) to 10 => No 
luck. Still OOM.
- Decrease {{max.partition.fetch.bytes}} => Couldn't, as we allow messages of up 
to 1MiB.
- Decrease {{max.poll.records}} from Integer.MAX_VALUE(default) to 10 => 
Worked. No more OOM.

Therefore, by decreasing {{max.poll.records}} my application stopped dying from 
OOM. Before that, each poll() invocation might return all records fetched for 
each partition, so memory could be exhausted very easily (I had misunderstood 
this point; I thought poll() was never called as long as all tasks held more 
records than {{buffered.records.per.partition}}, but in fact it was called 
continually regardless of that, because of {{poll.ms}} expiration).
Network traffic increased by about 300Mbps on that host, but that is still not 
problematic at the moment, as the throughput was likely throttled by the single 
thread ({{num.stream.threads=1}}).

After all instances were up, I confirmed that the total throughput wasn't 
enough, as I saw the consumption lag keep increasing. I increased 
{{num.stream.threads}} to 3 and did the same deployment again (I know I could 
have performed a rolling restart, but I just wanted to see what would happen 
with an increased number of threads).
So again, the first instance survived without OOM, but this time the traffic on 
the NIC increased by about 600Mbps, which was almost a critical level for our 
network spec. As I started the rest of the instances, all partitions were 
distributed equally, and now they are running pretty well.

So my conclusions are:

- Decreasing {{max.poll.records}} to a proper value works in terms of OOM. 
Still, it's not intuitive that it controls memory pressure, as the heap usage 
throttling is just a side effect of this adjustment (its actual purpose is to 
adjust the interval between {{consumer.poll()}} calls so as to avoid assignment 
expiration, IIUC).
- I still couldn't throttle the network traffic. As I wrote above, when I 
started a single instance with {{num.stream.threads=3}}, the traffic on a NIC of 
that host reached its maximum capacity (1Gbps) while it was doing catch-up 
reads. This could be serious in terms of packet dropping, as we deploy other 
service daemons on the same node.
- I'm still not certain what the best way of doing it is, but I believe it 
would be worthwhile to have an option to throttle the maximum number of incoming 
messages to a single instance (or in other words, the maximum capacity of a 
single KafkaStreams instance), for both memory pressure and traffic. So I'm 
basically +1 on the idea that [~jkreps] suggested (global memory allocation 
throttling), but I'm still wondering what option you can suggest for throttling 
the network traffic.

And about PartitionGrouper:
So it can be used to reduce the number of target tasks, but that can't be 
changed without rewriting the configuration (to revert partition.grouper) and 
restarting an instance, right?
If so, such a 2-step deployment is too cumbersome. First I have to deploy a 
single instance with a custom {{partition.grouper}}, then deploy the rest of the 
instances, and finally revert the configuration and redeploy the first 
instance? No way :(
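
For anyone else who hits this, the tuning described above boils down to roughly the following (a sketch only; the application id and bootstrap servers are placeholders, and whether a consumer-level key like max.poll.records can simply be set on the streams properties depends on the Streams version passing it through to the embedded consumer):

{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsTuning {
    public static Properties tunedConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
        props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 10);
        // Bounding how many records a single poll() can return is what actually
        // stopped the OOMs described above.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        return props;
    }
}
{code}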


> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> 

[jira] [Commented] (KAFKA-3594) Kafka new producer retries doesn't work in 0.9.0.1

2016-06-10 Thread Petr Novotnik (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324104#comment-15324104
 ] 

Petr Novotnik commented on KAFKA-3594:
--

Any chance for this to be backported to a 0.9.0.x release?

> Kafka new producer retries doesn't work in 0.9.0.1
> --
>
> Key: KAFKA-3594
> URL: https://issues.apache.org/jira/browse/KAFKA-3594
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.9.0.1
> Environment: Debian 7.8 Wheezy / Kafka 0.9.0.1 (Confluent 2.0.1)
>Reporter: Nicolas PHUNG
>Assignee: Manikumar Reddy
>Priority: Critical
>  Labels: kafka, new, producer, replication, retry
> Fix For: 0.10.0.0
>
>
> Hello,
> I'm encountering an issue with the new Producer on 0.9.0.1 client with a 
> 0.9.0.1 Kafka broker when Kafka broker are offline for example. It seems the 
> retries doesn't work anymore and I got the following error logs :
> {noformat}
> play.api.Application$$anon$1: Execution exception[[IllegalStateException: 
> Memory records is not writable]]
> at play.api.Application$class.handleError(Application.scala:296) 
> ~[com.typesafe.play.play_2.11-2.3.10.jar:2.3.10]
> at play.api.DefaultApplication.handleError(Application.scala:402) 
> [com.typesafe.play.play_2.11-2.3.10.jar:2.3.10]
> at 
> play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320)
>  [com.typesafe.play.play_2.11-2.3.10.jar:2.3.10]
> at 
> play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320)
>  [com.typesafe.play.play_2.11-2.3.10.jar:2.3.10]
> at scala.Option.map(Option.scala:146) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
> at 
> play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3.applyOrElse(PlayDefaultUpstreamHandler.scala:320)
>  [com.typesafe.play.play_2.11-2.3.10.jar:2.3.10]
> at 
> play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3.applyOrElse(PlayDefaultUpstreamHandler.scala:316)
>  [com.typesafe.play.play_2.11-2.3.10.jar:2.3.10]
> at 
> scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:346) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
> at 
> scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:345) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
> at 
> play.api.libs.iteratee.Execution$trampoline$.execute(Execution.scala:46) 
> [com.typesafe.play.play-iteratees_2.11-2.3.10.jar:2.3.10]
> at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
> at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
> at scala.concurrent.Promise$class.complete(Promise.scala:55) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
> at 
> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
> at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
> at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
> at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>  [com.typesafe.akka.akka-actor_2.11-2.3.4.jar:na]
> at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>  [com.typesafe.akka.akka-actor_2.11-2.3.4.jar:na]
> at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>  [com.typesafe.akka.akka-actor_2.11-2.3.4.jar:na]
> at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>  [com.typesafe.akka.akka-actor_2.11-2.3.4.jar:na]
> at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
> at 
> akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) 
> [com.typesafe.akka.akka-actor_2.11-2.3.4.jar:na]
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) 
> [com.typesafe.akka.akka-actor_2.11-2.3.4.jar:na]
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>  
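For context on the retry behaviour discussed above, here is a minimal, hedged 
sketch of a producer configured with retries; the broker address, topic name, 
and retry/backoff values are illustrative assumptions, not the reporter's setup.

{noformat}
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class RetryingProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed broker address and topic; adjust for your environment.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Retry transient send failures a few times, with a backoff between attempts.
        props.put("retries", "3");
        props.put("retry.backoff.ms", "1000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            Future<RecordMetadata> future =
                producer.send(new ProducerRecord<>("example-topic", "key", "value"));
            // Blocking on the future surfaces the final failure (after retries are
            // exhausted) instead of hiding it in the producer's background I/O thread.
            RecordMetadata metadata = future.get();
            System.out.println("Sent to partition " + metadata.partition()
                + " at offset " + metadata.offset());
        } catch (Exception e) {
            // With brokers offline, the send eventually fails and the exception
            // propagates here; the reporter instead observed an IllegalStateException
            // ("Memory records is not writable") raised from inside the producer.
            e.printStackTrace();
        }
    }
}
{noformat}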

[jira] [Commented] (KAFKA-3806) Adjust default values of log.retention.hours and offsets.retention.minutes

2016-06-10 Thread Michal Turek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324020#comment-15324020
 ] 

Michal Turek commented on KAFKA-3806:
-

Hi, thanks for the advice. We will work around the issue by prolonging 
offsets.retention.minutes; that seems safe in our environment.
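As a hedged illustration of that workaround, a broker-side server.properties 
might align the two retention windows roughly as follows; the concrete values 
are assumptions for illustration, not the reporter's actual settings.

{noformat}
# Log retention stays at the 7-day default.
log.retention.hours=168
# Keep committed offsets longer than the log itself (here 14 days = 20160
# minutes), so an idle-but-alive consumer group does not lose its position
# while the messages themselves are still retained.
offsets.retention.minutes=20160
{noformat}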

> Adjust default values of log.retention.hours and offsets.retention.minutes
> --
>
> Key: KAFKA-3806
> URL: https://issues.apache.org/jira/browse/KAFKA-3806
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Michal Turek
>Priority: Minor
>
> The combination of the default values of log.retention.hours (168 hours = 7 
> days) and offsets.retention.minutes (1440 minutes = 1 day) may be dangerous in 
> special cases. Offset retention should always be greater than log retention.
> We have observed the following scenario and issue:
> - Producing of data to a topic was disabled two days ago by a producer update; 
> the topic wasn't deleted.
> - The consumer consumed all data and properly committed offsets to Kafka.
> - The consumer made no more offset commits for that topic because there was no 
> more incoming data and nothing to confirm. (We have auto-commit disabled; I'm 
> not sure how it behaves with auto-commit enabled.)
> - After one day: Kafka cleared the now-too-old offsets according to 
> offsets.retention.minutes.
> - After two days: the long-running consumer was restarted after an update. It 
> didn't find any committed offsets for that topic, since they had been deleted 
> per offsets.retention.minutes, so it started consuming from the beginning.
> - The messages were still in Kafka due to the larger log.retention.hours, so 
> about 5 days of messages were read again.
> Known workaround to solve this issue:
> - Explicitly configure log.retention.hours and offsets.retention.minutes, 
> don't use defaults.
> Proposals:
> - Increase the default value of offsets.retention.minutes so that it is at 
> least twice as large as log.retention.hours.
> - Check these values during Kafka startup and log a warning if 
> offsets.retention.minutes is smaller than log.retention.hours.
> - Add a note to migration guide about differences between storing of offsets 
> in ZooKeeper and Kafka (http://kafka.apache.org/documentation.html#upgrade).
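A minimal sketch of the proposed startup check, written as hypothetical Java 
for illustration (the Kafka broker is actually Scala, and the class, method, 
and logging shown here are invented, not existing broker code):

{noformat}
// Hypothetical startup sanity check: warn when committed offsets can expire
// before the corresponding log data does. Names and output are illustrative.
public final class RetentionConfigCheck {
    public static void warnIfOffsetsExpireBeforeLogs(long logRetentionHours,
                                                     long offsetsRetentionMinutes) {
        long logRetentionMinutes = logRetentionHours * 60;
        if (offsetsRetentionMinutes < logRetentionMinutes) {
            System.err.println("WARNING: offsets.retention.minutes ("
                + offsetsRetentionMinutes + ") is smaller than log.retention.hours ("
                + logRetentionHours + " hours = " + logRetentionMinutes + " minutes); "
                + "idle consumer groups may lose their committed offsets while "
                + "messages are still retained.");
        }
    }

    public static void main(String[] args) {
        // Defaults discussed in this issue: 7 days of logs vs. 1 day of offsets.
        warnIfOffsetsExpireBeforeLogs(168, 1440);
    }
}
{noformat}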



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3809) Auto-generate documentation for topic-level configuration

2016-06-10 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323967#comment-15323967
 ] 

James Cheng commented on KAFKA-3809:


[~jjkoshy], would you be open to splitting 
https://issues.apache.org/jira/browse/KAFKA-3234 into the following pieces? 
1) auto-generation of the documentation for topic-level configs
2) design documentation updates
3) the renaming and deprecation of the existing topic-level configurations

The 3rd one would presumably require a KIP and discussion, whereas 1) and 2) 
are pretty straightforward. What do you think? 

If you are okay with the approach, would you be okay if I took over the work of 
auto-generating the topic-level docs? I can base it on the pull request that 
you already did for https://issues.apache.org/jira/browse/KAFKA-3234.


> Auto-generate documentation for topic-level configuration
> -
>
> Key: KAFKA-3809
> URL: https://issues.apache.org/jira/browse/KAFKA-3809
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>Assignee: James Cheng
> Attachments: configuration.html, topic_config.html
>
>
> The documentation for topic-level configuration is not auto-generated from 
> the code. configuration.html still contains hand-maintained documentation.
> I noticed this because I wanted to set message.timestamp.type on a topic, and 
> didn't see that it was supported, but grepped through the code and it looked 
> like it was.
> The code to auto-generate the docs is quite close, but needs some additional 
> work. In particular, topic-level configuration is different from all the 
> other ConfigDefs in that topic-level configuration docs list the broker-level 
> config that they inherit from. We would need to have a way to show what 
> broker-level config applies to each topic-level config.
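A rough sketch of what such a generator could produce; the data model here (a 
simple record pairing each topic-level config with the broker-level config it 
inherits from) is an assumption for illustration and is not the shape of 
Kafka's actual ConfigDef code:

{noformat}
import java.util.Arrays;
import java.util.List;

// Hypothetical model: each topic-level config documents which broker-level
// config it falls back to, which is the extra piece the standard ConfigDef
// HTML generation does not cover.
public final class TopicConfigDocsSketch {
    static final class TopicConfigDoc {
        final String name;
        final String brokerDefault;
        final String doc;
        TopicConfigDoc(String name, String brokerDefault, String doc) {
            this.name = name;
            this.brokerDefault = brokerDefault;
            this.doc = doc;
        }
    }

    public static void main(String[] args) {
        List<TopicConfigDoc> configs = Arrays.asList(
            new TopicConfigDoc("cleanup.policy", "log.cleanup.policy",
                "Either \"delete\" or \"compact\"."),
            new TopicConfigDoc("retention.ms", "log.retention.ms",
                "How long to retain a log segment before deleting it."));

        StringBuilder html = new StringBuilder("<table>\n")
            .append("<tr><th>Name</th><th>Server default</th><th>Description</th></tr>\n");
        for (TopicConfigDoc c : configs) {
            html.append("<tr><td>").append(c.name)
                .append("</td><td>").append(c.brokerDefault)
                .append("</td><td>").append(c.doc).append("</td></tr>\n");
        }
        html.append("</table>");
        System.out.println(html);
    }
}
{noformat}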



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3797) Improve security of __consumer_offsets topic

2016-06-10 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323930#comment-15323930
 ] 

Todd Palino commented on KAFKA-3797:


Obviously we can't do something like that with a running consumer. But we do it 
all the time with inactive consumers.

I haven't looked at the most recent changes to the offset API yet, so I'm not 
sure whether there are restrictions in place that would prevent a client that 
is not part of the group from committing offsets for the group.

> Improve security of __consumer_offsets topic
> 
>
> Key: KAFKA-3797
> URL: https://issues.apache.org/jira/browse/KAFKA-3797
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>
> By default, we allow clients to override committed offsets and group metadata 
> using the Produce API as long as they have Write access to the 
> __consumer_offsets topic. From one perspective, this is fine: administrators 
> can restrict access to this topic to trusted users. From another, it seems 
> less than ideal for Write permission on that topic to subsume Group-level 
> permissions for the full cluster. With this access, a user can cause all 
> kinds of mischief including making the group "lose" data by setting offsets 
> ahead of the actual position. This is probably not obvious to administrators 
> who grant access to topics using a wildcard and it increases the risk from 
> incorrectly applying topic patterns (if we ever add support for them). It 
> seems reasonable to consider safer default behavior:
> 1. A simple option to fix this would be to prevent wildcard topic rules from 
> applying to internal topics. To write to an internal topic, you need a 
> separate rule which explicitly grants authorization to that topic.
> 2. A more extreme and perhaps safer option might be to prevent all writes to 
> this topic (and potentially other internal topics) through the Produce API. 
> Do we have any use cases which actually require writing to 
> __consumer_offsets? The only potential case that comes to mind is replication.
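A hedged sketch of option 1 above, written as hypothetical authorizer-style 
logic; the class and method names are invented for illustration, and Kafka's 
actual Authorizer interface (Scala) is structured differently:

{noformat}
import java.util.Collections;
import java.util.Set;

// Hypothetical illustration of option 1: wildcard topic ACLs never match
// internal topics, so Write on __consumer_offsets requires an explicit rule.
public final class InternalTopicAclSketch {
    private static final Set<String> INTERNAL_TOPICS =
        Collections.singleton("__consumer_offsets");

    static boolean topicRuleMatches(String ruleTopicPattern, String requestedTopic) {
        boolean wildcard = "*".equals(ruleTopicPattern);
        if (wildcard && INTERNAL_TOPICS.contains(requestedTopic)) {
            // A wildcard grant does not extend to internal topics.
            return false;
        }
        return wildcard || ruleTopicPattern.equals(requestedTopic);
    }

    public static void main(String[] args) {
        System.out.println(topicRuleMatches("*", "orders"));                    // true
        System.out.println(topicRuleMatches("*", "__consumer_offsets"));        // false
        System.out.println(topicRuleMatches("__consumer_offsets",
                                            "__consumer_offsets"));             // true
    }
}
{noformat}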



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)