[jira] [Updated] (KAFKA-7631) NullPointerException when SCRAM is allowed but ScramLoginModule is not in broker's jaas.conf

2023-11-22 Thread Andrew Olson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-7631:

Fix Version/s: 2.7.0

> NullPointerException when SCRAM is allowed but ScramLoginModule is not in 
> broker's jaas.conf
> ---
>
> Key: KAFKA-7631
> URL: https://issues.apache.org/jira/browse/KAFKA-7631
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 2.0.0, 2.5.0
>Reporter: Andras Beni
>Priority: Minor
> Fix For: 2.7.0
>
> Attachments: KAFKA-7631.patch
>
>
> When a user wants to use delegation tokens and lists {{SCRAM}} in 
> {{sasl.enabled.mechanisms}} but does not add {{ScramLoginModule}} to the 
> broker's JAAS configuration, a null pointer exception is thrown on the broker 
> side and the connection is closed.
> A meaningful error message should be logged and sent back to the client.
> {code}
> java.lang.NullPointerException
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:376)
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:262)
> at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:489)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
> at kafka.network.Processor.poll(SocketServer.scala:679)
> at kafka.network.Processor.run(SocketServer.scala:584)
> at java.lang.Thread.run(Thread.java:748)
> {code}
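
For reference, a minimal sketch of the broker {{jaas.conf}} entry whose absence triggers the NPE above, assuming a standard SCRAM setup (the SCRAM credentials themselves are stored in ZooKeeper, so the login module entry needs nothing beyond the control flag). This is illustrative only and not the reporter's configuration:

{code}
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required;
};
{code}

The same module can also be supplied per listener via the {{listener.name.<listener>.<mechanism>.sasl.jaas.config}} broker property instead of a separate jaas.conf file.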



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-7631) NullPointerException when SCRAM is allowed but ScramLoginModule is not in broker's jaas.conf

2023-11-22 Thread Andrew Olson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson resolved KAFKA-7631.
-
Resolution: Fixed

Marking as resolved since I believe it is.

> NullPointerException when SCRAM is allowed but ScramLoginModule is not in 
> broker's jaas.conf
> ---
>
> Key: KAFKA-7631
> URL: https://issues.apache.org/jira/browse/KAFKA-7631
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 2.0.0, 2.5.0
>Reporter: Andras Beni
>Priority: Minor
> Fix For: 2.7.0
>
> Attachments: KAFKA-7631.patch
>
>
> When a user wants to use delegation tokens and lists {{SCRAM}} in 
> {{sasl.enabled.mechanisms}} but does not add {{ScramLoginModule}} to the 
> broker's JAAS configuration, a null pointer exception is thrown on the broker 
> side and the connection is closed.
> A meaningful error message should be logged and sent back to the client.
> {code}
> java.lang.NullPointerException
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:376)
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:262)
> at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:489)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
> at kafka.network.Processor.poll(SocketServer.scala:679)
> at kafka.network.Processor.run(SocketServer.scala:584)
> at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-7631) NullPointerException when SCRAM is allowed but ScramLoginModule is not in broker's jaas.conf

2023-11-22 Thread Andrew Olson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788786#comment-17788786
 ] 

Andrew Olson commented on KAFKA-7631:
-

This looks like a duplicate of KAFKA-10556 (fixed in 2.7.0).

> NullPointerException when SCRAM is allowed but ScramLoginModule is not in 
> broker's jaas.conf
> ---
>
> Key: KAFKA-7631
> URL: https://issues.apache.org/jira/browse/KAFKA-7631
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 2.0.0, 2.5.0
>Reporter: Andras Beni
>Priority: Minor
> Attachments: KAFKA-7631.patch
>
>
> When a user wants to use delegation tokens and lists {{SCRAM}} in 
> {{sasl.enabled.mechanisms}} but does not add {{ScramLoginModule}} to the 
> broker's JAAS configuration, a null pointer exception is thrown on the broker 
> side and the connection is closed.
> A meaningful error message should be logged and sent back to the client.
> {code}
> java.lang.NullPointerException
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:376)
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:262)
> at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:489)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
> at kafka.network.Processor.poll(SocketServer.scala:679)
> at kafka.network.Processor.run(SocketServer.scala:584)
> at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13636) Committed offsets could be deleted during a rebalance if a group did not commit for a while

2022-05-02 Thread Andrew Olson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-13636:
-
Description: 
The group coordinator might incorrectly delete committed offsets during a group 
rebalance. During a rebalance, the coordinator relies on the last commit 
timestamp ({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state 
modification timestamp ({_}currentStateTimestamp{_}) to detect expired offsets.

 

This is relatively easy to reproduce by playing with 
group.initial.rebalance.delay.ms, offset.retention.minutes and 
offset.check.retention.interval.ms; I uploaded an example at: 
[https://github.com/Dabz/kafka-example/tree/master/docker/offsets-retention].

This script does the following:
 * Start a broker with: offset.retention.minutes=2, 
offset.check.retention.interval.ms=1000, group.initial.rebalance.delay.ms=2
 * Produce 10 messages
 * Create a consumer group to consume the 10 messages, with auto.commit disabled 
so that it only commits a few times
 * Wait 3 minutes, then {{kill -9}} the consumer
 * Restart the consumer after a few seconds
 * The consumer restarts from {{auto.offset.reset}}; the committed offset has 
been removed

 

The cause is in GroupMetadata.scala:
 * When the group becomes empty, {{subscribedTopics}} is set to {{Set.empty}} 
([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L520-L521])
 * When a new member joins, it is added to the group right away, BUT 
{{subscribedTopics}} is only updated once the rebalance is over (in 
{{initNewGeneration}}), which can take a while due to 
{{group.initial.rebalance.delay.ms}}
 * When the offset retention check runs, {{subscribedTopics.isDefined}} returns 
true because {{Set.empty != None}} (the underlying condition); see the small 
sketch after this list
 * Thus we enter 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L782-L785]
 with an empty {{subscribedTopics}} list and rely on the {{commitTimestamp}} 
regardless of the {{currentStateTimestamp}}
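
For illustration only, here is a minimal Java analogue of the {{isDefined}} trap above. The real coordinator code is Scala and uses {{Option[Set[String]]}}; {{java.util.Optional}} is used here purely as a stand-in, so treat this as a sketch rather than the actual implementation:

{code}
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

public class SubscribedTopicsSketch {
    public static void main(String[] args) {
        // Analogue of subscribedTopics = Some(Set.empty): the value is "defined"
        // even though the set of subscribed topics is empty.
        Optional<Set<String>> subscribedTopics = Optional.of(Collections.<String>emptySet());

        // Prints "true": an empty-but-present set still passes the isDefined-style check,
        // so the expiration logic falls back to commitTimestamp as described above.
        System.out.println(subscribedTopics.isPresent());
    }
}
{code}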

 

This seems to be a regression introduced by KIP-496 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets#KIP496:AdministrativeAPItodeleteconsumeroffsets-ProposedChanges
 (KAFKA-8338, KAFKA-8370)


> Committed offsets could be deleted during a rebalance if a group did not 
> commit for a while
> ---
>
> Key: KAFKA-13636
> URL: https://issues.apache.org/jira/browse/KAFKA-13636
> Project: Kafka
>  

[jira] [Commented] (KAFKA-9233) Kafka consumer throws undocumented IllegalStateException

2020-04-01 Thread Andrew Olson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073060#comment-17073060
 ] 

Andrew Olson commented on KAFKA-9233:
-

Changing version from 2.6.0 to 2.5.0 (committed 
[here|https://github.com/apache/kafka/commit/4b2268bd296e348f5a1cbe02cfc763167ea304e2]).

> Kafka consumer throws undocumented IllegalStateException
> 
>
> Key: KAFKA-9233
> URL: https://issues.apache.org/jira/browse/KAFKA-9233
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Minor
> Fix For: 2.5.0
>
>
> If the provided collection of TopicPartition instances contains any 
> duplicates, an IllegalStateException not documented in the javadoc is thrown 
> by internal Java stream code when calling KafkaConsumer#beginningOffsets or 
> KafkaConsumer#endOffsets.
> The stack trace looks like this,
> {noformat}
> java.lang.IllegalStateException: Duplicate key -2
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:555)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:542)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2054)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2031)
> {noformat}
> {noformat}
> java.lang.IllegalStateException: Duplicate key -1
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:559)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:550)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2109)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2081)
> {noformat}
> Looking at the code, it appears this may likely have been introduced by 
> KAFKA-7831. The exception is not thrown in Kafka 2.2.1, with the duplicated 
> TopicPartition values silently ignored. Either we should document this 
> exception possibility (probably wrapping it with a Kafka exception class) 
> indicating invalid client API usage, or restore the previous behavior where 
> the duplicates were harmless.
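
To illustrate the failure mode (a self-contained sketch, not Kafka code): the Fetcher builds its partition-to-timestamp map with {{Collectors.toMap}}, which has no merge function and therefore throws on a repeated key. On Java 8 the exception message prints the mapped value rather than the key, which appears to be why the reports above read "Duplicate key -2" and "Duplicate key -1" (the earliest/latest timestamp sentinels).

{code}
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DuplicateKeyDemo {
    public static void main(String[] args) {
        // Stand-ins for the same TopicPartition appearing twice in the caller's collection.
        List<String> partitions = Arrays.asList("my-topic-0", "my-topic-0");

        // Throws java.lang.IllegalStateException because of the duplicate key; every key
        // is mapped to -2L here to mirror the "earliest" timestamp sentinel in the report.
        Map<String, Long> timestampsToSearch = partitions.stream()
                .collect(Collectors.toMap(Function.identity(), tp -> -2L));
        System.out.println(timestampsToSearch);
    }
}
{code}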



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9233) Kafka consumer throws undocumented IllegalStateException

2020-04-01 Thread Andrew Olson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-9233:

Fix Version/s: (was: 2.6.0)
   2.5.0

> Kafka consumer throws undocumented IllegalStateException
> 
>
> Key: KAFKA-9233
> URL: https://issues.apache.org/jira/browse/KAFKA-9233
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Minor
> Fix For: 2.5.0
>
>
> If the provided collection of TopicPartition instances contains any 
> duplicates, an IllegalStateException not documented in the javadoc is thrown 
> by internal Java stream code when calling KafkaConsumer#beginningOffsets or 
> KafkaConsumer#endOffsets.
> The stack trace looks like this,
> {noformat}
> java.lang.IllegalStateException: Duplicate key -2
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:555)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:542)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2054)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2031)
> {noformat}
> {noformat}
> java.lang.IllegalStateException: Duplicate key -1
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:559)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:550)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2109)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2081)
> {noformat}
> Looking at the code, it appears this may likely have been introduced by 
> KAFKA-7831. The exception is not thrown in Kafka 2.2.1, with the duplicated 
> TopicPartition values silently ignored. Either we should document this 
> exception possibility (probably wrapping it with a Kafka exception class) 
> indicating invalid client API usage, or restore the previous behavior where 
> the duplicates were harmless.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8950) KafkaConsumer stops fetching

2020-03-25 Thread Andrew Olson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17066833#comment-17066833
 ] 

Andrew Olson commented on KAFKA-8950:
-

We're testing 2.3.1 now to see if it resolves this issue.

> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
> Fix For: 2.4.0, 2.3.1
>
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you have any questions or need additional information. 
> I doubt I can provide heap dumps unfortunately, but I can provide further 
> information as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (KAFKA-2435) More optimally balanced partition assignment strategy

2020-02-17 Thread Andrew Olson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson closed KAFKA-2435.
---

> More optimally balanced partition assignment strategy
> -
>
> Key: KAFKA-2435
> URL: https://issues.apache.org/jira/browse/KAFKA-2435
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Major
> Attachments: KAFKA-2435.patch
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the original high-level consumer. For the new consumer, 
> see KAFKA-3297.
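
A rough Java sketch of the greedy approach outlined above (this is not the attached KAFKA-2435.patch; the names and the string-based partition representation are simplifications for illustration):

{code}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FairAssignmentSketch {

    // Greedy assignment: order topics by fewest interested consumers (ties broken by
    // more partitions first), then hand each partition to the subscribed consumer that
    // currently holds the fewest partitions overall.
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, Set<String>> subscriptions) {
        Map<String, List<String>> assignment = new HashMap<>();
        subscriptions.keySet().forEach(consumer -> assignment.put(consumer, new ArrayList<>()));

        List<String> topics = new ArrayList<>(partitionsPerTopic.keySet());
        topics.sort(Comparator
                .comparingLong((String t) ->
                        subscriptions.values().stream().filter(subs -> subs.contains(t)).count())
                .thenComparingInt(t -> -partitionsPerTopic.get(t)));

        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic.get(topic); p++) {
                String partition = topic + "-" + p;
                subscriptions.entrySet().stream()
                        .filter(e -> e.getValue().contains(topic))
                        .map(Map.Entry::getKey)
                        .min(Comparator.comparingInt(c -> assignment.get(c).size()))
                        .ifPresent(consumer -> assignment.get(consumer).add(partition));
            }
        }
        return assignment;
    }
}
{code}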



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (KAFKA-3297) More optimally balanced partition assignment strategy (new consumer)

2020-02-17 Thread Andrew Olson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson closed KAFKA-3297.
---

> More optimally balanced partition assignment strategy (new consumer)
> 
>
> Key: KAFKA-3297
> URL: https://issues.apache.org/jira/browse/KAFKA-3297
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Major
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the new consumer. For the original high-level consumer, 
> see KAFKA-2435.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9233) Kafka consumer throws undocumented IllegalStateException

2019-12-31 Thread Andrew Olson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006247#comment-17006247
 ] 

Andrew Olson commented on KAFKA-9233:
-

Adding 2.5.0 fix version optimistically

> Kafka consumer throws undocumented IllegalStateException
> 
>
> Key: KAFKA-9233
> URL: https://issues.apache.org/jira/browse/KAFKA-9233
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Minor
> Fix For: 2.5.0
>
>
> If the provided collection of TopicPartition instances contains any 
> duplicates, an IllegalStateException not documented in the javadoc is thrown 
> by internal Java stream code when calling KafkaConsumer#beginningOffsets or 
> KafkaConsumer#endOffsets.
> The stack trace looks like this,
> {noformat}
> java.lang.IllegalStateException: Duplicate key -2
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:555)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:542)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2054)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2031)
> {noformat}
> {noformat}
> java.lang.IllegalStateException: Duplicate key -1
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:559)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:550)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2109)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2081)
> {noformat}
> Looking at the code, it appears this may likely have been introduced by 
> KAFKA-7831. The exception is not thrown in Kafka 2.2.1, with the duplicated 
> TopicPartition values silently ignored. Either we should document this 
> exception possibility (probably wrapping it with a Kafka exception class) 
> indicating invalid client API usage, or restore the previous behavior where 
> the duplicates were harmless.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9233) Kafka consumer throws undocumented IllegalStateException

2019-12-31 Thread Andrew Olson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-9233:

Fix Version/s: 2.5.0

> Kafka consumer throws undocumented IllegalStateException
> 
>
> Key: KAFKA-9233
> URL: https://issues.apache.org/jira/browse/KAFKA-9233
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Minor
> Fix For: 2.5.0
>
>
> If the provided collection of TopicPartition instances contains any 
> duplicates, an IllegalStateException not documented in the javadoc is thrown 
> by internal Java stream code when calling KafkaConsumer#beginningOffsets or 
> KafkaConsumer#endOffsets.
> The stack trace looks like this,
> {noformat}
> java.lang.IllegalStateException: Duplicate key -2
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:555)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:542)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2054)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2031)
> {noformat}
> {noformat}
> java.lang.IllegalStateException: Duplicate key -1
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:559)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:550)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2109)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2081)
> {noformat}
> Looking at the code, it appears this may likely have been introduced by 
> KAFKA-7831. The exception is not thrown in Kafka 2.2.1, with the duplicated 
> TopicPartition values silently ignored. Either we should document this 
> exception possibility (probably wrapping it with a Kafka exception class) 
> indicating invalid client API usage, or restore the previous behavior where 
> the duplicates were harmless.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9233) Kafka consumer throws undocumented IllegalStateException

2019-12-16 Thread Andrew Olson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997540#comment-16997540
 ] 

Andrew Olson commented on KAFKA-9233:
-

[~junrao] or [~hachikuji] Can you review this?

> Kafka consumer throws undocumented IllegalStateException
> 
>
> Key: KAFKA-9233
> URL: https://issues.apache.org/jira/browse/KAFKA-9233
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Minor
>
> If the provided collection of TopicPartition instances contains any 
> duplicates, an IllegalStateException not documented in the javadoc is thrown 
> by internal Java stream code when calling KafkaConsumer#beginningOffsets or 
> KafkaConsumer#endOffsets.
> The stack trace looks like this,
> {noformat}
> java.lang.IllegalStateException: Duplicate key -2
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:555)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:542)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2054)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2031)
> {noformat}
> {noformat}
> java.lang.IllegalStateException: Duplicate key -1
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:559)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:550)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2109)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2081)
> {noformat}
> Looking at the code, it appears this may likely have been introduced by 
> KAFKA-7831. The exception is not thrown in Kafka 2.2.1, with the duplicated 
> TopicPartition values silently ignored. Either we should document this 
> exception possibility (probably wrapping it with a Kafka exception class) 
> indicating invalid client API usage, or restore the previous behavior where 
> the duplicates were harmless.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9233) Kafka consumer throws undocumented IllegalStateException

2019-12-04 Thread Andrew Olson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987909#comment-16987909
 ] 

Andrew Olson commented on KAFKA-9233:
-

[~hachikuji] Can you review this?

> Kafka consumer throws undocumented IllegalStateException
> 
>
> Key: KAFKA-9233
> URL: https://issues.apache.org/jira/browse/KAFKA-9233
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Minor
>
> If the provided collection of TopicPartition instances contains any 
> duplicates, an IllegalStateException not documented in the javadoc is thrown 
> by internal Java stream code when calling KafkaConsumer#beginningOffsets or 
> KafkaConsumer#endOffsets.
> The stack trace looks like this,
> {noformat}
> java.lang.IllegalStateException: Duplicate key -2
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:555)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:542)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2054)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2031)
> {noformat}
> {noformat}
> java.lang.IllegalStateException: Duplicate key -1
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:559)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:550)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2109)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2081)
> {noformat}
> Looking at the code, it appears this may likely have been introduced by 
> KAFKA-7831. The exception is not thrown in Kafka 2.2.1, with the duplicated 
> TopicPartition values silently ignored. Either we should document this 
> exception possibility (probably wrapping it with a Kafka exception class) 
> indicating invalid client API usage, or restore the previous behavior where 
> the duplicates were harmless.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9233) Kafka consumer throws undocumented IllegalStateException

2019-11-27 Thread Andrew Olson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson reassigned KAFKA-9233:
---

Assignee: Andrew Olson

> Kafka consumer throws undocumented IllegalStateException
> 
>
> Key: KAFKA-9233
> URL: https://issues.apache.org/jira/browse/KAFKA-9233
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Minor
>
> If the provided collection of TopicPartition instances contains any 
> duplicates, an IllegalStateException not documented in the javadoc is thrown 
> by internal Java stream code when calling KafkaConsumer#beginningOffsets or 
> KafkaConsumer#endOffsets.
> The stack trace looks like this,
> {noformat}
> java.lang.IllegalStateException: Duplicate key -2
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:555)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:542)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2054)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2031)
> {noformat}
> {noformat}
> java.lang.IllegalStateException: Duplicate key -1
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:559)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:550)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2109)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2081)
> {noformat}
> Looking at the code, it appears this may likely have been introduced by 
> KAFKA-7831. The exception is not thrown in Kafka 2.2.1, with the duplicated 
> TopicPartition values silently ignored. Either we should document this 
> exception possibility (probably wrapping it with a Kafka exception class) 
> indicating invalid client API usage, or restore the previous behavior where 
> the duplicates were harmless.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9233) Kafka consumer throws undocumented IllegalStateException

2019-11-26 Thread Andrew Olson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16983002#comment-16983002
 ] 

Andrew Olson commented on KAFKA-9233:
-

Set priority to minor since it is easily worked around by using a Set instead 
of a List or otherwise being smarter about how the collection of TopicPartition 
values is gathered.
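
A minimal sketch of that workaround (the class and method names are placeholders, not from this issue): route the partitions through a {{Set}} so the consumer never sees a repeated {{TopicPartition}}.

{code}
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetLookup {
    // Collapse duplicate TopicPartition entries before calling endOffsets, so the
    // consumer's internal toMap collector never encounters a repeated key.
    public static Map<TopicPartition, Long> endOffsetsDeduplicated(Consumer<?, ?> consumer,
                                                                   Collection<TopicPartition> partitions) {
        return consumer.endOffsets(new HashSet<>(partitions));
    }
}
{code}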

> Kafka consumer throws undocumented IllegalStateException
> 
>
> Key: KAFKA-9233
> URL: https://issues.apache.org/jira/browse/KAFKA-9233
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0
>Reporter: Andrew Olson
>Priority: Minor
>
> If the provided collection of TopicPartition instances contains any 
> duplicates, an IllegalStateException not documented in the javadoc is thrown 
> by internal Java stream code when calling KafkaConsumer#beginningOffsets or 
> KafkaConsumer#endOffsets.
> The stack trace looks like this,
> {noformat}
> java.lang.IllegalStateException: Duplicate key -2
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:555)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:542)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2054)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2031)
> {noformat}
> {noformat}
> java.lang.IllegalStateException: Duplicate key -1
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:559)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:550)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2109)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2081)
> {noformat}
> Looking at the code, it appears this may likely have been introduced by 
> KAFKA-7831. The exception is not thrown in Kafka 2.2.1, with the duplicated 
> TopicPartition values silently ignored. Either we should document this 
> exception possibility (probably wrapping it with a Kafka exception class) 
> indicating invalid client API usage, or restore the previous behavior where 
> the duplicates were harmless.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9233) Kafka consumer throws undocumented IllegalStateException

2019-11-26 Thread Andrew Olson (Jira)
Andrew Olson created KAFKA-9233:
---

 Summary: Kafka consumer throws undocumented IllegalStateException
 Key: KAFKA-9233
 URL: https://issues.apache.org/jira/browse/KAFKA-9233
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.3.0
Reporter: Andrew Olson


If the provided collection of TopicPartition instances contains any duplicates, 
an IllegalStateException not documented in the javadoc is thrown by internal 
Java stream code when calling KafkaConsumer#beginningOffsets or 
KafkaConsumer#endOffsets.

The stack trace looks like this,

{noformat}
java.lang.IllegalStateException: Duplicate key -2
at 
java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
at java.util.HashMap.merge(HashMap.java:1254)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:555)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:542)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2054)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2031)
{noformat}

{noformat}
java.lang.IllegalStateException: Duplicate key -1
at 
java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
at java.util.HashMap.merge(HashMap.java:1254)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:559)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:550)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2109)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2081)
{noformat}

Looking at the code, it appears this may likely have been introduced by 
KAFKA-7831. The exception is not thrown in Kafka 2.2.1, with the duplicated 
TopicPartition values silently ignored. Either we should document this 
exception possibility (probably wrapping it with a Kafka exception class) 
indicating invalid client API usage, or restore the previous behavior where the 
duplicates were harmless.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8950) KafkaConsumer stops fetching

2019-10-25 Thread Andrew Olson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959954#comment-16959954
 ] 

Andrew Olson commented on KAFKA-8950:
-

Ok, sounds good. Thanks.

> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
> Fix For: 2.4.0, 2.3.1
>
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you have any questions or need additional information. 
> I doubt I can provide heap dumps unfortunately, but I can provide further 
> information as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8950) KafkaConsumer stops fetching

2019-10-25 Thread Andrew Olson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959891#comment-16959891
 ] 

Andrew Olson commented on KAFKA-8950:
-

Thank you for the detailed and helpful response, [~thomaslee]. The symptom we 
see is that a random partition occasionally (maybe once every month or two) 
gets "stuck" and accumulates lag indefinitely while all other partitions 
continue to be consumed normally, with the only solution being to restart the 
consumer. Our consumer groups are relatively large (about 20 to 80 members) 
with each member reading from about 10 to 25 partitions usually, some high 
volume (up to 5k messages per second) and some low. We don't auto-commit 
offsets.

> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
> Fix For: 2.4.0, 2.3.1
>
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you have any questions or need additional information. 
> I doubt I can provide heap dumps unfortunately, but I can provide further 
> information as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8950) KafkaConsumer stops fetching

2019-10-25 Thread Andrew Olson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959782#comment-16959782
 ] 

Andrew Olson commented on KAFKA-8950:
-

Was this confirmed to be first introduced in 2.3.0? If so, do you know the 
JIRA issue that introduced it?

We may have seen similar behavior in previous Kafka versions (0.10.2.1, 2.2.1), 
so I'm curious whether the bug is truly new or might have been around for a while.

> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
> Fix For: 2.4.0, 2.3.1
>
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you have any questions or need additional information. 
> I doubt I can provide heap dumps unfortunately, but I can provide further 
> information as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9038) Allow creating partitions while partition reassignment is in progress

2019-10-14 Thread Andrew Olson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16951367#comment-16951367
 ] 

Andrew Olson commented on KAFKA-9038:
-

Does it possibly matter if there's a consumer group subscribed to both topics? 
In that case, would we want it to remain blocked?

> Allow creating partitions while partition reassignment is in progress
> -
>
> Key: KAFKA-9038
> URL: https://issues.apache.org/jira/browse/KAFKA-9038
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Bob Barrett
>Priority: Major
>
> If a user attempts to create partitions for a topic while a partition 
> reassignment is in progress, we block the request even if the topic for which 
> partitions are being created is not involved in the reassignment. This is an 
> unnecessarily strict requirement; we should allow partition creation for 
> topics that are not involved in reassignments.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8345) Create an Administrative API for Replica Reassignment

2019-07-17 Thread Andrew Olson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16887206#comment-16887206
 ] 

Andrew Olson commented on KAFKA-8345:
-

[~cmccabe] [~enether] Can you verify that this could be used to just change the 
replica order so that a different broker becomes the preferred leader?

We have an admin script (shown below) that demotes a selected broker so that it 
is no longer the preferred leader for any partitions. This is a use case that 
could possibly use this new API, if it's supported.

{noformat}
# usage:
# 1. On a Kafka broker node, find all partitions that have the broker id as the first replica, making it the preferred leader
# export BROKER_ID=
# export KAFKA_ZOOKEEPER=$(awk -F= '/zookeeper.connect/{print $2}' /opt/kafka/config/server.properties)
# /opt/kafka/bin/kafka-topics.sh --zookeeper ${KAFKA_ZOOKEEPER} --describe | grep "Replicas: ${BROKER_ID}," | awk '{print $2,$4,$8}' > kafka_topics_output.txt
# 2. Download and run this script to move the first replica to the end of the replica list, making it a follower by default
# ruby demote_kafka_broker.rb > reorder_replicas.json
# 3. Execute the replica order reassignment
# /opt/kafka/bin/kafka-reassign-partitions.sh --execute --zookeeper ${KAFKA_ZOOKEEPER} --manual-assignment-json-file reorder_replicas.json
# 4. Verify the change was executed as expected
# /opt/kafka/bin/kafka-topics.sh --zookeeper ${KAFKA_ZOOKEEPER} --describe

require 'json'

topics = []
File.open("kafka_topics_output.txt", "r") do |f|
  f.each_line do |line|
parts = line.split(' ')
t = {}
t['topic'] = parts[0]
t['partition'] = parts[1].to_i
t['replicas'] = parts[2].split(',').map {|r| r.to_i }
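# rotate: move the first replica (the current preferred leader) to the end of the list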
t['replicas'] << t['replicas'][0]
t['replicas'].delete_at(0)
topics << t
  end
end

p = {}
p['partitions'] = topics
p['version'] = 1

puts JSON.pretty_generate(p)
{noformat}
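
If the KIP-455 API preserves the order of the supplied replica list, the same 
reorder could presumably be done through the AdminClient instead of ZooKeeper. 
A minimal sketch under that assumption (Admin.alterPartitionReassignments is 
the API proposed by this KIP; the topic, partition, and replica ids are 
placeholders):

{code:java}
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class DemoteBrokerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // Demote broker 1 for topic "foo" partition 0 by moving it from the
            // front of the replica list (preferred leader) to the end.
            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
            reassignments.put(new TopicPartition("foo", 0),
                    Optional.of(new NewPartitionReassignment(Arrays.asList(2, 3, 1))));
            admin.alterPartitionReassignments(reassignments).all().get();
        }
    }
}
{code}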

> Create an Administrative API for Replica Reassignment
> -
>
> Key: KAFKA-8345
> URL: https://issues.apache.org/jira/browse/KAFKA-8345
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
>
> Create an Administrative API for Replica Reassignment, as discussed in KIP-455



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-7862) Modify JoinGroup logic to incorporate group.instance.id change

2019-07-11 Thread Andrew Olson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-7862:

Component/s: consumer

> Modify JoinGroup logic to incorporate group.instance.id change
> --
>
> Key: KAFKA-7862
> URL: https://issues.apache.org/jira/browse/KAFKA-7862
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.3.0
>
>
> Step one of the KIP-345 join group logic change to incorporate static 
> membership.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8484) ProducerId reset can cause IllegalStateException

2019-07-11 Thread Andrew Olson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16883132#comment-16883132
 ] 

Andrew Olson commented on KAFKA-8484:
-

https://github.com/apache/kafka/pull/6883

> ProducerId reset can cause IllegalStateException
> 
>
> Key: KAFKA-8484
> URL: https://issues.apache.org/jira/browse/KAFKA-8484
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.3.0
>
>
> If the producerId is reset while inflight requests are pending, we can get 
> the following uncaught error.
> {code}
> [2019-06-03 08:20:45,320] ERROR [Producer clientId=producer-1] Uncaught error 
> in request completion: (org.apache.kafka.clients.NetworkClient)   
>   
>   
> java.lang.IllegalStateException: Sequence number for partition test_topic-13 
> is going to become negative : -965
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.adjustSequencesDueToFailedBatch(TransactionManager.java:561)
>   
> 
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:744)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:717)
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:667)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:574)
> at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:75)
> at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:818)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
> at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:253)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> The impact of this is that a failed batch will not be completed until the 
> delivery timeout is exceeded. We are missing validation when we receive a 
> produce response that the producerId and epoch still match.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8484) ProducerId reset can cause IllegalStateException

2019-07-11 Thread Andrew Olson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-8484:

Component/s: producer 

> ProducerId reset can cause IllegalStateException
> 
>
> Key: KAFKA-8484
> URL: https://issues.apache.org/jira/browse/KAFKA-8484
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.3.0
>
>
> If the producerId is reset while inflight requests are pending, we can get 
> the following uncaught error.
> {code}
> [2019-06-03 08:20:45,320] ERROR [Producer clientId=producer-1] Uncaught error 
> in request completion: (org.apache.kafka.clients.NetworkClient)   
>   
>   
> java.lang.IllegalStateException: Sequence number for partition test_topic-13 
> is going to become negative : -965
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.adjustSequencesDueToFailedBatch(TransactionManager.java:561)
>   
> 
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:744)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:717)
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:667)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:574)
> at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:75)
> at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:818)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
> at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:253)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> The impact of this is that a failed batch will not be completed until the 
> delivery timeout is exceeded. We are missing validation when we receive a 
> produce response that the producerId and epoch still match.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8463) Fix redundant reassignment of tasks when leader worker leaves

2019-07-11 Thread Andrew Olson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-8463:

Component/s: KafkaConnect

> Fix redundant reassignment of tasks when leader worker leaves
> -
>
> Key: KAFKA-8463
> URL: https://issues.apache.org/jira/browse/KAFKA-8463
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Blocker
> Fix For: 2.3.0
>
>
> There's a bug in the computation of new assignment for connectors and tasks 
> in {{IncrementalCooperativeAssignor}} that may lead to duplicate assignment. 
> The existing assignments should always be considered because if the leader 
> worker bounces there's no history of previous assignments. 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8483) Possible reordering of messages by producer after UNKNOWN_PRODUCER_ID error

2019-07-11 Thread Andrew Olson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-8483:

Component/s: producer 

> Possible reordering of messages by producer after UNKNOWN_PRODUCER_ID error
> ---
>
> Key: KAFKA-8483
> URL: https://issues.apache.org/jira/browse/KAFKA-8483
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.3.0
>
>
> The idempotent producer attempts to detect spurious UNKNOWN_PRODUCER_ID 
> errors and handle them by reassigning sequence numbers to the inflight 
> batches. The inflight batches are tracked in a PriorityQueue. The problem is 
> that the reassignment of sequence numbers depends on the iteration order of 
> PriorityQueue, which does not guarantee any ordering. So this can result in 
> sequence numbers being assigned in the wrong order.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8344) Fix vagrant-up.sh to work with AWS properly

2019-07-11 Thread Andrew Olson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-8344:

Component/s: system tests

> Fix vagrant-up.sh to work with AWS properly
> ---
>
> Key: KAFKA-8344
> URL: https://issues.apache.org/jira/browse/KAFKA-8344
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Reporter: Kengo Seki
>Assignee: Kengo Seki
>Priority: Major
> Fix For: 2.3.0
>
>
> I tried to run {{vagrant/vagrant-up.sh --aws}} with the following 
> Vagrantfile.local.
> {code}
> enable_dns = true
> enable_hostmanager = false
> # EC2
> ec2_access_key = ""
> ec2_secret_key = ""
> ec2_keypair_name = "keypair"
> ec2_keypair_file = "/path/to/keypair/file"
> ec2_region = "ap-northeast-1"
> ec2_ami = "ami-0905ffddadbfd01b7"
> ec2_security_groups = "sg-"
> ec2_subnet_id = "subnet-"
> {code}
> EC2 instances were successfully created, but it failed with the following 
> error after that.
> {code}
> $ vagrant/vagrant-up.sh --aws
> (snip)
> An active machine was found with a different provider. Vagrant
> currently allows each machine to be brought up with only a single
> provider at a time. A future version will remove this limitation.
> Until then, please destroy the existing machine to up with a new
> provider.
> Machine name: zk1
> Active provider: aws
> Requested provider: virtualbox
> {code}
> It seems that the {{vagrant hostmanager}} command also requires the 
> {{--provider=aws}} option, in addition to {{vagrant up}}.
> With that option, it succeeded as follows:
> {code}
> $ git diff
> diff --git a/vagrant/vagrant-up.sh b/vagrant/vagrant-up.sh
> index 6a4ef9564..9210a5357 100755
> --- a/vagrant/vagrant-up.sh
> +++ b/vagrant/vagrant-up.sh
> @@ -220,7 +220,7 @@ function bring_up_aws {
>  # We still have to bring up zookeeper/broker nodes serially
>  echo "Bringing up zookeeper/broker machines serially"
>  vagrant up --provider=aws --no-parallel --no-provision 
> $zk_broker_machines $debug
> -vagrant hostmanager
> +vagrant hostmanager --provider=aws
>  vagrant provision
>  fi
> @@ -231,11 +231,11 @@ function bring_up_aws {
>  local vagrant_rsync_temp_dir=$(mktemp -d);
>  TMPDIR=$vagrant_rsync_temp_dir vagrant_batch_command "vagrant up 
> $debug --provider=aws" "$worker_machines" "$max_parallel"
>  rm -rf $vagrant_rsync_temp_dir
> -vagrant hostmanager
> +vagrant hostmanager --provider=aws
>  fi
>  else
>  vagrant up --provider=aws --no-parallel --no-provision $debug
> -vagrant hostmanager
> +vagrant hostmanager --provider=aws
>  vagrant provision
>  fi
> $ vagrant/vagrant-up.sh --aws
> (snip)
> ==> broker3: Running provisioner: shell...
> broker3: Running: /tmp/vagrant-shell20190509-25399-8f1wgz.sh
> broker3: Killing server
> broker3: No kafka server to stop
> broker3: Starting server
> $ vagrant status
> Current machine states:
> zk1   running (aws)
> broker1   running (aws)
> broker2   running (aws)
> broker3   running (aws)
> This environment represents multiple VMs. The VMs are all listed
> above with their current state. For more information about a specific
> VM, run `vagrant status NAME`.
> $ vagrant ssh broker1
> (snip)
> ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh 
> --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --create 
> --partitions 1 --replication-factor 3 --topic sandbox
> (snip)
> ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh 
> --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --list
> (snip)
> sandbox
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8324) User constructed RocksObjects leak memory

2019-07-11 Thread Andrew Olson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-8324:

Component/s: streams

> User constructed RocksObjects leak memory
> -
>
> Key: KAFKA-8324
> URL: https://issues.apache.org/jira/browse/KAFKA-8324
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: kip
> Fix For: 2.3.0
>
>
> Some of the RocksDB options a user can set when extending RocksDBConfigSetter 
> take Rocks objects as parameters. Many of these – including potentially large 
> objects like Cache and Filter – inherit from AbstractNativeReference and must 
> be closed explicitly in order to free the memory of the backing C++ object. 
> However the user has no way of closing any objects they have created in 
> RocksDBConfigSetter, and we do not ever close them for them. 
> KIP-453: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-453%3A+Add+close%28%29+method+to+RocksDBConfigSetter]
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8275) NetworkClient leastLoadedNode selection should consider throttled nodes

2019-07-11 Thread Andrew Olson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-8275:

Component/s: clients

> NetworkClient leastLoadedNode selection should consider throttled nodes
> ---
>
> Key: KAFKA-8275
> URL: https://issues.apache.org/jira/browse/KAFKA-8275
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.3.0
>
>
> The leastLoadedNode() function is intended to find any available node. It is 
> smart in the sense that it considers the number of inflight requests and 
> reconnect backoff, but it has not been updated to take into account client 
> throttling. If we have an available node which is not throttled, we should 
> use it.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8275) NetworkClient leastLoadedNode selection should consider throttled nodes

2019-07-11 Thread Andrew Olson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-8275:

Component/s: network

> NetworkClient leastLoadedNode selection should consider throttled nodes
> ---
>
> Key: KAFKA-8275
> URL: https://issues.apache.org/jira/browse/KAFKA-8275
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, network
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.3.0
>
>
> The leastLoadedNode() function is intended to find any available node. It is 
> smart in the sense that it considers the number of inflight requests and 
> reconnect backoff, but it has not been updated to take into account client 
> throttling. If we have an available node which is not throttled, we should 
> use it.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-7974) KafkaAdminClient loses worker thread/enters zombie state when initial DNS lookup fails

2019-07-11 Thread Andrew Olson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-7974:

Component/s: admin

> KafkaAdminClient loses worker thread/enters zombie state when initial DNS 
> lookup fails
> --
>
> Key: KAFKA-7974
> URL: https://issues.apache.org/jira/browse/KAFKA-7974
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.2.0, 2.1.1
>Reporter: Nicholas Parker
>Priority: Major
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> Version: kafka-clients-2.1.0
> I have some code that creates a KafkaAdminClient instance and then 
> invokes listTopics(). I was seeing the following stacktrace in the logs, 
> after which the KafkaAdminClient instance became unresponsive:
> {code:java}
> ERROR [kafka-admin-client-thread | adminclient-1] 2019-02-18 01:00:45,597 
> KafkaThread.java:51 - Uncaught exception in thread 'kafka-admin-client-thread 
> | adminclient-1':
> java.lang.IllegalStateException: No entry found for connection 0
>     at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330)
>     at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134)
>     at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:921)
>     at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
>     at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:898)
>     at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1113)
>     at java.lang.Thread.run(Thread.java:748){code}
> From looking at the code I was able to trace down a possible cause:
>  * NetworkClient.ready() invokes this.initiateConnect() as seen in the above 
> stacktrace
>  * NetworkClient.initiateConnect() invokes 
> ClusterConnectionStates.connecting(), which internally invokes 
> ClientUtils.resolve() to resolve the host when creating an entry for the 
> connection.
>  * If this host lookup fails, an UnknownHostException can be thrown back to 
> NetworkClient.initiateConnect() and the connection entry is not created in 
> ClusterConnectionStates. This exception doesn't get logged so this is a guess 
> on my part.
>  * NetworkClient.initiateConnect() catches the exception and attempts to call 
> ClusterConnectionStates.disconnected(), which throws an IllegalStateException 
> because no entry had yet been created due to the lookup failure.
>  * This IllegalStateException ends up killing the worker thread and 
> KafkaAdminClient gets stuck, never returning from listTopics().
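
Until the underlying bug is fixed, callers can at least avoid hanging forever 
on a zombie client by bounding both the request and the wait on its future. A 
minimal sketch (the timeout values are arbitrary placeholders):

{code:java}
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;

public class SafeListTopics {
    public static Set<String> listTopics(Properties adminProps) throws Exception {
        try (AdminClient admin = AdminClient.create(adminProps)) {
            // Bound the request on the client side and the wait on the caller side,
            // so a dead admin-client worker thread cannot block us indefinitely.
            return admin.listTopics(new ListTopicsOptions().timeoutMs(10_000))
                        .names()
                        .get(30, TimeUnit.SECONDS);
        }
    }
}
{code}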



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-7912) In-memory key-value store does not support concurrent access

2019-07-11 Thread Andrew Olson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-7912:

Component/s: streams

> In-memory key-value store does not support concurrent access 
> -
>
> Key: KAFKA-7912
> URL: https://issues.apache.org/jira/browse/KAFKA-7912
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently, the in-memory key-value store uses a Map to store key-value pairs 
> and fetches them by calling subMap and returning an iterator to this submap. 
> This is unsafe as the submap is just a view of the original map and there is 
> risk of concurrent access.
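
The hazard described above comes from the fact that a subMap is a live view of 
the backing map, so a write during iteration can throw 
ConcurrentModificationException or expose inconsistent data. A minimal sketch 
of the general pattern (plain Java, not the actual Streams store code), where 
the safer variant copies the range before handing out an iterator:

{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class RangeSnapshotExample {
    private final TreeMap<String, String> store = new TreeMap<>();

    public synchronized void put(String key, String value) {
        store.put(key, value);
    }

    // Unsafe: subMap returns a live view, so a concurrent put() while the caller
    // iterates can fail or return inconsistent results.
    public Iterator<Map.Entry<String, String>> rangeView(String from, String to) {
        return store.subMap(from, true, to, true).entrySet().iterator();
    }

    // Safer: copy the range while holding the lock so the returned iterator is
    // independent of later writes.
    public synchronized Iterator<Map.Entry<String, String>> rangeCopy(String from, String to) {
        return new TreeMap<>(store.subMap(from, true, to, true)).entrySet().iterator();
    }
}
{code}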



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-7831) Consumer SubscriptionState missing synchronization

2019-07-11 Thread Andrew Olson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-7831:

Component/s: consumer

> Consumer SubscriptionState missing synchronization
> --
>
> Key: KAFKA-7831
> URL: https://issues.apache.org/jira/browse/KAFKA-7831
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.3.0
>
>
> ConsumerCoordinator installs a Metadata.Listener in order to update pattern 
> subscriptions after metadata changes. The listener is invoked from 
> NetworkClient.poll, which could happen in the heartbeat thread. Currently, 
> however, there is no synchronization in SubscriptionState to make this safe. 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-3143) inconsistent state in ZK when all replicas are dead

2019-07-11 Thread Andrew Olson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-3143:

Component/s: controller

> inconsistent state in ZK when all replicas are dead
> ---
>
> Key: KAFKA-3143
> URL: https://issues.apache.org/jira/browse/KAFKA-3143
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Reporter: Jun Rao
>Assignee: Ismael Juma
>Priority: Major
>  Labels: reliability
> Fix For: 2.3.0
>
>
> This issue can be recreated in the following steps.
> 1. Start 3 brokers, 1, 2 and 3.
> 2. Create a topic with a single partition and 2 replicas, say on broker 1 and 
> 2.
> If we stop both replicas 1 and 2, depending on where the controller is, the 
> leader and isr stored in ZK in the end are different.
> If the controller is on broker 3, what's stored in ZK will be -1 for leader 
> and an empty set for ISR.
> On the other hand, if the controller is on broker 2 and we stop broker 1 
> followed by broker 2, what's stored in ZK will be 2 for leader and 2 for ISR.
> The issue is that in the first case, the controller will call 
> ReplicaStateMachine to transition to OfflineReplica, which will change the 
> leader and isr. However, in the second case, the controller fails over, but 
> we don't transition ReplicaStateMachine to OfflineReplica during controller 
> initialization.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8294) Batch StopReplica requests with partition deletion and add test cases

2019-07-11 Thread Andrew Olson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-8294:

Component/s: controller

> Batch StopReplica requests with partition deletion and add test cases
> -
>
> Key: KAFKA-8294
> URL: https://issues.apache.org/jira/browse/KAFKA-8294
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.3.0
>
>
> One of the tricky aspects we found in KAFKA-8237 is the batching of the 
> StopReplica requests. We should have test cases covering expected behavior so 
> that we do not introduce regressions and we should make the batching 
> consistent whether or not `deletePartitions` is set.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-7440) Use leader epoch in consumer fetch requests

2019-07-11 Thread Andrew Olson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-7440:

Component/s: consumer

> Use leader epoch in consumer fetch requests
> ---
>
> Key: KAFKA-7440
> URL: https://issues.apache.org/jira/browse/KAFKA-7440
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: David Arthur
>Priority: Major
>  Labels: kip
> Fix For: 2.3.0
>
>
> This patch adds support in the consumer to use the leader epoch obtained from 
> the metadata in fetch requests: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8601) Producer Improvement: Sticky Partitioner

2019-07-01 Thread Andrew Olson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-8601:

Description: 
Currently the default partitioner uses a round-robin strategy to partition 
non-keyed values. The idea is to implement a "sticky partitioner" that chooses 
a partition for a topic and sends all records to that partition until the batch 
is sent. Then a new partition is chosen. This new partitioner will increase 
batching and decrease latency. 

KIP link: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner

  was:Currently the default partitioner uses a round-robin strategy to 
partition non-keyed values. The idea is to implement a "sticky partitioner" 
that chooses a partition for a topic and sends all records to that partition 
until the batch is sent. Then a new partition is chosen. This new partitioner 
will increase batching and decrease latency. 


> Producer Improvement: Sticky Partitioner
> 
>
> Key: KAFKA-8601
> URL: https://issues.apache.org/jira/browse/KAFKA-8601
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> Currently the default partitioner uses a round-robin strategy to partition 
> non-keyed values. The idea is to implement a "sticky partitioner" that 
> chooses a partition for a topic and sends all records to that partition until 
> the batch is sent. Then a new partition is chosen. This new partitioner will 
> increase batching and decrease latency. 
> KIP link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8601) Producer Improvement: Sticky Partitioner

2019-07-01 Thread Andrew Olson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16876259#comment-16876259
 ] 

Andrew Olson commented on KAFKA-8601:
-

I think this is a worthwhile enhancement. Netflix did something similar,
https://medium.com/netflix-techblog/kafka-inside-keystone-pipeline-dd5aeabaf6bb 
(see "Producer sticky partitioner" section)

Our open source common-kafka project provides a partitioner implementation that 
follows this approach.
https://github.com/cerner/common-kafka/blob/2.0/common-kafka/src/main/java/com/cerner/common/kafka/producer/partitioners/FairPartitioner.java
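
A minimal sketch of the general idea (a hypothetical time-windowed sticky 
partitioner in the spirit of the FairPartitioner linked above, not the 
implementation proposed by this KIP): pick a partition for non-keyed records 
and keep using it for a short interval before rotating.

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class TimeWindowStickyPartitioner implements Partitioner {
    private static final long STICKY_MS = 100; // arbitrary rotation interval
    private volatile int current = -1;
    private volatile long chosenAt = 0L;

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        if (keyBytes != null) {
            // Simplified placement for keyed records (the real default uses murmur2).
            return (Arrays.hashCode(keyBytes) & 0x7fffffff) % partitions.size();
        }
        // Racy check-then-act is acceptable for a sketch; a real implementation
        // would also track the sticky choice per topic rather than globally.
        long now = System.currentTimeMillis();
        if (current < 0 || now - chosenAt > STICKY_MS) {
            current = ThreadLocalRandom.current().nextInt(partitions.size());
            chosenAt = now;
        }
        return current;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
{code}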

> Producer Improvement: Sticky Partitioner
> 
>
> Key: KAFKA-8601
> URL: https://issues.apache.org/jira/browse/KAFKA-8601
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> Currently the default partitioner uses a round-robin strategy to partition 
> non-keyed values. The idea is to implement a "sticky partitioner" that 
> chooses a partition for a topic and sends all records to that partition until 
> the batch is sent. Then a new partition is chosen. This new partitioner will 
> increase batching and decrease latency. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7089) Extend `kafka-consumer-groups.sh` to show "beginning offsets"

2019-06-12 Thread Andrew Olson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862273#comment-16862273
 ] 

Andrew Olson commented on KAFKA-7089:
-

[~hachikuji] Would it be a separate KIP to add retrieval of broker start and 
end offsets for topic partition(s) to the AdminClient? For consumer group lag 
monitoring (if we want to use only Java code, and not Scala like 
ConsumerGroupCommand still does), having to create a Consumer just to use its 
endOffsets method is not ideal... since it has irrelevant configs like 
serializer/deserializer/consumer group name.
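
As an illustration of the current workaround (using only APIs that exist 
today): committed offsets can come from AdminClient.listConsumerGroupOffsets, 
but the log-end offsets still require a throwaway Consumer for endOffsets. A 
minimal sketch, with the connection configuration assumed to be supplied by 
the caller:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class GroupLagExample {
    public static Map<TopicPartition, Long> lag(Properties props, String groupId) throws Exception {
        try (AdminClient admin = AdminClient.create(props);
             KafkaConsumer<byte[], byte[]> consumer =
                     new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            // Committed offsets for the group, from the AdminClient.
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
            // Log-end offsets, which currently still require a Consumer instance.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(committed.keySet());
            Map<TopicPartition, Long> lag = new HashMap<>();
            committed.forEach((tp, om) -> lag.put(tp, endOffsets.get(tp) - om.offset()));
            return lag;
        }
    }
}
{code}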

> Extend `kafka-consumer-groups.sh` to show "beginning offsets"
> -
>
> Key: KAFKA-7089
> URL: https://issues.apache.org/jira/browse/KAFKA-7089
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Matthias J. Sax
>Assignee: Vahid Hashemian
>Priority: Minor
>
> Currently, `kafka-consumer-groups.sh` only shows "current offset", "end 
> offset" and "lag". It would be helpful to extend the tool to also show 
> "beginning/earliest offset".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance

2019-06-07 Thread Andrew Olson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16858616#comment-16858616
 ] 

Andrew Olson commented on KAFKA-8421:
-

It would be helpful to design and build a stress test harness for exhaustive 
verification of this, since it seems like quite complex and critical 
functionality. We could embark on that effort now, using it to more fully 
illustrate the weaknesses of the current behavior and to make sure we have 
adequate logging in place for insight into what's going on deep in the 
consumer internals.

> Allow consumer.poll() to return data in the middle of rebalance
> ---
>
> Key: KAFKA-8421
> URL: https://issues.apache.org/jira/browse/KAFKA-8421
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> With KIP-429 in place, today when a consumer is about to send join-group 
> request its owned partitions may not be empty, meaning that some of its 
> fetched data can still be returned. Nevertheless, today the logic is strict:
> {code}
> if (!updateAssignmentMetadataIfNeeded(timer)) {
> return ConsumerRecords.empty();
> }
> {code}
> I.e. if the consumer enters a rebalance it always returns no data. 
> As an optimization, we can consider letting consumers still return messages 
> that belong to their owned partitions even while they are within a rebalance, 
> because we know it is safe: no one else would claim those partitions in this 
> rebalance yet, and we can still commit offsets if, after this rebalance, the 
> partitions need to be revoked.
> One thing we need to take care of, though, is the rebalance timeout: while 
> the consumer is processing those records it may not call the next poll() in 
> time (think: Kafka Streams num.iterations mechanism), which may lead to the 
> consumer dropping out of the group during the rebalance.
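
For reference, the commit-on-revocation part already has a client-side hook 
today. A minimal sketch of the usual pattern with a ConsumerRebalanceListener 
(registered via consumer.subscribe(topics, listener); the poll loop and offset 
tracking are assumed to live elsewhere):

{code:java}
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRevokeListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;
    private final Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>();

    public CommitOnRevokeListener(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    // Called by the processing loop after each record so we know what to commit.
    public void track(TopicPartition tp, long nextOffsetToConsume) {
        pending.put(tp, new OffsetAndMetadata(nextOffsetToConsume));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit whatever has been processed for the partitions being taken away.
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (TopicPartition tp : partitions) {
            OffsetAndMetadata om = pending.remove(tp);
            if (om != null) {
                toCommit.put(tp, om);
            }
        }
        if (!toCommit.isEmpty()) {
            consumer.commitSync(toCommit);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing to do on assignment in this sketch.
    }
}
{code}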



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance

2019-05-30 Thread Andrew Olson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852279#comment-16852279
 ] 

Andrew Olson commented on KAFKA-8421:
-

Agree this would be a potentially quite beneficial enhancement.

> Allow consumer.poll() to return data in the middle of rebalance
> ---
>
> Key: KAFKA-8421
> URL: https://issues.apache.org/jira/browse/KAFKA-8421
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> With KIP-429 in place, today when a consumer is about to send join-group 
> request its owned partitions may not be empty, meaning that some of its 
> fetched data can still be returned. Nevertheless, today the logic is strict:
> {code}
> if (!updateAssignmentMetadataIfNeeded(timer)) {
> return ConsumerRecords.empty();
> }
> {code}
> I.e. if the consumer enters a rebalance it always returns no data. 
> As an optimization, we can consider letting consumers still return messages 
> that belong to their owned partitions even while they are within a rebalance, 
> because we know it is safe: no one else would claim those partitions in this 
> rebalance yet, and we can still commit offsets if, after this rebalance, the 
> partitions need to be revoked.
> One thing we need to take care of, though, is the rebalance timeout: while 
> the consumer is processing those records it may not call the next poll() in 
> time (think: Kafka Streams num.iterations mechanism), which may lead to the 
> consumer dropping out of the group during the rebalance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8052) Intermittent INVALID_FETCH_SESSION_EPOCH error on FETCH request

2019-05-17 Thread Andrew Olson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842386#comment-16842386
 ] 

Andrew Olson commented on KAFKA-8052:
-

[~rsivaram] There are some comments on your pull request, as well as a merge 
conflict that needs to be resolved.

> Intermittent INVALID_FETCH_SESSION_EPOCH error on FETCH request 
> 
>
> Key: KAFKA-8052
> URL: https://issues.apache.org/jira/browse/KAFKA-8052
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Bartek Jakub
>Assignee: Rajini Sivaram
>Priority: Major
>
> I noticed in my logs some weird behavior. I see in logs intermittent log: 
> {noformat}
> 2019-03-06 14:02:13.024 INFO 1 --- [container-1-C-1] 
> o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-4, 
> groupId=service-main] Node 2 was unable to process the fetch request with 
> (sessionId=1321134604, epoch=125730): INVALID_FETCH_SESSION_EPOCH.{noformat}
> which happens every ~1 hour. 
>  
> I was wondering if it was my Kafka provider's fault, so I decided to 
> investigate the problem and tried to reproduce the issue locally - with 
> success. My configuration is:
>  * Kafka Clients version - 2.0.1
>  * Kafka - 2.12_2.1.0
>  
> I enabled trace logs for 'org.apache.kafka.clients' and that's what I get:
> {noformat}
> 2019-03-05 21:04:16.161 DEBUG 3052 --- [container-0-C-1] 
> o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-3, 
> groupId=service-main] Built incremental fetch (sessionId=197970881, 
> epoch=525) for node 1001. Added (), altered (), removed () out of 
> (itunes-command-19, itunes-command-18, itunes-command-11, itunes-command-10, 
> itunes-command-13, itunes-command-12, itunes-command-15, itunes-command-14, 
> itunes-command-17, itunes-command-16)
> 2019-03-05 21:04:16.161 DEBUG 3052 --- [container-0-C-1] 
> o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, 
> groupId=service-main] Sending READ_UNCOMMITTED 
> IncrementalFetchRequest(toSend=(), toForget=(), implied=(itunes-command-19, 
> itunes-command-18, itunes-command-11, itunes-command-10, itunes-command-13, 
> itunes-command-12, itunes-command-15, itunes-command-14, itunes-command-17, 
> itunes-command-16)) to broker localhost:9092 (id: 1001 rack: null)
> 2019-03-05 21:04:16.161 TRACE 3052 --- [container-0-C-1] 
> org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-3, 
> groupId=service-main] Sending FETCH 
> {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=197970881,epoch=525,topics=[],forgotten_topics_data=[]}
>  with correlation id 629 to node 1001
> 2019-03-05 21:04:16.664 TRACE 3052 --- [container-0-C-1] 
> org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-3, 
> groupId=service-main] Completed receive from node 1001 for FETCH with 
> correlation id 629, received 
> {throttle_time_ms=0,error_code=0,session_id=197970881,responses=[]}
> 2019-03-05 21:04:16.664 DEBUG 3052 --- [container-0-C-1] 
> o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-3, 
> groupId=service-main] Node 1001 sent an incremental fetch response for 
> session 197970881 with response=(), implied=(itunes-command-19, 
> itunes-command-18, itunes-command-11, itunes-command-10, itunes-command-13, 
> itunes-command-12, itunes-command-15, itunes-command-14, itunes-command-17, 
> itunes-command-16)
> 2019-03-05 21:04:16.665 DEBUG 3052 --- [container-0-C-1] 
> o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-3, 
> groupId=service-main] Built incremental fetch (sessionId=197970881, 
> epoch=526) for node 1001. Added (), altered (), removed () out of 
> (itunes-command-19, itunes-command-18, itunes-command-11, itunes-command-10, 
> itunes-command-13, itunes-command-12, itunes-command-15, itunes-command-14, 
> itunes-command-17, itunes-command-16)
> 2019-03-05 21:04:16.665 DEBUG 3052 --- [container-0-C-1] 
> o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, 
> groupId=service-main] Sending READ_UNCOMMITTED 
> IncrementalFetchRequest(toSend=(), toForget=(), implied=(itunes-command-19, 
> itunes-command-18, itunes-command-11, itunes-command-10, itunes-command-13, 
> itunes-command-12, itunes-command-15, itunes-command-14, itunes-command-17, 
> itunes-command-16)) to broker localhost:9092 (id: 1001 rack: null)
> 2019-03-05 21:04:16.665 TRACE 3052 --- [container-0-C-1] 
> org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-3, 
> groupId=service-main - F630] Sending FETCH 
> {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=197970881,epoch=526,topics=[],forgotten_topics_data=[]}
>  with correlation id 630 to node 1001
> 2019-03-05 

[jira] [Commented] (KAFKA-7866) Duplicate offsets after transaction index append failure

2019-04-25 Thread Andrew Olson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16826431#comment-16826431
 ] 

Andrew Olson commented on KAFKA-7866:
-

[~hachikuji] Do you have an estimated release date for 2.2.1?

> Duplicate offsets after transaction index append failure
> 
>
> Key: KAFKA-7866
> URL: https://issues.apache.org/jira/browse/KAFKA-7866
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.0.2, 2.1.2, 2.2.1
>
>
> We have encountered a situation in which an ABORT marker was written 
> successfully to the log, but failed to be written to the transaction index. 
> This prevented the log end offset from being incremented. This resulted in 
> duplicate offsets when the next append was attempted. The broker was using 
> JBOD and we would normally expect IOExceptions to cause the log directory to 
> be failed. That did not seem to happen here and the duplicates continued for 
> several hours.
> Unfortunately, we are not sure what the cause of the failure was. 
> Significantly, the first duplicate was also the first ABORT marker in the 
> log. Unlike the offset and timestamp index, the transaction index is created 
> on demand after the first aborted transaction. It is likely that the attempt 
> to create and open the transaction index failed. There is some suggestion 
> that the process may have bumped into the open file limit. Whatever the 
> problem was, it also prevented log collection, so we cannot confirm our 
> guesses. 
> Without knowing the underlying cause, we can still consider some potential 
> improvements:
> 1. We probably should be catching non-IO exceptions in the append process. If 
> the append to one of the indexes fails, we potentially truncate the log or 
> re-throw it as an IOException to ensure that the log directory is no longer 
> used.
> 2. Even without the unexpected exception, there is a small window during 
> which even an IOException could lead to duplicate offsets. Marking a log 
> directory offline is an asynchronous operation and there is no guarantee that 
> another append cannot happen first. Given this, we probably need to detect 
> and truncate duplicates during the log recovery process.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8051) remove KafkaMbean when network close

2019-04-25 Thread Andrew Olson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16826387#comment-16826387
 ] 

Andrew Olson commented on KAFKA-8051:
-

[~monty] It looks like you created 5 duplicate issues, KAFKA-8047 through 
KAFKA-8051.



> remove KafkaMbean when network close
> 
>
> Key: KAFKA-8051
> URL: https://issues.apache.org/jira/browse/KAFKA-8051
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: limeng
>Priority: Critical
> Fix For: 2.2.1
>
>
> The broker server will OOM when 
>  * a large number of clients frequently close and reconnect
>  * the clientId changes on every reconnect, which gives rise to too many 
> KafkaMbeans in the broker
> The reason is that the broker forgets to remove the KafkaMbean when it 
> detects that the connection has closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7215) Improve LogCleaner behavior on error

2018-08-24 Thread Andrew Olson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592162#comment-16592162
 ] 

Andrew Olson commented on KAFKA-7215:
-

Agree this would be helpful. We have run into a few instances of the cleaner 
thread dying and logs growing indefinitely until the broker is restarted. Would 
the offending topic-partition be immediately blacklisted? If so, that would 
still allow the same problem to occur, especially for a high-volume topic such 
as __consumer_offsets; it might be better to give it some reasonable number of 
retry attempts.

> Improve LogCleaner behavior on error
> 
>
> Key: KAFKA-7215
> URL: https://issues.apache.org/jira/browse/KAFKA-7215
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> For more detailed information see 
> [KIP-346|https://cwiki.apache.org/confluence/display/KAFKA/KIP-346+-+Improve+LogCleaner+behavior+on+error]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5413) Log cleaner fails due to large offset in segment file

2018-03-26 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413904#comment-16413904
 ] 

Andrew Olson commented on KAFKA-5413:
-

{quote}Is this the same issue as was reported here?{quote}

It sounds like a related issue that may be rarer than what the correction for 
this one addressed, or a variant that eluded the bug fix here. The stack trace 
looks the same.

> Log cleaner fails due to large offset in segment file
> -
>
> Key: KAFKA-5413
> URL: https://issues.apache.org/jira/browse/KAFKA-5413
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0, 0.10.2.1
> Environment: Ubuntu 14.04 LTS, Oracle Java 8u92, kafka_2.11-0.10.2.0
>Reporter: Nicholas Ngorok
>Assignee: Kelvin Rutt
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.2, 0.11.0.0
>
> Attachments: .index.cleaned, 
> .log, .log.cleaned, 
> .timeindex.cleaned, 002147422683.log, 
> kafka-5413.patch
>
>
> The log cleaner thread in our brokers is failing with the trace below
> {noformat}
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 0 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 
> 15:48:59 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 2147343575 in log __consumer_offsets-12 (largest timestamp 
> Thu Jun 08 15:49:06 PDT 2017) into 0, retaining deletes. 
> (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,834] ERROR {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: largest offset in 
> message set can not be safely converted to relative offset.
> at scala.Predef$.require(Predef.scala:224)
> at kafka.log.LogSegment.append(LogSegment.scala:109)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:478)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.clean(LogCleaner.scala:362)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-06-08 15:49:54,835] INFO {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
> {noformat}
> This seems to point at the specific line [here| 
> https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/log/LogSegment.scala#L92]
>  in the kafka src where the difference is actually larger than MAXINT as both 
> baseOffset and offset are of type long. It was introduced in this [pr| 
> https://github.com/apache/kafka/pull/2210/files/56d1f8196b77a47b176b7bbd1e4220a3be827631]
> These were the outputs of dumping the first two log segments
> {noformat}
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0.log
> Dumping /kafka-logs/__consumer_offsets-12/.log
> Starting offset: 0
> offset: 1810054758 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 
> -1 magic: 0 compresscodec: NONE crc: 3127861909 keysize: 34
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0002147343575.log
> Dumping /kafka-logs/__consumer_offsets-12/002147343575.log
> Starting offset: 2147343575
> offset: 2147539884 position: 0 NoTimestampType: -1 isvalid: true paylo
> adsize: -1 magic: 0 compresscodec: NONE crc: 2282192097 keysize: 34
> {noformat}
> My guess is that since 2147539884 is larger than MAXINT, we are hitting this 
> exception. Was there a specific reason this check was added in 0.10.2?
> E.g. if the first offset is a key = "key 0" and then we have MAXINT + 1 of 
> "key 1" following, wouldn't we run into this situation whenever the log 
> cleaner runs?
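
The arithmetic in that guess checks out. A tiny sketch of the check being 
tripped (paraphrased from the requirement in the stack trace, not the actual 
LogSegment code):

{code:java}
public class RelativeOffsetCheck {
    public static void main(String[] args) {
        long baseOffset = 0L;             // the cleaned target segment starts at offset 0
        long largestOffset = 2147539884L; // offset observed in the dumped segment

        // Offsets inside a segment are stored relative to the base offset as a
        // 4-byte int, so the difference must fit in Integer.MAX_VALUE (2147483647).
        long delta = largestOffset - baseOffset;
        System.out.println(delta + " > " + Integer.MAX_VALUE + " ? " + (delta > Integer.MAX_VALUE));
        // Prints: 2147539884 > 2147483647 ? true, which triggers
        // "largest offset in message set can not be safely converted to relative offset"
    }
}
{code}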



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4840) There are still cases where producer buffer pool will not remove waiters.

2018-03-23 Thread Andrew Olson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-4840:

Summary: There are still cases where producer buffer pool will not remove 
waiters.  (was: There are are still cases where producer buffer pool will not 
remove waiters.)

> There are still cases where producer buffer pool will not remove waiters.
> -
>
> Key: KAFKA-4840
> URL: https://issues.apache.org/jira/browse/KAFKA-4840
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.0
>Reporter: Sean McCauliff
>Assignee: Sean McCauliff
>Priority: Major
> Fix For: 0.11.0.0
>
>
> There are several problems with error handling in BufferPool.allocate(int 
> size, long maxTimeToBlockMs):
> * The accumulated number of bytes are not put back into the available pool 
> when an exception happens and a thread is waiting for bytes to become 
> available.  This will cause the capacity of the buffer pool to decrease over 
> time any time a timeout is hit within this method.
> * If a Throwable other than InterruptedException is thrown out of await() for 
> some reason or if there is an exception thrown in the corresponding finally 
> block around the await(), for example if waitTime.record(.) throws an 
> exception, then the waiters are not removed from the waiters deque.
> * On timeout or other exception waiters could be signaled, but are not.  If 
> no other buffers are freed then the next waiting thread will also timeout and 
> so on.
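
A minimal sketch of the cleanup pattern these bullet points call for (generic 
condition-wait code under stated assumptions, not the actual BufferPool): 
return any accumulated bytes, remove the waiter, and signal the next waiter in 
a finally block so that no failure path leaks state.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SimpleBytePool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long availableBytes;

    public SimpleBytePool(long capacityBytes) {
        this.availableBytes = capacityBytes;
    }

    public void allocate(long size, long maxTimeToBlockMs) throws InterruptedException {
        lock.lock();
        try {
            // Grab whatever is free right now.
            long accumulated = Math.min(size, availableBytes);
            availableBytes -= accumulated;
            if (accumulated == size) {
                return;
            }
            Condition moreBytes = lock.newCondition();
            waiters.addLast(moreBytes);
            try {
                long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                while (accumulated < size) {
                    if (remainingNs <= 0) {
                        throw new RuntimeException("Failed to allocate " + size + " bytes within " + maxTimeToBlockMs + " ms");
                    }
                    remainingNs = moreBytes.awaitNanos(remainingNs);
                    long more = Math.min(size - accumulated, availableBytes);
                    availableBytes -= more;
                    accumulated += more;
                }
                accumulated = 0; // fully satisfied, nothing to give back
            } finally {
                // Runs on success, timeout, interrupt, or any other throwable:
                // return partially accumulated bytes, drop our waiter, and wake
                // the next waiter so it is not stranded.
                availableBytes += accumulated;
                waiters.remove(moreBytes);
                if (availableBytes > 0 && !waiters.isEmpty()) {
                    waiters.peekFirst().signal();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    public void release(long size) {
        lock.lock();
        try {
            availableBytes += size;
            if (!waiters.isEmpty()) {
                waiters.peekFirst().signal();
            }
        } finally {
            lock.unlock();
        }
    }
}
{code}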



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2435) More optimally balanced partition assignment strategy

2018-02-08 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357214#comment-16357214
 ] 

Andrew Olson commented on KAFKA-2435:
-

[~jeffwidman] yes, that sounds quite reasonable to me.

> More optimally balanced partition assignment strategy
> -
>
> Key: KAFKA-2435
> URL: https://issues.apache.org/jira/browse/KAFKA-2435
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Major
> Fix For: 2.0.0
>
> Attachments: KAFKA-2435.patch
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the original high-level consumer. For the new consumer, 
> see KAFKA-3297.
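
For illustration, here is a compact, standalone sketch of the greedy procedure 
described above (a hypothetical FairAssignmentSketch class keyed by plain 
strings, not the attached patch): topics with fewer subscribers are assigned 
first, ties go to the topic with more partitions, and each partition is given 
to the interested consumer that currently holds the fewest partitions.

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the greedy fair-assignment idea; real assignors work with
// TopicPartition objects and consumer metadata rather than plain strings.
public class FairAssignmentSketch {

    public static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                                   Map<String, List<String>> subscriptions) {
        Map<String, List<String>> assignment = new HashMap<>();
        Map<String, Integer> load = new HashMap<>();
        subscriptions.keySet().forEach(c -> {
            assignment.put(c, new ArrayList<>());
            load.put(c, 0);
        });

        // Consumers interested in each topic.
        Map<String, List<String>> consumersPerTopic = new HashMap<>();
        subscriptions.forEach((consumer, topics) ->
                topics.forEach(t -> consumersPerTopic.computeIfAbsent(t, k -> new ArrayList<>()).add(consumer)));

        // Assignment order: fewest consumers first, then more partitions first.
        List<String> topics = new ArrayList<>(consumersPerTopic.keySet());
        topics.sort(Comparator
                .comparingInt((String t) -> consumersPerTopic.get(t).size())
                .thenComparingInt(t -> -partitionsPerTopic.getOrDefault(t, 0)));

        for (String topic : topics) {
            int numPartitions = partitionsPerTopic.getOrDefault(topic, 0);
            List<String> interested = consumersPerTopic.get(topic);
            for (int p = 0; p < numPartitions; p++) {
                // Pick the interested consumer with the fewest partitions so far.
                String least = interested.stream()
                        .min(Comparator.comparingInt(load::get))
                        .orElseThrow(IllegalStateException::new);
                assignment.get(least).add(topic + "-" + p);
                load.merge(least, 1, Integer::sum);
            }
        }
        return assignment;
    }
}
{code}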



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3297) More optimally balanced partition assignment strategy (new consumer)

2018-01-19 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332510#comment-16332510
 ] 

Andrew Olson commented on KAFKA-3297:
-

We have released [1] this enhancement in an open source project [2].

[1] 
https://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.cerner.common.kafka%22
[2] 
https://github.com/cerner/common-kafka/blob/master/common-kafka/README.md#fairpartitioner
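
For the new consumer, a strategy like this is selected through the 
partition.assignment.strategy configuration. The snippet below is a minimal 
illustration of that wiring; it uses the built-in RoundRobinAssignor as a 
stand-in, and a custom assignor such as the FairPartitioner linked above would 
be listed by its fully-qualified class name in the same way.

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

// Minimal illustration of selecting an assignment strategy for the new consumer.
public class AssignmentStrategyConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "assignment-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // All members of the consumer group must agree on the strategy list.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            // Poll in a loop as usual; the group coordinator applies the configured
            // assignor during each rebalance.
        }
    }
}
{code}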

> More optimally balanced partition assignment strategy (new consumer)
> 
>
> Key: KAFKA-3297
> URL: https://issues.apache.org/jira/browse/KAFKA-3297
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Major
> Fix For: 1.1.0
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the new consumer. For the original high-level consumer, 
> see KAFKA-2435.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5413) Log cleaner fails due to large offset in segment file

2018-01-11 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322857#comment-16322857
 ] 

Andrew Olson commented on KAFKA-5413:
-

Thanks for the insight; we'll keep a watchful eye on it.

> Log cleaner fails due to large offset in segment file
> -
>
> Key: KAFKA-5413
> URL: https://issues.apache.org/jira/browse/KAFKA-5413
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0, 0.10.2.1
> Environment: Ubuntu 14.04 LTS, Oracle Java 8u92, kafka_2.11-0.10.2.0
>Reporter: Nicholas Ngorok
>Assignee: Kelvin Rutt
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.2, 0.11.0.0
>
> Attachments: .index.cleaned, 
> .log, .log.cleaned, 
> .timeindex.cleaned, 002147422683.log, 
> kafka-5413.patch
>
>
> The log cleaner thread in our brokers is failing with the trace below
> {noformat}
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 0 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 
> 15:48:59 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 2147343575 in log __consumer_offsets-12 (largest timestamp 
> Thu Jun 08 15:49:06 PDT 2017) into 0, retaining deletes. 
> (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,834] ERROR {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: largest offset in 
> message set can not be safely converted to relative offset.
> at scala.Predef$.require(Predef.scala:224)
> at kafka.log.LogSegment.append(LogSegment.scala:109)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:478)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.clean(LogCleaner.scala:362)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-06-08 15:49:54,835] INFO {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
> {noformat}
> This seems to point at the specific line [here| 
> https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/log/LogSegment.scala#L92]
>  in the kafka src where the difference is actually larger than MAXINT as both 
> baseOffset and offset are of type long. It was introduced in this [pr| 
> https://github.com/apache/kafka/pull/2210/files/56d1f8196b77a47b176b7bbd1e4220a3be827631]
> These were the outputs of dumping the first two log segments
> {noformat}
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0.log
> Dumping /kafka-logs/__consumer_offsets-12/.log
> Starting offset: 0
> offset: 1810054758 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 
> -1 magic: 0 compresscodec: NONE crc: 3127861909 keysize: 34
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0002147343575.log
> Dumping /kafka-logs/__consumer_offsets-12/002147343575.log
> Starting offset: 2147343575
> offset: 2147539884 position: 0 NoTimestampType: -1 isvalid: true 
> payloadsize: -1 magic: 0 compresscodec: NONE crc: 2282192097 keysize: 34
> {noformat}
> My guess is that since 2147539884 is larger than MAXINT, we are hitting this 
> exception. Was there a specific reason this check was added in 0.10.2?
> E.g., if the first offset is a key = "key 0" and then we have MAXINT + 1 of 
> "key 1" following, wouldn't we run into this situation whenever the log 
> cleaner runs?
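
The arithmetic behind the failure is worth spelling out: the cleaner is 
compacting segment 2147343575 into the segment whose base offset is 0, and the 
retained record at offset 2147539884 would need relative offset 
2147539884 - 0 = 2147539884, which exceeds Integer.MAX_VALUE (2147483647) and 
so cannot fit in the 4-byte relative-offset field of the index. A Java 
paraphrase of the guard, for illustration only (the real check is the Scala 
require in LogSegment.append linked above):

{code:java}
// Illustrative paraphrase of the relative-offset guard: the offset index stores
// offsets relative to the segment's base offset in 4 bytes, so the difference
// must fit in an int.
public class RelativeOffsetCheck {
    static boolean canConvertToRelativeOffset(long offset, long baseOffset) {
        long relative = offset - baseOffset;
        return relative >= 0 && relative <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // Values from the dump above: cleaning "into 0" with a record at 2147539884.
        System.out.println(canConvertToRelativeOffset(2147539884L, 0L));          // false -> require(...) fails
        // Keeping the record relative to its original base offset 2147343575 is fine.
        System.out.println(canConvertToRelativeOffset(2147539884L, 2147343575L)); // true
    }
}
{code}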



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5413) Log cleaner fails due to large offset in segment file

2018-01-11 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322759#comment-16322759
 ] 

Andrew Olson edited comment on KAFKA-5413 at 1/11/18 7:06 PM:
--

We ran into this for a partition of the __consumer_offsets topic. The issue was 
discovered when we noticed that the open file count for a Kafka broker had been 
steadily growing for a couple months, and was about 2x higher than any other 
broker in the cluster. When we restarted this broker it seemed able to recover, 
deleting a large number of old log segments, with the open file count returning 
to a more normal value.


was (Author: noslowerdna):
We ran into this for a partition of the __consumer_offsets topic. The issue was 
discovered when we noticed that the open file count for a Kafka broker had been 
steadily growing for a couple months, and was about 2x higher than any other 
broker in the cluster. When we restarted this broker it seemed able to recover, 
deleting a large number of old segments for __consumer_offsets, with the open 
file count returning to a more normal value.

> Log cleaner fails due to large offset in segment file
> -
>
> Key: KAFKA-5413
> URL: https://issues.apache.org/jira/browse/KAFKA-5413
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0, 0.10.2.1
> Environment: Ubuntu 14.04 LTS, Oracle Java 8u92, kafka_2.11-0.10.2.0
>Reporter: Nicholas Ngorok
>Assignee: Kelvin Rutt
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.2, 0.11.0.0
>
> Attachments: .index.cleaned, 
> .log, .log.cleaned, 
> .timeindex.cleaned, 002147422683.log, 
> kafka-5413.patch
>
>
> The log cleaner thread in our brokers is failing with the trace below
> {noformat}
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 0 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 
> 15:48:59 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 2147343575 in log __consumer_offsets-12 (largest timestamp 
> Thu Jun 08 15:49:06 PDT 2017) into 0, retaining deletes. 
> (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,834] ERROR {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: largest offset in 
> message set can not be safely converted to relative offset.
> at scala.Predef$.require(Predef.scala:224)
> at kafka.log.LogSegment.append(LogSegment.scala:109)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:478)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.clean(LogCleaner.scala:362)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-06-08 15:49:54,835] INFO {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
> {noformat}
> This seems to point at the specific line [here| 
> https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/log/LogSegment.scala#L92]
>  in the kafka src where the difference is actually larger than MAXINT as both 
> baseOffset and offset are of type long. It was introduced in this [pr| 
> https://github.com/apache/kafka/pull/2210/files/56d1f8196b77a47b176b7bbd1e4220a3be827631]
> These were the outputs of dumping the first two log segments
> {noformat}
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0.log
> Dumping /kafka-logs/__consumer_offsets-12/.log
> Starting offset: 0
> offset: 1810054758 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 
> -1 magic: 0 compresscodec: NONE crc: 3127861909 keysize: 34
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0002147343575.log
> Dumping 

[jira] [Commented] (KAFKA-5413) Log cleaner fails due to large offset in segment file

2018-01-11 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322759#comment-16322759
 ] 

Andrew Olson commented on KAFKA-5413:
-

We ran into this for a partition of the __consumer_offsets topic. The issue was 
discovered when we noticed that the open file count for a Kafka broker had been 
steadily growing for a couple months, and was about 2x higher than any other 
broker in the cluster. When we restarted this broker it seemed able to recover, 
deleting a large number of old segments for __consumer_offsets, with the open 
file count returning to a more normal value.

> Log cleaner fails due to large offset in segment file
> -
>
> Key: KAFKA-5413
> URL: https://issues.apache.org/jira/browse/KAFKA-5413
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0, 0.10.2.1
> Environment: Ubuntu 14.04 LTS, Oracle Java 8u92, kafka_2.11-0.10.2.0
>Reporter: Nicholas Ngorok
>Assignee: Kelvin Rutt
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.2, 0.11.0.0
>
> Attachments: .index.cleaned, 
> .log, .log.cleaned, 
> .timeindex.cleaned, 002147422683.log, 
> kafka-5413.patch
>
>
> The log cleaner thread in our brokers is failing with the trace below
> {noformat}
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 0 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 
> 15:48:59 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 2147343575 in log __consumer_offsets-12 (largest timestamp 
> Thu Jun 08 15:49:06 PDT 2017) into 0, retaining deletes. 
> (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,834] ERROR {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: largest offset in 
> message set can not be safely converted to relative offset.
> at scala.Predef$.require(Predef.scala:224)
> at kafka.log.LogSegment.append(LogSegment.scala:109)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:478)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.clean(LogCleaner.scala:362)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-06-08 15:49:54,835] INFO {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
> {noformat}
> This seems to point at the specific line [here| 
> https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/log/LogSegment.scala#L92]
>  in the kafka src where the difference is actually larger than MAXINT as both 
> baseOffset and offset are of type long. It was introduced in this [pr| 
> https://github.com/apache/kafka/pull/2210/files/56d1f8196b77a47b176b7bbd1e4220a3be827631]
> These were the outputs of dumping the first two log segments
> {noformat}
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0.log
> Dumping /kafka-logs/__consumer_offsets-12/.log
> Starting offset: 0
> offset: 1810054758 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 
> -1 magic: 0 compresscodec: NONE crc: 3127861909 keysize: 34
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0002147343575.log
> Dumping /kafka-logs/__consumer_offsets-12/002147343575.log
> Starting offset: 2147343575
> offset: 2147539884 position: 0 NoTimestampType: -1 isvalid: true 
> payloadsize: -1 magic: 0 compresscodec: NONE crc: 2282192097 keysize: 34
> {noformat}
> My guess is that since 2147539884 is larger than MAXINT, we are hitting this 
> exception. Was there a specific reason this check was added in 0.10.2?
> E.g., if the first offset is a key = "key 0" and then we have MAXINT + 1 of 
> "key 1" following, wouldn't we 

[jira] [Updated] (KAFKA-6254) Introduce Incremental FetchRequests to Increase Partition Scalability

2018-01-04 Thread Andrew Olson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-6254:

Description: Introduce Incremental FetchRequests to Increase Partition 
Scalability.  See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
  (was: Introduce Incremental FetchRequests to Increase Partition Scalability.  
See https://cwiki.apache.org/confluence/pages/editpage.action?pageId=74687799)

> Introduce Incremental FetchRequests to Increase Partition Scalability
> -
>
> Key: KAFKA-6254
> URL: https://issues.apache.org/jira/browse/KAFKA-6254
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>
> Introduce Incremental FetchRequests to Increase Partition Scalability.  See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6334) Minor documentation typo

2017-12-12 Thread Andrew Olson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-6334:

Fix Version/s: 0.11.0.0

> Minor documentation typo
> 
>
> Key: KAFKA-6334
> URL: https://issues.apache.org/jira/browse/KAFKA-6334
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Trivial
> Fix For: 0.11.0.0, 1.0.0
>
>
> At [1]:
> {quote}
> 0.11.0 consumers support backwards compatibility with brokers 0.10.0 brokers 
> and upward, so it is possible to upgrade the clients first before the brokers
> {quote}
> Specifically the "brokers 0.10.0 brokers" wording.
> [1] http://kafka.apache.org/documentation.html#upgrade_11_message_format



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6334) Minor documentation typo

2017-12-12 Thread Andrew Olson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-6334:

Affects Version/s: 0.11.0.0

> Minor documentation typo
> 
>
> Key: KAFKA-6334
> URL: https://issues.apache.org/jira/browse/KAFKA-6334
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Trivial
> Fix For: 0.11.0.0, 1.0.0
>
>
> At [1]:
> {quote}
> 0.11.0 consumers support backwards compatibility with brokers 0.10.0 brokers 
> and upward, so it is possible to upgrade the clients first before the brokers
> {quote}
> Specifically the "brokers 0.10.0 brokers" wording.
> [1] http://kafka.apache.org/documentation.html#upgrade_11_message_format



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-6334) Minor documentation typo

2017-12-11 Thread Andrew Olson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson reassigned KAFKA-6334:
---

Assignee: Andrew Olson

> Minor documentation typo
> 
>
> Key: KAFKA-6334
> URL: https://issues.apache.org/jira/browse/KAFKA-6334
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 1.0.0
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Trivial
>
> At [1]:
> {quote}
> 0.11.0 consumers support backwards compatibility with brokers 0.10.0 brokers 
> and upward, so it is possible to upgrade the clients first before the brokers
> {quote}
> Specifically the "brokers 0.10.0 brokers" wording.
> [1] http://kafka.apache.org/documentation.html#upgrade_11_message_format



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6334) Minor documentation typo

2017-12-11 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286608#comment-16286608
 ] 

Andrew Olson commented on KAFKA-6334:
-

[~guozhang] pull request: https://github.com/apache/kafka-site/pull/111

> Minor documentation typo
> 
>
> Key: KAFKA-6334
> URL: https://issues.apache.org/jira/browse/KAFKA-6334
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 1.0.0
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Trivial
>
> At [1]:
> {quote}
> 0.11.0 consumers support backwards compatibility with brokers 0.10.0 brokers 
> and upward, so it is possible to upgrade the clients first before the brokers
> {quote}
> Specifically the "brokers 0.10.0 brokers" wording.
> [1] http://kafka.apache.org/documentation.html#upgrade_11_message_format



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6334) Minor documentation typo

2017-12-08 Thread Andrew Olson (JIRA)
Andrew Olson created KAFKA-6334:
---

 Summary: Minor documentation typo
 Key: KAFKA-6334
 URL: https://issues.apache.org/jira/browse/KAFKA-6334
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Affects Versions: 1.0.0
Reporter: Andrew Olson
Priority: Trivial


At [1]:

{quote}
0.11.0 consumers support backwards compatibility with brokers 0.10.0 brokers 
and upward, so it is possible to upgrade the clients first before the brokers
{quote}

Specifically the "brokers 0.10.0 brokers" wording.

[1] http://kafka.apache.org/documentation.html#upgrade_11_message_format



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4932) Add UUID Serde

2017-09-26 Thread Andrew Olson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-4932:

Description: 
I propose adding serializers and deserializers for the java.util.UUID class.

I have many use cases where I want to set the key of a Kafka message to be a 
UUID. Currently, I need to turn UUIDs into strings or byte arrays and use their 
associated Serdes, but it would be more convenient to serialize and deserialize 
UUIDs directly.

I'd propose that the serializer and deserializer use the 36-byte string 
representation, calling UUID.toString and UUID.fromString, and then using the 
existing StringSerializer / StringDeserializer to finish the job. We would also 
wrap these in a Serde and modify the streams Serdes class to include this in 
the list of supported types.

Optionally, we could have the deserializer support a 16-byte representation and 
it would check the size of the input byte array to determine whether it's a 
binary or string representation of the UUID. It's not well defined whether the 
most significant bits or least significant go first, so this deserializer would 
have to support only one or the other.

Similarly, if the deserializer supported a 16-byte representation, there could 
be two variants of the serializer, a UUIDStringSerializer and a 
UUIDBytesSerializer.

I would be willing to write this PR, but am looking for feedback about whether 
there are significant concerns here around ambiguity of what the byte 
representation of a UUID should be, or if there's desire to keep the list of 
built-in Serdes minimal such that a PR would be unlikely to be accepted.

KIP Link: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-206%3A+Add+support+for+UUID+serialization+and+deserialization

  was:
I propose adding serializers and deserializers for the java.util.UUID class.

I have many use cases where I want to set the key of a Kafka message to be a 
UUID. Currently, I need to turn UUIDs into strings or byte arrays and use their 
associated Serdes, but it would be more convenient to serialize and deserialize 
UUIDs directly.

I'd propose that the serializer and deserializer use the 36-byte string 
representation, calling UUID.toString and UUID.fromString. We would also wrap 
these in a Serde and modify the streams Serdes class to include this in the 
list of supported types.

Optionally, we could have the deserializer support a 16-byte representation and 
it would check the size of the input byte array to determine whether it's a 
binary or string representation of the UUID. It's not well defined whether the 
most significant bits or least significant go first, so this deserializer would 
have to support only one or the other.

Similarly, if the deserializer supported a 16-byte representation, there could 
be two variants of the serializer, a UUIDStringSerializer and a 
UUIDBytesSerializer.

I would be willing to write this PR, but am looking for feedback about whether 
there are significant concerns here around ambiguity of what the byte 
representation of a UUID should be, or if there's desire to keep the list of 
built-in Serdes minimal such that a PR would be unlikely to be accepted.


> Add UUID Serde
> --
>
> Key: KAFKA-4932
> URL: https://issues.apache.org/jira/browse/KAFKA-4932
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Jeff Klukas
>Assignee: Jakub Scholz
>Priority: Minor
>  Labels: needs-kip, newbie
>
> I propose adding serializers and deserializers for the java.util.UUID class.
> I have many use cases where I want to set the key of a Kafka message to be a 
> UUID. Currently, I need to turn UUIDs into strings or byte arrays and use 
> their associated Serdes, but it would be more convenient to serialize and 
> deserialize UUIDs directly.
> I'd propose that the serializer and deserializer use the 36-byte string 
> representation, calling UUID.toString and UUID.fromString, and then using the 
> existing StringSerializer / StringDeserializer to finish the job. We would 
> also wrap these in a Serde and modify the streams Serdes class to include 
> this in the list of supported types.
> Optionally, we could have the deserializer support a 16-byte representation 
> and it would check the size of the input byte array to determine whether it's 
> a binary or string representation of the UUID. It's not well defined whether 
> the most significant bits or least significant go first, so this deserializer 
> would have to support only one or the other.
> Similarly, if the deserializer supported a 16-byte representation, there could 
> be two variants of the serializer, a UUIDStringSerializer and a 
> UUIDBytesSerializer.
> I would be willing to write this PR, but am looking for feedback about 
> whether there are significant 
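
For concreteness, here is a sketch of the string-based variant described in the 
proposal, assuming the standard Serializer/Deserializer/Serdes interfaces and 
delegating the byte handling to StringSerializer/StringDeserializer. The class 
names are illustrative, not necessarily those that eventually shipped with 
KIP-206.

{code:java}
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch of the UUID.toString / UUID.fromString variant: the byte work is
// delegated to the existing String serializer and deserializer.
public class UuidSerdeSketch {

    public static class UuidSerializer implements Serializer<UUID> {
        private final StringSerializer inner = new StringSerializer();
        @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); }
        @Override public byte[] serialize(String topic, UUID data) {
            return data == null ? null : inner.serialize(topic, data.toString());
        }
        @Override public void close() { inner.close(); }
    }

    public static class UuidDeserializer implements Deserializer<UUID> {
        private final StringDeserializer inner = new StringDeserializer();
        @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); }
        @Override public UUID deserialize(String topic, byte[] data) {
            return data == null ? null : UUID.fromString(inner.deserialize(topic, data));
        }
        @Override public void close() { inner.close(); }
    }

    // Wrap the pair in a Serde, as the proposal suggests for the streams Serdes class.
    public static Serde<UUID> uuidSerde() {
        return Serdes.serdeFrom(new UuidSerializer(), new UuidDeserializer());
    }
}
{code}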

[jira] [Updated] (KAFKA-5926) --force option is ignored by kafka-configs and kafka-topics tools

2017-09-20 Thread Andrew Olson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-5926:

Summary: --force option is ignored by kafka-configs and kafka-topics tools  
(was: --force option is ginored by kafka-configs and kafka-topics tools)

> --force option is ignored by kafka-configs and kafka-topics tools
> -
>
> Key: KAFKA-5926
> URL: https://issues.apache.org/jira/browse/KAFKA-5926
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>
> Both ConfigCommand and TopicCommand list a --force option in their help but 
> it is not used.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5939) Add a dryrun option to release.py

2017-09-20 Thread Andrew Olson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-5939:

Issue Type: New Feature  (was: Bug)

> Add a dryrun option to release.py
> -
>
> Key: KAFKA-5939
> URL: https://issues.apache.org/jira/browse/KAFKA-5939
> Project: Kafka
>  Issue Type: New Feature
>  Components: tools
>Reporter: Damian Guy
>
> It would be great to add a `dryrun` feature to `release.py` so that it can be 
> used to test changes to the script. At the moment you need to make sure all 
> JIRAs are closed for the release, have no uncommitted changes, etc., which is 
> a bit of a hassle when you just want to test a change you've made to the 
> script. There may be other things that need to be skipped, too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-1120) Controller could miss a broker state change

2017-07-03 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072663#comment-16072663
 ] 

Andrew Olson commented on KAFKA-1120:
-

[~wushujames] Can you retest with Kafka 0.11 to see if KAFKA-1211 resolves this 
problem?

> Controller could miss a broker state change 
> 
>
> Key: KAFKA-1120
> URL: https://issues.apache.org/jira/browse/KAFKA-1120
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>  Labels: reliability
>
> When the controller is in the middle of processing a task (e.g., preferred 
> leader election, broker change), it holds a controller lock. During this 
> time, a broker could have de-registered and re-registered itself in ZK. After 
> the controller finishes processing the current task, it will start processing 
> the logic in the broker change listener. However, it will see no broker 
> change and therefore won't do anything to the restarted broker. This broker 
> will be in a weird state since the controller doesn't inform it to become the 
> leader of any partition. Yet, the cached metadata in other brokers could 
> still list that broker as the leader for some partitions. Client requests 
> routed to that broker will then get a TopicOrPartitionNotExistException. This 
> broker will continue to be in this bad state until it's restarted again.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Issue Comment Deleted] (KAFKA-1379) Partition reassignment resets clock for time-based retention

2017-06-27 Thread Andrew Olson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-1379:

Comment: was deleted

(was: [~hachikuji] Jason, could you confirm if this bug has been fixed? 
According to http://kafka.apache.org/documentation.html#upgrade_10_1_breaking 
it appears so.)

> Partition reassignment resets clock for time-based retention
> 
>
> Key: KAFKA-1379
> URL: https://issues.apache.org/jira/browse/KAFKA-1379
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: Joel Koshy
>
> Since retention is driven off mod-times, reassigned partitions will result in
> data that has already been on a leader being retained for another full
> retention cycle. E.g., if retention is seven days and you reassign partitions
> on the sixth day, then those partitions will remain on the replicas for
> another seven days.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-1379) Partition reassignment resets clock for time-based retention

2017-06-27 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065471#comment-16065471
 ] 

Andrew Olson commented on KAFKA-1379:
-

[~hachikuji] Jason, could you confirm if this bug has been fixed? According to 
http://kafka.apache.org/documentation.html#upgrade_10_1_breaking it appears so.

> Partition reassignment resets clock for time-based retention
> 
>
> Key: KAFKA-1379
> URL: https://issues.apache.org/jira/browse/KAFKA-1379
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: Joel Koshy
>
> Since retention is driven off mod-times, reassigned partitions will result in
> data that has already been on a leader being retained for another full
> retention cycle. E.g., if retention is seven days and you reassign partitions
> on the sixth day, then those partitions will remain on the replicas for
> another seven days.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)