[jira] [Resolved] (KAFKA-5299) MirrorMaker with New.consumer doesn't consume message from multiple topics whitelisted

2017-06-29 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman resolved KAFKA-5299.

Resolution: Cannot Reproduce

> MirrorMaker with New.consumer doesn't consume message from multiple topics 
> whitelisted 
> ---
>
> Key: KAFKA-5299
> URL: https://issues.apache.org/jira/browse/KAFKA-5299
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Jyoti
>






[jira] [Updated] (KAFKA-5381) ERROR Uncaught exception in scheduled task 'delete-expired-consumer-offsets' (kafka.utils.KafkaScheduler)

2017-06-06 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-5381:
---
Description: 
We have a 6-node cluster of 0.10.0.1 brokers. Broker 4 had a hardware problem, 
so we re-assigned all of its partitions to other brokers. We immediately 
started observing the error described in KAFKA-4362 from several of our 
consumers.

However, on broker 6 we also started seeing the following exceptions in 
{{KafkaScheduler}}, which have a somewhat similar-looking traceback:
{code}
[2017-06-03 17:23:57,926] ERROR Uncaught exception in scheduled task 
'delete-expired-consumer-offsets' (kafka.utils.KafkaScheduler)
java.lang.IllegalArgumentException: Message format version for partition 50 not 
found
at 
kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
at scala.Option.getOrElse(Option.scala:121)
at 
kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$2$$anonfun$10.apply(GroupMetadataManager.scala:560)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$2$$anonfun$10.apply(GroupMetadataManager.scala:551)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$2.apply$mcI$sp(GroupMetadataManager.scala:551)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$2.apply(GroupMetadataManager.scala:543)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$2.apply(GroupMetadataManager.scala:543)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:239)
at 
kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$deleteExpiredOffsets(GroupMetadataManager.scala:543)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$1.apply$mcV$sp(GroupMetadataManager.scala:87)
at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:56)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
{code}

Unsurprisingly, the error disappeared once {{offsets.retention.minutes}} passed.

This appears to share a root cause with KAFKA-4362, where 
{{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws the error 
because the offsets partition was moved, but I'm unclear whether the fix for 
that issue also covered the {{KafkaScheduler}} path or whether more work needs 
to be done here.

We did the partition re-assignment using the {{kafka-reassign-partitions.sh}} 
script, giving it the five healthy brokers. From my understanding, this would 
have randomly re-assigned all partitions (I don't think it's sticky), so 
probably at least one partition of the {{__consumer_offsets}} topic was removed 
from broker 6. If that were the case, though, I would have expected all brokers 
to have had offsets partitions removed and to be throwing this error, yet our 
logging infrastructure shows the error only on broker 6, not on the others. Not 
sure why that is.
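
For reference, the generate/execute/verify flow for that script looks roughly 
like the following; the ZooKeeper address, broker IDs, and JSON file names are 
illustrative, not what was actually run here:
{code}
# Generate a candidate assignment spread across the five healthy brokers.
# topics-to-move.json lists the topics to rebalance (illustrative).
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --topics-to-move-json-file topics-to-move.json \
  --broker-list "1,2,3,5,6" --generate

# Save the proposed assignment as reassignment.json, then apply and verify it.
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file reassignment.json --execute
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file reassignment.json --verify
{code}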

  was:
We have a 6-node cluster of 0.10.0.1 brokers. Broker 4 had a hardware problem, 
so we re-assigned all of its partitions to other brokers. We immediately 
started observing the error described in KAFKA-4362 from several of our 
consumers.

However, on broker 6 we also started seeing the following exceptions in 
{{KafkaScheduler}}, which have a somewhat similar-looking traceback:
{code}
[2017-06-03 17:23:57,926] ERROR Uncaught exception in scheduled task 
'delete-expired-consumer-offsets' (kafka.utils.KafkaScheduler)
java.lang.IllegalArgumentException: Message format version for partition 50 not 
found
at 

[jira] [Updated] (KAFKA-5381) ERROR Uncaught exception in scheduled task 'delete-expired-consumer-offsets' (kafka.utils.KafkaScheduler)

2017-06-05 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-5381:
---
Description: 
We have a 6-node cluster of 0.10.0.1 brokers. Broker 4 had a hardware problem, 
so we re-assigned all of its partitions to other brokers. We immediately 
started observing the error described in KAFKA-4362 from several of our 
consumers.

However, on broker 6 we also started seeing the following exceptions in 
{{KafkaScheduler}}, which have a somewhat similar-looking traceback:
{code}
[2017-06-03 17:23:57,926] ERROR Uncaught exception in scheduled task 
'delete-expired-consumer-offsets' (kafka.utils.KafkaScheduler)
java.lang.IllegalArgumentException: Message format version for partition 50 not 
found
at 
kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
at scala.Option.getOrElse(Option.scala:121)
at 
kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$2$$anonfun$10.apply(GroupMetadataManager.scala:560)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$2$$anonfun$10.apply(GroupMetadataManager.scala:551)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$2.apply$mcI$sp(GroupMetadataManager.scala:551)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$2.apply(GroupMetadataManager.scala:543)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$2.apply(GroupMetadataManager.scala:543)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:239)
at 
kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$deleteExpiredOffsets(GroupMetadataManager.scala:543)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$1.apply$mcV$sp(GroupMetadataManager.scala:87)
at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:56)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
{code}

Unsurprisingly, the error disappeared once {{offsets.retention.minutes}} passed.

This appears to share a root cause with KAFKA-4362, where 
{{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws the error 
because the offsets partition was moved, but I'm unclear whether the fix for 
that issue also covered the {{KafkaScheduler}} path or whether more work needs 
to be done here.

I do not know exactly how our Ops team executed the partition re-assignment. I 
suspect they used the {{kafka-reassign-partitions.sh}} script and just gave it 
five brokers rather than six. From my understanding, this would have randomly 
re-assigned all partitions (I don't think it's sticky), so probably at least 
one partition of the {{__consumer_offsets}} topic was removed from broker 6. If 
that were the case, though, I would have expected all brokers to have had 
offsets partitions removed and to be throwing this error, yet our logging 
infrastructure shows the error only on broker 6, not on the others. Not sure 
why that is.

  was:
We have a 6-node cluster of 0.10.0.1 brokers. Broker 4 had a hardware problem, 
so we re-assigned all of its partitions to other brokers. We immediately 
started observing the error described in KAFKA-4362 from several of our 
consumers.

However, on broker 6 we also started seeing the following exceptions in 
{{KafkaScheduler}}, which have a somewhat similar-looking traceback:
{code}
[2017-06-03 17:23:57,926] ERROR Uncaught exception in scheduled task 
'delete-expired-consumer-offsets' (kafka.utils.KafkaScheduler)
java.lang.IllegalArgumentException: Message format version 

[jira] [Updated] (KAFKA-5381) ERROR Uncaught exception in scheduled task 'delete-expired-consumer-offsets' (kafka.utils.KafkaScheduler)

2017-06-05 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-5381:
---
Description: 
We have a 6-node cluster of 0.10.0.1 brokers. Broker 4 had a hardware problem, 
so we re-assigned all of its partitions to other brokers. We immediately 
started observing the error described in KAFKA-4362 from several of our 
consumers.

However, on broker 6 we also started seeing the following exceptions in 
{{KafkaScheduler}}, which have a somewhat similar-looking traceback:
{code}
[2017-06-03 17:23:57,926] ERROR Uncaught exception in scheduled task 
'delete-expired-consumer-offsets' (kafka.utils.KafkaScheduler)
java.lang.IllegalArgumentException: Message format version for partition 50 not 
found
at 
kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
at scala.Option.getOrElse(Option.scala:121)
at 
kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$2$$anonfun$10.apply(GroupMetadataManager.scala:560)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$2$$anonfun$10.apply(GroupMetadataManager.scala:551)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$2.apply$mcI$sp(GroupMetadataManager.scala:551)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$2.apply(GroupMetadataManager.scala:543)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$2.apply(GroupMetadataManager.scala:543)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:239)
at 
kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$deleteExpiredOffsets(GroupMetadataManager.scala:543)
at 
kafka.coordinator.GroupMetadataManager$$anonfun$1.apply$mcV$sp(GroupMetadataManager.scala:87)
at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:56)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
{code}

This appears to share a root cause with KAFKA-4362, where 
{{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws the error 
because the offsets partition was moved, but I'm unclear whether the fix for 
that issue also covered the {{KafkaScheduler}} path or whether more work needs 
to be done here.

I do not know exactly how our Ops team executed the partition re-assignment. I 
suspect they used the {{kafka-reassign-partitions.sh}} script and just gave it 
five brokers rather than six. From my understanding, this would have randomly 
re-assigned all partitions (I don't think it's sticky), so probably at least 
one partition of the {{__consumer_offsets}} topic was removed from broker 6. If 
that were the case, though, I would have expected all brokers to have had 
offsets partitions removed and to be throwing this error, yet our logging 
infrastructure shows the error only on broker 6, not on the others. Not sure 
why that is.

  was:
We have a 6-node cluster of 0.10.0.1 brokers. Broker 4 had a hardware problem, 
so we re-assigned all of its partitions to other brokers. We immediately 
started observing the error described in KAFKA-4362 from several of our 
consumers.

However, on broker 6 we also started seeing the following exceptions in 
{{KafkaScheduler}}, which have a somewhat similar-looking traceback:
{code}
[2017-06-03 17:23:57,926] ERROR Uncaught exception in scheduled task 
'delete-expired-consumer-offsets' (kafka.utils.KafkaScheduler)
java.lang.IllegalArgumentException: Message format version for partition 50 not 
found
at 

[jira] [Commented] (KAFKA-3356) Remove ConsumerOffsetChecker, deprecated in 0.9, in 0.11

2017-05-31 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030804#comment-16030804
 ] 

Jeff Widman commented on KAFKA-3356:


Is there any way to get this landed today?

This is literally just deleting a shell script and a Scala file, and there's 
already a PR ready to go.

Normally I wouldn't care if this were purely an internal cleanup, but since the 
shell script is exposed to users, I routinely get questions from folks who 
don't realize they shouldn't be using it.

So I'd rather see it cleaned up, since that improves the new-user experience.

> Remove ConsumerOffsetChecker, deprecated in 0.9, in 0.11
> 
>
> Key: KAFKA-3356
> URL: https://issues.apache.org/jira/browse/KAFKA-3356
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0
>Reporter: Ashish Singh
>Assignee: Mickael Maison
>Priority: Blocker
> Fix For: 0.12.0.0
>
>
> ConsumerOffsetChecker was marked deprecated as of 0.9 and should be removed 
> in 0.11.





[jira] [Commented] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition

2017-05-25 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16024997#comment-16024997
 ] 

Jeff Widman commented on KAFKA-4362:


Updated the affected versions, as we just encountered this on a {{0.10.0.1}} 
cluster.

> Consumer can fail after reassignment of the offsets topic partition
> ---
>
> Key: KAFKA-4362
> URL: https://issues.apache.org/jira/browse/KAFKA-4362
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1, 0.10.1.0
>Reporter: Joel Koshy
>Assignee: Mayuresh Gharat
> Fix For: 0.10.1.1
>
>
> When a consumer offsets topic partition reassignment completes, an offset 
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 
> not found
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
>  ~[kafka_2.10.jar:?]
> at 
> ...
> {code}
> The issue is that the replica has been deleted so the 
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this 
> exception instead which propagates as an unknown error.
> Unfortunately consumers don't respond to this and will fail their offset 
> commits.
> One workaround in the above situation is to bounce the cluster - the consumer 
> will be forced to rediscover the group coordinator.
> (Incidentally, the message incorrectly prints the number of partitions 
> instead of the actual partition.)
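
To illustrate the bounce workaround mentioned above, a rolling restart might 
look roughly like the following sketch; the hostnames and service name are 
hypothetical, and the right restart mechanism depends on the deployment:
{code}
# Restart brokers one at a time so the cluster stays available; after the
# bounce, consumers are forced to rediscover the group coordinator.
for broker in kafka1 kafka2 kafka3; do           # hypothetical hostnames
  ssh "$broker" 'sudo systemctl restart kafka'   # hypothetical service name
  sleep 60  # crude wait for the broker to rejoin ISRs before moving on
done
{code}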





[jira] [Updated] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition

2017-05-25 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-4362:
---
Affects Version/s: 0.10.0.1

> Consumer can fail after reassignment of the offsets topic partition
> ---
>
> Key: KAFKA-4362
> URL: https://issues.apache.org/jira/browse/KAFKA-4362
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1, 0.10.1.0
>Reporter: Joel Koshy
>Assignee: Mayuresh Gharat
> Fix For: 0.10.1.1
>
>
> When a consumer offsets topic partition reassignment completes, an offset 
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 
> not found
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
>  ~[kafka_2.10.jar:?]
> at 
> ...
> {code}
> The issue is that the replica has been deleted so the 
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this 
> exception instead which propagates as an unknown error.
> Unfortunately consumers don't respond to this and will fail their offset 
> commits.
> One workaround in the above situation is to bounce the cluster - the consumer 
> will be forced to rediscover the group coordinator.
> (Incidentally, the message incorrectly prints the number of partitions 
> instead of the actual partition.)





[jira] [Commented] (KAFKA-3356) Remove ConsumerOffsetChecker, deprecated in 0.9, in 0.11

2017-05-15 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010801#comment-16010801
 ] 

Jeff Widman commented on KAFKA-3356:


Personally, I don't think we need feature parity for the old consumers, since 
those are in the process of being deprecated and removed.

> Remove ConsumerOffsetChecker, deprecated in 0.9, in 0.11
> 
>
> Key: KAFKA-3356
> URL: https://issues.apache.org/jira/browse/KAFKA-3356
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0
>Reporter: Ashish Singh
>Assignee: Mickael Maison
>Priority: Blocker
> Fix For: 0.11.0.0
>
>
> ConsumerOffsetChecker was marked deprecated as of 0.9 and should be removed 
> in 0.11.





[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2017-04-19 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975603#comment-15975603
 ] 

Jeff Widman commented on KAFKA-3042:


[~lindong] thanks for the offer to help, and sorry for the slow response.

I'm not exactly sure how to repro, but below is a sanitized copy of our 
internal wiki page documenting our findings as we tried to figure out what was 
happening and how we ended up with mismatched controller epochs between the 
controller and a random partition. It's not polished; it's more a train of 
thought put to paper as we debugged.

Reading through it, it appears that broker 3 lost its connection to ZooKeeper, 
and when it came back it elected itself controller but somehow ended up in a 
state where the broker 3 controller had a completely empty list of live 
brokers. That makes no logical sense, because a broker that is the controller 
should at least list itself among the active brokers, yet somehow it happened. 
Following that, the active epoch for the controller was 134 while the epoch 
recorded by a random partition in ZooKeeper was 133, which created the version 
mismatch.

More details below. I also have access to the detailed Kafka logs (but not the 
ZooKeeper logs) beyond these snippets if you need anything else; they will get 
rotated out of Elasticsearch within a few months and disappear, so hopefully we 
can get to the bottom of this before then.


{code}
3 node cluster. 
Broker 1 is controller.
Zookeeper GC pause meant that broker 3 lost connection. 
When it came back, broker 3 thought it was controller, but thought there were 
no alive brokers--see the empty set referenced in the logs below. This alone 
seems incorrect because if a broker is a controller, you'd think it would 
include itself in the set.


See the following in the logs:


[2017-03-17 21:32:15,812] ERROR Controller 3 epoch 134 initiated state change 
for partition [topic_name,626] from OfflinePartition to OnlinePartition failed 
(s
tate.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition 
[topic_name,626] is alive. Live brokers are: [Set()], Assigned replicas are: 
[List(1, 3)]
at 
kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:75)
at 
kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:345)
at 
kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:205)
at 
kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120)
at 
kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at 
kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117)
at 
kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:70)
at 
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:335)
at 
kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:166)
at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
at 
kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
at 
kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
at 
kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
at 
kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:824)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

Looking at the code + error message, the controller is unaware of active 
brokers. However, there are assigned replicas. We checked the log files under 
/data/kafka and they had m_times greater than the exception timestamp, plus our 
producers and consumers seemed to be working, so the cluster is successfully 
passing data around. The controller situation is just screwed up.


[2017-03-17 21:32:43,976] ERROR Controller 3 epoch 134 initiated state 
change for partition 

[jira] [Comment Edited] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2017-04-06 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959479#comment-15959479
 ] 

Jeff Widman edited comment on KAFKA-3042 at 4/6/17 6:16 PM:


We hit this on 0.10.0.1.

Root cause was a really long ZooKeeper GC pause that caused the brokers to lose 
their connections. Producers and consumers kept working because they had 
established their connections to the brokers before the ZooKeeper issue. But 
the broker logs were throwing these warnings about the cached zkVersion not 
matching, and anything that required the controller was broken; for example, 
newly created partitions didn't get leaders.

I don't know if this error message is thrown whenever two znodes don't match, 
but in our case the ZooKeeper GC pause had resulted in a race condition where 
somehow the epoch of the /controller znode did not match the partition 
controller epoch under the /brokers znode. I'm not sure whether it's possible 
to fix this, perhaps with the ZooKeeper multi command, where updates are 
transactional.

It took us a while to realize that was what the log message meant, so the 
message could be made more specific to report exactly which znode paths don't 
match in ZooKeeper.

For us, forcing a controller re-election by deleting the /controller znode 
immediately fixed the issue without having to restart brokers.
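
For anyone else hitting this, a minimal way to force that re-election is the 
ZooKeeper shell bundled with Kafka; the ensemble address is illustrative:
{code}
# Inspect the current controller registration, then delete the ephemeral
# /controller znode; the brokers immediately elect a new controller.
bin/zookeeper-shell.sh zk1:2181 get /controller
bin/zookeeper-shell.sh zk1:2181 delete /controller
{code}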


was (Author: jeffwidman):
We hit this on 0.10.0.1. 

Root cause was a really long zookeeper GC pause that caused the brokers to lose 
their connection. Producers / Consumers were working successfully as they'd 
established their connections to the brokers before the zk issue, so they kept 
happily working. But the broker logs were throwing these warnings about cached 
zkVersion not matching. And anything that required controller was broken, for 
example any newly created partitions didn't have leaders. 

I think this log message could be made more specific to show which znodes don't 
match.

I don't know if this error message is thrown whenever two znodes don't match, 
but in our case the ZK GC pause resulted in a race condition sequence where 
somehow the epoch of /controller znode did not match the partition controller 
epoch under /brokers znode. I'm not sure if it's possible to fix this, perhaps 
with the ZK multi-command where updates are transactional.

It took us a while to realize that was what the log message meant, so the log 
message could be made more specific to report exactly which znode paths don't 
match in zookeper.

For us, forcing a controller re-election by deleting the /controller znode 
immediately fixed the issue without having to restart brokers. 

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
>Assignee: Dong Lin
>  Labels: reliability
> Fix For: 0.11.0.0
>
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> Sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact 
> it's a follower.
> So after several failed tries, it needs to find out who the leader is.





[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2017-04-06 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959479#comment-15959479
 ] 

Jeff Widman commented on KAFKA-3042:


We hit this on 0.10.0.1.

Root cause was a really long ZooKeeper GC pause that caused the brokers to lose 
their connections. Producers and consumers kept working because they had 
established their connections to the brokers before the ZooKeeper issue. But 
the broker logs were throwing these warnings about the cached zkVersion not 
matching, and anything that required the controller was broken; for example, 
newly created partitions didn't get leaders.

I think this log message could be made more specific to show which znodes don't 
match.

I don't know if this error message is thrown whenever two znodes don't match, 
but in our case the ZooKeeper GC pause resulted in a race condition where 
somehow the epoch of the /controller znode did not match the partition 
controller epoch under the /brokers znode. I'm not sure whether it's possible 
to fix this, perhaps with the ZooKeeper multi command, where updates are 
transactional.

It took us a while to realize that was what the log message meant, so the 
message could be made more specific to report exactly which znode paths don't 
match in ZooKeeper.

For us, forcing a controller re-election by deleting the /controller znode 
immediately fixed the issue without having to restart brokers.

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
>Assignee: Dong Lin
>  Labels: reliability
> Fix For: 0.11.0.0
>
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> Sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact 
> it's a follower.
> So after several failed tries, it needs to find out who the leader is.





[jira] [Commented] (KAFKA-4858) Long topic names created using old kafka-topics.sh can prevent newer brokers from joining any ISRs

2017-03-16 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929421#comment-15929421
 ] 

Jeff Widman commented on KAFKA-4858:


I'd say if you patch it so that replication doesn't break, that's good enough 
for me. I don't see the point in spending a lot of time fixing support for old 
versions beyond that.

The better fix will be to update the shell scripts etc. to use the broker-side 
API, so that all broker-side checks run before anything else happens or 
anything is created in ZooKeeper. In that case, so long as the CreateTopic API 
throws a descriptive error, I think the user experience is fine.
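
Until then, a client-side stopgap is to approximate the length check that newer 
kafka-topics.sh versions perform before invoking an older script; this is only 
a sketch using the 249-character limit from the description below, with an 
illustrative topic name:
{code}
# Kafka 0.10+ limits topic names to 249 characters; refuse anything longer
# before handing it to an older kafka-topics.sh that doesn't check.
TOPIC="my.topic.name"   # illustrative
if [ "${#TOPIC}" -gt 249 ]; then
  echo "Topic name exceeds 249 characters; the broker would choke on it" >&2
  exit 1
fi
bin/kafka-topics.sh --zookeeper 127.0.0.1 --create --topic "$TOPIC" \
  --partitions 1 --replication-factor 2
{code}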

> Long topic names created using old kafka-topics.sh can prevent newer brokers 
> from joining any ISRs
> --
>
> Key: KAFKA-4858
> URL: https://issues.apache.org/jira/browse/KAFKA-4858
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>
> I ran into a variant of KAFKA-3219 that resulted in a broker being unable to 
> join any ISRs in the cluster.
> Prior to 0.10.0.0, the maximum topic length was 255.
> With 0.10.0.0 and beyond, the maximum topic length is 249.
> The check on topic name length is done by kafka-topics.sh prior to topic 
> creation. Thus, it is possible to use a 0.9.0.1 kafka-topics.sh script to 
> create a 255 character topic on a 0.10.1.1 broker.
> When this happens, you will get the following stack trace (the same one seen 
> in KAFKA-3219)
> {code}
> $ TOPIC=$(printf 'd%.0s' {1..255} ) ; bin/kafka-topics.sh --zookeeper 
> 127.0.0.1 --create --topic $TOPIC --partitions 1 --replication-factor 2
> Created topic 
> "ddd".
> {code}
> {code}
> [2017-03-06 22:01:19,011] ERROR [KafkaApi-2] Error when handling request 
> {controller_id=1,controller_epoch=1,partition_states=[{topic=ddd,partition=0,controller_epoch=1,leader=2,leader_epoch=0,isr=[2,1],zk_version=0,replicas=[2,1]}],live_leaders=[{id=2,host=jchengmbpro15,port=9093}]}
>  (kafka.server.KafkaApis)
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:192)
>   at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:192)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>   at kafka.log.Log.loadSegments(Log.scala:155)
>   at kafka.log.Log.(Log.scala:108)
>   at kafka.log.LogManager.createLog(LogManager.scala:362)
>   at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
>   at kafka.cluster.Partition.makeLeader(Partition.scala:168)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:758)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:757)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>   at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:757)
>   at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:703)
>   at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:82)
>   at 

[jira] [Commented] (KAFKA-4858) Long topic names created using old kafka-topics.sh can prevent newer brokers from joining any ISRs

2017-03-10 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905917#comment-15905917
 ] 

Jeff Widman commented on KAFKA-4858:


I saw the PR ignores those topics... is this a silent failure?

That is, will users be confused when their topic creation appears to succeed in 
the shell script but fails on the broker?

I'm not sure there's a good solution for someone using a 0.9 shell script, but 
for the CreateTopic API I hope a clear error message is thrown when topic 
creation fails due to this new broker-side check.

> Long topic names created using old kafka-topics.sh can prevent newer brokers 
> from joining any ISRs
> --
>
> Key: KAFKA-4858
> URL: https://issues.apache.org/jira/browse/KAFKA-4858
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>
> I ran into a variant of KAFKA-3219 that resulted in a broker being unable to 
> join any ISRs in the cluster.
> Prior to 0.10.0.0, the maximum topic length was 255.
> With 0.10.0.0 and beyond, the maximum topic length is 249.
> The check on topic name length is done by kafka-topics.sh prior to topic 
> creation. Thus, it is possible to use a 0.9.0.1 kafka-topics.sh script to 
> create a 255 character topic on a 0.10.1.1 broker.
> When this happens, you will get the following stack trace (the same one seen 
> in KAFKA-3219)
> {code}
> $ TOPIC=$(printf 'd%.0s' {1..255} ) ; bin/kafka-topics.sh --zookeeper 
> 127.0.0.1 --create --topic $TOPIC --partitions 1 --replication-factor 2
> Created topic 
> "ddd".
> {code}
> {code}
> [2017-03-06 22:01:19,011] ERROR [KafkaApi-2] Error when handling request 
> {controller_id=1,controller_epoch=1,partition_states=[{topic=ddd,partition=0,controller_epoch=1,leader=2,leader_epoch=0,isr=[2,1],zk_version=0,replicas=[2,1]}],live_leaders=[{id=2,host=jchengmbpro15,port=9093}]}
>  (kafka.server.KafkaApis)
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:192)
>   at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:192)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>   at kafka.log.Log.loadSegments(Log.scala:155)
>   at kafka.log.Log.(Log.scala:108)
>   at kafka.log.LogManager.createLog(LogManager.scala:362)
>   at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
>   at kafka.cluster.Partition.makeLeader(Partition.scala:168)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:758)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:757)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>   at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:757)
>   at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:703)
>   at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:82)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java:745)
> 

[jira] [Commented] (KAFKA-4858) Long topic names created using old kafka-topics.sh can prevent newer brokers from joining any ISRs

2017-03-09 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903928#comment-15903928
 ] 

Jeff Widman commented on KAFKA-4858:


My understanding is that the general expectation is that you should only use 
Kafka shell scripts whose version matches the broker version, because many (if 
not all) of them use internal Java/Scala code that may break between releases. 
So normally I'd say this isn't worth worrying about fixing.

However, adding a check for this on the broker would be useful, because 
couldn't this also be hit through the new CreateTopic API?

I could easily see this happening with a third-party, non-Java client whose 
authors didn't realize they needed to check this before sending the API call.

> Long topic names created using old kafka-topics.sh can prevent newer brokers 
> from joining any ISRs
> --
>
> Key: KAFKA-4858
> URL: https://issues.apache.org/jira/browse/KAFKA-4858
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>
> I ran into a variant of KAFKA-3219 that resulted in a broker being unable to 
> join any ISRs in the cluster.
> Prior to 0.10.0.0, the maximum topic length was 255.
> With 0.10.0.0 and beyond, the maximum topic length is 249.
> The check on topic name length is done by kafka-topics.sh prior to topic 
> creation. Thus, it is possible to use a 0.9.0.1 kafka-topics.sh script to 
> create a 255 character topic on a 0.10.1.1 broker.
> When this happens, you will get the following stack trace (the same one seen 
> in KAFKA-3219)
> {code}
> $ TOPIC=$(printf 'd%.0s' {1..255} ) ; bin/kafka-topics.sh --zookeeper 
> 127.0.0.1 --create --topic $TOPIC --partitions 1 --replication-factor 2
> Created topic 
> "ddd".
> {code}
> {code}
> [2017-03-06 22:01:19,011] ERROR [KafkaApi-2] Error when handling request 
> {controller_id=1,controller_epoch=1,partition_states=[{topic=ddd,partition=0,controller_epoch=1,leader=2,leader_epoch=0,isr=[2,1],zk_version=0,replicas=[2,1]}],live_leaders=[{id=2,host=jchengmbpro15,port=9093}]}
>  (kafka.server.KafkaApis)
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:192)
>   at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:192)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>   at kafka.log.Log.loadSegments(Log.scala:155)
>   at kafka.log.Log.(Log.scala:108)
>   at kafka.log.LogManager.createLog(LogManager.scala:362)
>   at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
>   at kafka.cluster.Partition.makeLeader(Partition.scala:168)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:758)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:757)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>   at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:757)
>   at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:703)
>   at 

[jira] [Updated] (KAFKA-2273) KIP-54: Add rebalance with a minimal number of reassignments to server-defined strategy list

2017-03-06 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-2273:
---
Fix Version/s: 0.11.0.0

> KIP-54: Add rebalance with a minimal number of reassignments to 
> server-defined strategy list
> 
>
> Key: KAFKA-2273
> URL: https://issues.apache.org/jira/browse/KAFKA-2273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Olof Johansson
>Assignee: Vahid Hashemian
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> Add a new partitions.assignment.strategy to the server-defined list that will 
> do reassignments based on moving as few partitions as possible. This should 
> be a quite common reassignment strategy especially for the cases where the 
> consumer has to maintain state, either in memory, or on disk.





[jira] [Commented] (KAFKA-2273) KIP-54: Add rebalance with a minimal number of reassignments to server-defined strategy list

2017-03-06 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898678#comment-15898678
 ] 

Jeff Widman commented on KAFKA-2273:


This KIP was accepted, so I added it to the 0.11 milestone.

> KIP-54: Add rebalance with a minimal number of reassignments to 
> server-defined strategy list
> 
>
> Key: KAFKA-2273
> URL: https://issues.apache.org/jira/browse/KAFKA-2273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Olof Johansson
>Assignee: Vahid Hashemian
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> Add a new partitions.assignment.strategy to the server-defined list that will 
> do reassignments based on moving as few partitions as possible. This should 
> be a quite common reassignment strategy especially for the cases where the 
> consumer has to maintain state, either in memory, or on disk.
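
Once the KIP-54 assignor ships, opting in should just be a consumer config 
change along these lines; the class name is taken from the KIP-54 
implementation and should be verified against the released 0.11 artifacts:
{code}
# consumer.properties: request the sticky assignment strategy
cat >> consumer.properties <<'EOF'
partition.assignment.strategy=org.apache.kafka.clients.consumer.StickyAssignor
EOF
{code}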





[jira] [Updated] (KAFKA-4790) Kafka cannot recover after a disk full

2017-03-03 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-4790:
---
Fix Version/s: (was: 0.10.2.1)

> Kafka cannot recover after a disk full
> --
>
> Key: KAFKA-4790
> URL: https://issues.apache.org/jira/browse/KAFKA-4790
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1, 0.10.1.1
>Reporter: Pengwei
>  Labels: reliability
>
> [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager)
> [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. 
> (kafka.log.Log)
> [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads 
> during logs loading: java.lang.IllegalArgumentException: requirement failed: 
> Attempt to append to a full index (size = 128000). (kafka.log.LogManager)
> [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. 
> Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.IllegalArgumentException: requirement failed: Attempt to append to 
> a full index (size = 128000).
>   at scala.Predef$.require(Predef.scala:219)
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:199)
>   at kafka.log.LogSegment.recover(LogSegment.scala:191)
>   at kafka.log.Log.recoverLog(Log.scala:259)
>   at kafka.log.Log.loadSegments(Log.scala:234)
>   at kafka.log.Log.(Log.scala:92)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)





[jira] [Commented] (KAFKA-4677) Avoid unnecessary task movement across threads during rebalance

2017-03-01 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890935#comment-15890935
 ] 

Jeff Widman commented on KAFKA-4677:


Is KIP-54 related, since it also implements a sticky partition assigner, albeit 
within the Kafka consumer?

I haven't worked directly with Streams, but I thought it piggy-backed on top of 
the Kafka consumer. If so, wouldn't it be better to just inherit the KIP-54 
implementation?

Again, I don't really understand the internals, so I'm not sure.

> Avoid unnecessary task movement across threads during rebalance
> ---
>
> Key: KAFKA-4677
> URL: https://issues.apache.org/jira/browse/KAFKA-4677
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.3.0
>
>
> StreamPartitionAssigner tries to follow a sticky assignment policy to avoid 
> expensive task migration. Currently, it does this in a best-effort manner.
> We observed a case in which tasks migrated for no good reason, so we assume 
> the current implementation could be improved to be more sticky.
> The concrete scenario is as follows:
> assume we have a topology with 3 tasks: A, B, C
> assume we have 3 threads, each executing one task: 1-A, 2-B, 3-C
> for some reason, thread 1 goes down and a rebalance gets triggered
> threads 2 and 3 get their partitions revoked
> sometimes (not sure what the exact condition for this is), the new assignment 
> flips the assignment for tasks B and C (task A is newly assigned to either 
> thread 2 or 3)
> -> possible new assignment 2-(A,C) and 3-B
> There is no obvious reason (like load balancing) why the task assignment for 
> B and C should move to the other thread, resulting in unnecessary task 
> migration.





[jira] [Commented] (KAFKA-4095) When a topic is deleted and then created with the same name, 'committed' offsets are not reset

2017-02-28 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889093#comment-15889093
 ] 

Jeff Widman commented on KAFKA-4095:


Wasn't this solved by KAFKA-2000?

At least for the new consumers, which store their offsets in Kafka... if a 
consumer stores offsets in ZooKeeper, those aren't updated, but ZooKeeper 
offset storage has been deprecated since Kafka 0.8.2, so it's probably not 
worth fixing at this point.
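
For Kafka-stored offsets, the committed positions can be inspected directly to 
confirm whether stale state survived the re-create; the group name and 
bootstrap address here are illustrative:
{code}
# Compare each partition's committed offset against the broker log-end
# offset; after a delete/re-create, a committed offset above the log-end
# offset indicates leftover state from the old topic.
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 \
  --describe --group my-group
{code}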

> When a topic is deleted and then created with the same name, 'committed' 
> offsets are not reset
> --
>
> Key: KAFKA-4095
> URL: https://issues.apache.org/jira/browse/KAFKA-4095
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Alex Glikson
>Assignee: Vahid Hashemian
>
> I encountered a very strange behavior of Kafka, which seems to be a bug.
> After deleting a topic and re-creating it with the same name, I produced 
> certain amount of new messages, and then opened a consumer with the same ID 
> that I used before re-creating the topic (with auto.commit=false, 
> auto.offset.reset=earliest). While the latest offsets seemed up to date, the 
> *committed* offset (returned by the committed() method) was an *old* offset, 
> from the time before the topic was deleted and re-created.
> I would have assumed that when a topic is deleted, all the associated 
> topic-partitions and consumer groups are recycled too.
> I am using the Java client version 0.9, with Kafka server 0.10.





[jira] [Commented] (KAFKA-3806) Adjust default values of log.retention.hours and offsets.retention.minutes

2017-01-25 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838918#comment-15838918
 ] 

Jeff Widman commented on KAFKA-3806:


Too often, people not very familiar with Kafka don't realize the default is so 
low...

So I would rather see this default to 4 days. If every process in a 
low-priority consumer group dies late on a Friday afternoon, the ops/dev team 
may decide to wait until Monday to fix the problem, thinking the consumer will 
just catch back up once fixed; they'll be surprised to learn their offsets were 
wiped.

Companies big enough to take a performance hit from retaining offsets that long 
will generally have the in-house resources to realize they should reduce this 
value.
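
In the meantime, the workaround from the report below is to set both retentions 
explicitly; this is one illustrative combination that keeps offset retention at 
twice the log retention:
{code}
# server.properties: keep committed offsets (14 days) well beyond the
# log retention (7 days) so idle-but-alive groups don't lose positions.
cat >> server.properties <<'EOF'
log.retention.hours=168
offsets.retention.minutes=20160
EOF
{code}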

> Adjust default values of log.retention.hours and offsets.retention.minutes
> --
>
> Key: KAFKA-3806
> URL: https://issues.apache.org/jira/browse/KAFKA-3806
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Michal Turek
>Priority: Minor
>
> Combination of default values of log.retention.hours (168 hours = 7 days) and 
> offsets.retention.minutes (1440 minutes = 1 day) may be dangerous in special 
> cases. Offset retention should be always greater than log retention.
> We have observed the following scenario and issue:
> - Producing of data to a topic was disabled two days ago by producer update, 
> topic wasn't deleted.
> - Consumer consumed all data and properly committed offsets to Kafka.
> - Consumer made no more offset commits for that topic because there was no 
> more incoming data and there was nothing to confirm. (We have auto-commit 
> disabled, I'm not sure how behaves enabled auto-commit.)
> - After one day: Kafka cleared too old offsets according to 
> offsets.retention.minutes.
> - After two days: Long-term running consumer was restarted after update, it 
> didn't find any committed offsets for that topic since they were deleted by 
> offsets.retention.minutes so it started consuming from the beginning.
> - The messages were still in Kafka due to larger log.retention.hours, about 5 
> days of messages were read again.
> Known workaround to solve this issue:
> - Explicitly configure log.retention.hours and offsets.retention.minutes, 
> don't use defaults.
> Proposals:
> - Prolong default value of offsets.retention.minutes to be at least twice 
> larger than log.retention.hours.
> - Check these values during Kafka startup and log a warning if 
> offsets.retention.minutes is smaller than log.retention.hours.
> - Add a note to migration guide about differences between storing of offsets 
> in ZooKeeper and Kafka (http://kafka.apache.org/documentation.html#upgrade).





[jira] [Commented] (KAFKA-4517) Remove kafka-consumer-offset-checker.sh script since already deprecated in Kafka 9

2017-01-24 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836818#comment-15836818
 ] 

Jeff Widman commented on KAFKA-4517:


So what are the next actions here since I've got an outstanding PR for this and 
it's the only PR against any of these issues...?

Should I retitle the PR / commit to the proper ticket? 

And also remove the tool itself in the same commit or a second commit?

> Remove kafka-consumer-offset-checker.sh script since already deprecated in 
> Kafka 9
> --
>
> Key: KAFKA-4517
> URL: https://issues.apache.org/jira/browse/KAFKA-4517
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 0.10.1.0, 0.10.0.0, 0.10.0.1
>Reporter: Jeff Widman
>Assignee: Ewen Cheslack-Postava
>Priority: Blocker
>
> Kafka 9 deprecated kafka-consumer-offset-checker.sh 
> (kafka.tools.ConsumerOffsetChecker) in favor of kafka-consumer-groups.sh 
> (kafka.admin.ConsumerGroupCommand). 
> Since this was deprecated in 9, and the full functionality of the old script 
> appears to be available in the new script, can we remove the old shell script 
> in 10? 
> From an Ops perspective, it's confusing when, trying to check consumer 
> offsets, I open the bin directory and see a script that seems to do exactly 
> what I want, only to discover later that I'm not supposed to use it. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active

2017-01-20 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15832697#comment-15832697
 ] 

Jeff Widman edited comment on KAFKA-4682 at 1/21/17 1:08 AM:
-

Now that consumers have a background heartbeat thread, it should be much easier 
to identify when a consumer is dead vs. alive. So this makes sense to me. 
However, this would make KAFKA-2000 more important, because you can't count on 
offsets expiring.

We also had a production problem where a couple of topics' log files were 
totally cleared but the offsets weren't, so we had negative lag where the 
consumer offset was higher than the broker high watermark. This was with 
ZooKeeper offset storage, but regardless, I could envision something getting 
corrupted, or someone resetting a cluster without understanding what they're 
doing and leaving the offsets in a bad state. If this were implemented, those 
old offsets would never go away unless they were also manually cleaned up. So 
I'd want to make sure that's protected against somehow... like if a broker ever 
encounters a consumer offset that's higher than its high watermark, either an 
exception is thrown or the consumer offsets get reset to the broker high 
watermark. It's probably safest to just throw an exception, in case something 
else funky is going on.


was (Author: jeffwidman):
Now that consumers have a background heartbeat thread, it should be much easier 
to identify when a consumer is dead vs. alive. So this makes sense to me. 
However, this would make KAFKA-2000 more important, because you can't count on 
offsets expiring.

We also had a production problem where a couple of topics' log files were 
totally cleared but the offsets weren't, so we had negative lag where the 
consumer offset was higher than the broker high watermark. This was with 
ZooKeeper offset storage, but regardless, I could envision something getting 
corrupted, or someone resetting a cluster without understanding what they're 
doing and leaving the offsets in a bad state. If this were implemented, those 
old offsets would never go away unless they were also manually cleaned up. So 
I'd want to make sure that's protected against somehow... like if a broker ever 
encounters a consumer offset that's higher than its high watermark, that offset 
gets removed from the topic.

> Committed offsets should not be deleted if a consumer is still active
> -
>
> Key: KAFKA-4682
> URL: https://issues.apache.org/jira/browse/KAFKA-4682
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>
> Kafka will delete committed offsets that are older than 
> offsets.retention.minutes.
> If there is an active consumer on a low traffic partition, it is possible 
> that Kafka will delete the committed offset for that consumer. Once the 
> offset is deleted, a restart or a rebalance of that consumer will cause the 
> consumer to not find any committed offset and start consuming from 
> earliest/latest (depending on auto.offset.reset). I'm not sure, but a broker 
> failover might also cause you to start reading from auto.offset.reset (due to 
> broker restart, or coordinator failover).
> I think that Kafka should only delete offsets for inactive consumers. The 
> timer should only start after a consumer group goes inactive. For example, if 
> a consumer group goes inactive, then after 1 week, delete the offsets for 
> that consumer group. This is a solution that [~junrao] mentioned in 
> https://issues.apache.org/jira/browse/KAFKA-3806?focusedCommentId=15323521&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15323521
> The current workarounds are to:
> # Commit an offset on every partition you own on a regular basis, making sure 
> that it is more frequent than offsets.retention.minutes (a broker-side 
> setting that a consumer might not be aware of)
> or
> # Turn the value of offsets.retention.minutes up really really high. You have 
> to make sure it is higher than any valid low-traffic rate that you want to 
> support. For example, if you want to support a topic where someone produces 
> once a month, you would have to set offsets.retention.minutes to 1 month. 
> or
> # Turn on enable.auto.commit (this is essentially #1, but easier to 
> implement).
> None of these are ideal. 
> #1 can be spammy. It requires that your consumers know something about how the 
> brokers are configured. Sometimes it is out of your control. MirrorMaker, for 
> example, only commits offsets on partitions where it receives data. And it is 
> duplication that you need to put into all of your consumers.
> #2 has disk-space impact on the broker (in __consumer_offsets) as well as 
> memory impact on the broker (to answer OffsetFetch). 
> #3 I think has the potential for message loss (the consumer might commit on 
> messages that are not yet fully processed)
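
A minimal sketch of workaround #1, assuming illustrative topic and group names 
and a 10-minute re-commit interval chosen to stay well below a typical 
offsets.retention.minutes (the broker expires offsets from the time of the last 
commit, so re-committing the current position resets the timer):
{code}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class KeepOffsetsFresh {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "low-traffic-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        long commitIntervalMs = 10 * 60 * 1000L;
        long lastCommit = System.currentTimeMillis();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("low-traffic-topic"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(100)) {
                    // process(record) ...
                }
                if (System.currentTimeMillis() - lastCommit >= commitIntervalMs) {
                    // Re-commit the current position of every assigned partition,
                    // even if nothing new arrived, so the retention timer resets.
                    Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
                    for (TopicPartition tp : consumer.assignment()) {
                        positions.put(tp, new OffsetAndMetadata(consumer.position(tp)));
                    }
                    consumer.commitSync(positions);
                    lastCommit = System.currentTimeMillis();
                }
            }
        }
    }
}
{code}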



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active

2017-01-20 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15832697#comment-15832697
 ] 

Jeff Widman commented on KAFKA-4682:


Now that consumers have a background heartbeat thread, it should be much easier 
to identify when a consumer is dead vs. alive. So this makes sense to me. 
However, this would make KAFKA-2000 more important, because you can't count on 
offsets expiring.

We also had a production problem where a couple of topics' log files were 
totally cleared but the offsets weren't, so we had negative lag where the 
consumer offset was higher than the broker high watermark. This was with 
ZooKeeper offset storage, but regardless, I could envision something getting 
corrupted, or someone resetting a cluster without understanding what they're 
doing and leaving the offsets in a bad state. If this were implemented, those 
old offsets would never go away unless they were also manually cleaned up. So 
I'd want to make sure that's protected against somehow... like if a broker ever 
encounters a consumer offset that's higher than its high watermark, that offset 
gets removed from the topic.

> Committed offsets should not be deleted if a consumer is still active
> -
>
> Key: KAFKA-4682
> URL: https://issues.apache.org/jira/browse/KAFKA-4682
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>
> Kafka will delete committed offsets that are older than 
> offsets.retention.minutes.
> If there is an active consumer on a low traffic partition, it is possible 
> that Kafka will delete the committed offset for that consumer. Once the 
> offset is deleted, a restart or a rebalance of that consumer will cause the 
> consumer to not find any committed offset and start consuming from 
> earliest/latest (depending on auto.offset.reset). I'm not sure, but a broker 
> failover might also cause you to start reading from auto.offset.reset (due to 
> broker restart, or coordinator failover).
> I think that Kafka should only delete offsets for inactive consumers. The 
> timer should only start after a consumer group goes inactive. For example, if 
> a consumer group goes inactive, then after 1 week, delete the offsets for 
> that consumer group. This is a solution that [~junrao] mentioned in 
> https://issues.apache.org/jira/browse/KAFKA-3806?focusedCommentId=15323521&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15323521
> The current workarounds are to:
> # Commit an offset on every partition you own on a regular basis, making sure 
> that it is more frequent than offsets.retention.minutes (a broker-side 
> setting that a consumer might not be aware of)
> or
> # Turn the value of offsets.retention.minutes up really really high. You have 
> to make sure it is higher than any valid low-traffic rate that you want to 
> support. For example, if you want to support a topic where someone produces 
> once a month, you would have to set offsets.retention.minutes to 1 month. 
> or
> # Turn on enable.auto.commit (this is essentially #1, but easier to 
> implement).
> None of these are ideal. 
> #1 can be spammy. It requires that your consumers know something about how the 
> brokers are configured. Sometimes it is out of your control. MirrorMaker, for 
> example, only commits offsets on partitions where it receives data. And it is 
> duplication that you need to put into all of your consumers.
> #2 has disk-space impact on the broker (in __consumer_offsets) as well as 
> memory impact on the broker (to answer OffsetFetch). 
> #3 I think has the potential for message loss (the consumer might commit on 
> messages that are not yet fully processed)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2000) Delete consumer offsets from kafka once the topic is deleted

2017-01-20 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15832378#comment-15832378
 ] 

Jeff Widman commented on KAFKA-2000:


If neither of them is interested, I'm happy to clean up the existing patch to 
get it merged into 0.10.2. The test suite at my work would benefit from this.

> Delete consumer offsets from kafka once the topic is deleted
> 
>
> Key: KAFKA-2000
> URL: https://issues.apache.org/jira/browse/KAFKA-2000
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sriharsha Chintalapani
>Assignee: Manikumar Reddy
>  Labels: newbie++
> Fix For: 0.10.2.0
>
> Attachments: KAFKA-2000_2015-05-03_10:39:11.patch, KAFKA-2000.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2000) Delete consumer offsets from kafka once the topic is deleted

2017-01-20 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15832378#comment-15832378
 ] 

Jeff Widman edited comment on KAFKA-2000 at 1/20/17 8:45 PM:
-

If neither of them is interested, I'm happy to clean up the existing patch to 
get it merged into 0.10.2. The test suite at my work would benefit from this. 
Just let me know.


was (Author: jeffwidman):
If neither of them is interested, I'm happy to clean up the existing patch to 
get it merged into 0.10.2. The test suite at my work would benefit from this.

> Delete consumer offsets from kafka once the topic is deleted
> 
>
> Key: KAFKA-2000
> URL: https://issues.apache.org/jira/browse/KAFKA-2000
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sriharsha Chintalapani
>Assignee: Manikumar Reddy
>  Labels: newbie++
> Fix For: 0.10.2.0
>
> Attachments: KAFKA-2000_2015-05-03_10:39:11.patch, KAFKA-2000.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3853) Report offsets for empty groups in ConsumerGroupCommand

2017-01-18 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15828620#comment-15828620
 ] 

Jeff Widman commented on KAFKA-3853:


Thanks again for this! Really looking forward to it.

> Report offsets for empty groups in ConsumerGroupCommand
> ---
>
> Key: KAFKA-3853
> URL: https://issues.apache.org/jira/browse/KAFKA-3853
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, tools
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>  Labels: kip
> Fix For: 0.10.2.0
>
>
> We ought to be able to display offsets for groups which either have no active 
> members or which are not using group management. The owner column can be left 
> empty or set to "N/A". If a group is active, I'm not sure it would make sense 
> to report all offsets, in particular when partitions are unassigned, but if 
> it seems problematic to do so, we could enable the behavior with a flag (e.g. 
> --include-unassigned).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4668) Mirrormaker should default to auto.offset.reset=earliest

2017-01-18 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15827769#comment-15827769
 ] 

Jeff Widman commented on KAFKA-4668:


Also KAFKA-3848

> Mirrormaker should default to auto.offset.reset=earliest
> 
>
> Key: KAFKA-4668
> URL: https://issues.apache.org/jira/browse/KAFKA-4668
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jeff Widman
> Fix For: 0.10.3.0
>
>
> Mirrormaker currently inherits the default value for auto.offset.reset, which 
> is latest (new consumer) / largest (old consumer). 
> While for most consumers this is a sensible default, mirrormakers are 
> specifically designed for replication, so they should default to replicating 
> topics from the beginning.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4668) Mirrormaker should default to auto.offset.reset=earliest

2017-01-17 Thread Jeff Widman (JIRA)
Jeff Widman created KAFKA-4668:
--

 Summary: Mirrormaker should default to auto.offset.reset=earliest
 Key: KAFKA-4668
 URL: https://issues.apache.org/jira/browse/KAFKA-4668
 Project: Kafka
  Issue Type: Improvement
Reporter: Jeff Widman
 Fix For: 0.10.3.0


Mirrormaker currently inherits the default value for auto.offset.reset, which 
is latest (new consumer) / largest (old consumer). 

While for most consumers this is a sensible default, mirrormakers are 
specifically designed for replication, so they should default to replicating 
topics from the beginning.
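
For illustration, a consumer config sketch applying the proposed default today; 
the broker address and group id are made up:
{code}
# passed to kafka-mirror-maker.sh via --consumer.config
bootstrap.servers=source-cluster:9092
group.id=mirrormaker
# replicate topics from the beginning instead of only new messages
auto.offset.reset=earliest
{code}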



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2410) Implement "Auto Topic Creation" client side and remove support from Broker side

2017-01-16 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824382#comment-15824382
 ] 

Jeff Widman commented on KAFKA-2410:


I strongly support removing it from the Broker.

But does it need to be added to the Consumer? Why not add it only to the 
Producer? 

For the consumer, as long as it can subscribe to a non-existent topic name and 
then be notified once the topic is created by a producer writing to it, there's 
no need to actually create a topic just by consuming from it. I think something 
similar to this behavior already exists with the regex pattern subscription.  
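
A minimal sketch of that consumer-side behavior; the topic pattern, group id, 
and serializers are illustrative assumptions:
{code}
import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PatternSubscribe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "pattern-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // No matching topic needs to exist yet; the consumer's periodic
        // metadata refresh notices newly created topics matching the pattern
        // and the group rebalances to pick them up.
        consumer.subscribe(Pattern.compile("metrics\\..*"),
                new ConsumerRebalanceListener() {
                    public void onPartitionsRevoked(Collection<TopicPartition> p) { }
                    public void onPartitionsAssigned(Collection<TopicPartition> p) { }
                });
        while (true) {
            consumer.poll(1000).forEach(r -> System.out.println(r.value()));
        }
    }
}
{code}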

> Implement "Auto Topic Creation" client side and remove support from Broker 
> side
> ---
>
> Key: KAFKA-2410
> URL: https://issues.apache.org/jira/browse/KAFKA-2410
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 0.8.2.1
>Reporter: Grant Henke
>Assignee: Grant Henke
>
> Auto topic creation on the broker has caused pain in the past, and today it 
> still causes unusual error-handling requirements on the client side, adds 
> complexity in the broker, mixes responsibilities in the TopicMetadataRequest, 
> and limits configuration of the option to being cluster-wide. In the future, 
> having it broker-side will also make features such as authorization very 
> difficult. 
> There have been discussions in the past of implementing this feature client 
> side. 
> [example|https://issues.apache.org/jira/browse/KAFKA-689?focusedCommentId=13548746&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13548746]
> This Jira is to track that discussion and implementation once the necessary 
> protocol support exists: KAFKA-2229



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4621) Fetch Request V3 docs list max_bytes twice

2017-01-11 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15819844#comment-15819844
 ] 

Jeff Widman commented on KAFKA-4621:


Yeah, I saw that... perhaps the descriptions could also be cleaned up slightly 
to make it clearer what they're referring to... I'll try to put together a 
PR at some point here... 

> Fetch Request V3 docs list max_bytes twice
> --
>
> Key: KAFKA-4621
> URL: https://issues.apache.org/jira/browse/KAFKA-4621
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 0.10.2.0
>Reporter: Jeff Widman
>Priority: Minor
>
> http://kafka.apache.org/protocol.html
> Fetch Request (Version: 3)  lists "max_bytes" twice, but with different 
> descriptions. This is confusing, as it's not apparent if this is an 
> accidental mistake or a purposeful inclusion... if purposeful, it's not clear 
> why.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4621) Fetch Request V3 docs list max_bytes twice

2017-01-11 Thread Jeff Widman (JIRA)
Jeff Widman created KAFKA-4621:
--

 Summary: Fetch Request V3 docs list max_bytes twice
 Key: KAFKA-4621
 URL: https://issues.apache.org/jira/browse/KAFKA-4621
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Affects Versions: 0.10.2.0
Reporter: Jeff Widman
Priority: Minor


http://kafka.apache.org/protocol.html

Fetch Request (Version: 3)  lists "max_bytes" twice, but with different 
descriptions. This is confusing, as it's not apparent if this is an accidental 
mistake or a purposeful inclusion... if purposeful, it's not clear why.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1817) AdminUtils.createTopic vs kafka-topics.sh --create with partitions

2017-01-05 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803174#comment-15803174
 ] 

Jeff Widman commented on KAFKA-1817:


So can this be closed?

> AdminUtils.createTopic vs kafka-topics.sh --create with partitions
> --
>
> Key: KAFKA-1817
> URL: https://issues.apache.org/jira/browse/KAFKA-1817
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 0.8.2.0
> Environment: debian linux current version  up to date
>Reporter: Jason Kania
>
> When topics are created using AdminUtils.createTopic in code, no partitions 
> folder is created. The ZooKeeper shell shows this:
> ls /brokers/topics/foshizzle
> []
> However, when kafka-topics.sh --create is run, the partitions folder is 
> created:
> ls /brokers/topics/foshizzle
> [partitions]
> The unfortunately useless error message "KeeperErrorCode = NoNode for 
> /brokers/topics/periodicReading/partitions" makes it unclear what to do. When 
> the topics are listed via kafka-topics.sh, they appear to have been created 
> fine. It would be good if the exception were wrapped by Kafka to suggest 
> looking in the ZooKeeper shell, so a person didn't have to dig around to 
> understand the meaning of this path...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3135) Unexpected delay before fetch response transmission

2016-12-30 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15788686#comment-15788686
 ] 

Jeff Widman commented on KAFKA-3135:


It's not currently a critical issue for my company. Typically when we're 
considering upgrading we look at outstanding bugs to evaluate whether to 
upgrade or wait, so I just wanted the tags to be corrected. Thanks [~ewencp] 
for handling.


> Unexpected delay before fetch response transmission
> ---
>
> Key: KAFKA-3135
> URL: https://issues.apache.org/jira/browse/KAFKA-3135
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.1.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 0.10.2.0
>
>
> From the user list, Krzysztof Ciesielski reports the following:
> {quote}
> Scenario description:
> First, a producer writes 50 elements into a topic
> Then, a consumer starts to read, polling in a loop.
> When "max.partition.fetch.bytes" is set to a relatively small value, each
> "consumer.poll()" returns a batch of messages.
> If this value is left as default, the output tends to look like this:
> Poll returned 13793 elements
> Poll returned 13793 elements
> Poll returned 13793 elements
> Poll returned 13793 elements
> Poll returned 0 elements
> Poll returned 0 elements
> Poll returned 0 elements
> Poll returned 0 elements
> Poll returned 13793 elements
> Poll returned 13793 elements
> As we can see, there are weird "gaps" when poll returns 0 elements for some
> time. What is the reason for that? Maybe there are some good practices
> about setting "max.partition.fetch.bytes" which I don't follow?
> {quote}
> The gist to reproduce this problem is here: 
> https://gist.github.com/kciesielski/054bb4359a318aa17561.
> After some initial investigation, the delay appears to be in the server's 
> networking layer. Basically I see a delay of 5 seconds from the time that 
> Selector.send() is invoked in SocketServer.Processor with the fetch response 
> to the time that the send is completed. Using netstat in the middle of the 
> delay shows the following output:
> {code}
> tcp4   0  0  10.191.0.30.55455  10.191.0.30.9092   ESTABLISHED
> tcp4   0 102400  10.191.0.30.9092   10.191.0.30.55454  ESTABLISHED
> {code}
> From this, it looks like the data reaches the send buffer, but needs to be 
> flushed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3297) More optimally balanced partition assignment strategy (new consumer)

2016-12-29 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15786986#comment-15786986
 ] 

Jeff Widman commented on KAFKA-3297:


[~ewencp] KIP-54 tackles both sticky re-assignments and a more "fair" initial 
assignment. Search the wiki page for "fair yet sticky" for a section that 
provides a bit more context.

> More optimally balanced partition assignment strategy (new consumer)
> 
>
> Key: KAFKA-3297
> URL: https://issues.apache.org/jira/browse/KAFKA-3297
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Andrew Olson
>Assignee: Andrew Olson
> Fix For: 0.10.2.0
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the new consumer. For the original high-level consumer, 
> see KAFKA-2435.
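
A toy sketch of the greedy idea described above (not the actual Kafka 
assignor); the method names and data structures are made up for illustration:
{code}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class LeastLoadedAssignment {
    public static Map<String, List<String>> assign(
            Map<String, Integer> partitionsPerTopic,   // topic -> partition count
            Map<String, Set<String>> subscriptions) {  // consumer -> topics

        // Which consumers are interested in each topic
        Map<String, List<String>> consumersForTopic = new HashMap<>();
        for (Map.Entry<String, Set<String>> e : subscriptions.entrySet())
            for (String topic : e.getValue())
                consumersForTopic.computeIfAbsent(topic, t -> new ArrayList<>())
                                 .add(e.getKey());

        // Topics with fewer subscribers first; ties go to topics with
        // more partitions, per the ordering rules above.
        List<String> topics = new ArrayList<>(consumersForTopic.keySet());
        topics.sort(Comparator
                .comparingInt((String t) -> consumersForTopic.get(t).size())
                .thenComparing((String t) -> -partitionsPerTopic.get(t)));

        Map<String, List<String>> assignment = new HashMap<>();
        Map<String, Integer> load = new HashMap<>();
        for (String consumer : subscriptions.keySet()) {
            assignment.put(consumer, new ArrayList<>());
            load.put(consumer, 0);
        }

        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic.get(topic); p++) {
                // Give the partition to the least-loaded interested consumer
                String target = null;
                for (String c : consumersForTopic.get(topic))
                    if (target == null || load.get(c) < load.get(target))
                        target = c;
                assignment.get(target).add(topic + "-" + p);
                load.put(target, load.get(target) + 1);
            }
        }
        return assignment;
    }
}
{code}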



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2273) Add rebalance with a minimal number of reassignments to server-defined strategy list

2016-12-21 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15768710#comment-15768710
 ] 

Jeff Widman commented on KAFKA-2273:


What was the outcome of the vote?

> Add rebalance with a minimal number of reassignments to server-defined 
> strategy list
> 
>
> Key: KAFKA-2273
> URL: https://issues.apache.org/jira/browse/KAFKA-2273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Olof Johansson
>Assignee: Vahid Hashemian
>  Labels: newbie++, newbiee
> Fix For: 0.10.2.0
>
>
> Add a new partitions.assignment.strategy to the server-defined list that will 
> do reassignments based on moving as few partitions as possible. This should 
> be a quite common reassignment strategy especially for the cases where the 
> consumer has to maintain state, either in memory, or on disk.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2331) Kafka does not spread partitions in a topic among all consumers evenly

2016-12-21 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15766509#comment-15766509
 ] 

Jeff Widman commented on KAFKA-2331:


Isn't this what round robin partitioning strategy was trying to solve?

If so, this issue should be closed.

> Kafka does not spread partitions in a topic among all consumers evenly
> --
>
> Key: KAFKA-2331
> URL: https://issues.apache.org/jira/browse/KAFKA-2331
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1.1
>Reporter: Stefan Miklosovic
>
> I want to have 1 topic with 10 partitions. I am using the default 
> configuration of Kafka. I create 1 topic with 10 partitions via the helper 
> script, and now I am about to produce messages to it.
> The thing is that even though all partitions are indeed consumed, some 
> consumers have more than 1 partition assigned, even though I have a number of 
> consumer threads equal to the partitions in the topic, hence some threads are 
> idle.
> Let's describe it in more detail.
> I know the common guidance that you need one consumer thread per partition. I 
> want to be able to commit offsets per partition, and this is possible only 
> when I have 1 thread per consumer connector per partition (I am using the 
> high-level consumer).
> So I create 10 threads, and in each thread I call 
> Consumer.createJavaConsumerConnector() where I do this:
> topicCountMap.put("mytopic", 1);
> and in the end I have 1 iterator which consumes messages from 1 partition.
> When I do this 10 times, I have 10 consumers, a consumer per thread per 
> partition, where I can commit offsets independently per partition. If I put a 
> number other than 1 in the topic map, I would end up with more than 1 
> consumer thread for that topic for a given consumer instance, so if I were to 
> commit offsets with that consumer instance, it would commit them for all 
> threads, which is not desired.
> But the thing is that when I use the consumers, only 7 consumers are 
> involved, and it seems the other consumer threads are idle, but I do not know 
> why.
> I am creating these consumer threads in a loop: I start the first thread 
> (submit it to an executor service), then another, then another, and so on.
> So the scenario is that the first consumer gets all 10 partitions, then the 
> 2nd connects so the partitions are split between these two 5 and 5 (or 
> something similar), then the other threads connect.
> I understand this as partition rebalancing among all consumers, and it 
> behaves well in the sense that as more consumers are created, partition 
> rebalancing occurs between them, so every consumer should have some 
> partitions to operate upon.
> But from the results I see only 7 consumers, and according to the consumed 
> messages it seems they are split 3,2,1,1,1,1,1 partition-wise. Yes, these 7 
> consumers covered all 10 partitions, but why don't the consumers with more 
> than 1 partition split off and give partitions to the remaining 3 consumers?
> I am pretty much wondering what is happening with the remaining 3 threads and 
> why they do not "grab" partitions from the consumers which have more than 1 
> partition assigned.
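
A minimal sketch of the setup the reporter describes, assuming the 0.8-era 
high-level consumer API and illustrative connection settings:
{code}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class OneConnectorPerThread implements Runnable {
    public void run() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "mygroup");
        props.put("auto.commit.enable", "false");
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put("mytopic", 1);  // one stream in this connector

        KafkaStream<byte[], byte[]> stream =
                connector.createMessageStreams(topicCountMap).get("mytopic").get(0);
        for (MessageAndMetadata<byte[], byte[]> msg : stream) {
            // process(msg) ...
            connector.commitOffsets();  // commits only this connector's offsets
        }
    }
}
{code}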



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2019) RoundRobinAssignor clusters by consumer

2016-12-21 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15766498#comment-15766498
 ] 

Jeff Widman commented on KAFKA-2019:


As noted in KIP-54 
(https://cwiki.apache.org/confluence/display/pages/viewpage.action?pageId=62692483)
 this is not relevant to the new consumer.

Could someone update the issue title to make it clear it only applies to the 
old consumer?

I also suspect this may never get merged as the new consumer is the future, in 
which case it'd be nice if this were closed as "wontfix"


> RoundRobinAssignor clusters by consumer
> ---
>
> Key: KAFKA-2019
> URL: https://issues.apache.org/jira/browse/KAFKA-2019
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Joseph Holsten
>Assignee: Neha Narkhede
>Priority: Minor
> Attachments: 0001-sort-consumer-thread-ids-by-hashcode.patch, 
> KAFKA-2019.patch
>
>
> When rolling out a change today, I noticed that some of my consumers are 
> "greedy", taking far more partitions than others.
> The cause is that the RoundRobinAssignor is using a list of ConsumerThreadIds 
> sorted by toString, which is {{ "%s-%d".format(consumer, threadId)}}. This 
> causes each consumer's threads to be adjacent to each other.
> One possible fix would be to define ConsumerThreadId.hashCode, and sort by 
> that.
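
A tiny illustration of the proposed fix, with made-up thread ids; sorting by 
hashCode scatters each consumer's threads instead of keeping them adjacent:
{code}
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ThreadIdOrder {
    public static void main(String[] args) {
        List<String> ids = Arrays.asList(
                "consumerA-0", "consumerA-1", "consumerB-0", "consumerB-1");
        // Current behavior: lexicographic order keeps each consumer's threads
        // adjacent, so round-robin hands each consumer runs of partitions.
        ids.sort(Comparator.naturalOrder());
        System.out.println(ids);
        // Proposed: sort by hashCode, which interleaves the consumers.
        ids.sort(Comparator.comparingInt(String::hashCode));
        System.out.println(ids);
    }
}
{code}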



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3297) More optimally balanced partition assignment strategy (new consumer)

2016-12-21 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15766465#comment-15766465
 ] 

Jeff Widman edited comment on KAFKA-3297 at 12/21/16 8:38 AM:
--

Is this being superceded by KIP-54? 
https://cwiki.apache.org/confluence/display/pages/viewpage.action?pageId=62692483


was (Author: jeffwidman):
Was this KIP ever voted on? I see there's only a handful of messages about it, 
one of which mentions patching the round robin implementation to avoid 
"clumping" partitions from the same topic onto the same consumer. 

> More optimally balanced partition assignment strategy (new consumer)
> 
>
> Key: KAFKA-3297
> URL: https://issues.apache.org/jira/browse/KAFKA-3297
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Olson
>Assignee: Andrew Olson
> Fix For: 0.10.2.0
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the new consumer. For the original high-level consumer, 
> see KAFKA-2435.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3297) More optimally balanced partition assignment strategy (new consumer)

2016-12-21 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15766465#comment-15766465
 ] 

Jeff Widman commented on KAFKA-3297:


Was this KIP ever voted on? I see there's only a handful of messages about it, 
one of which mentions patching the round robin implementation to avoid 
"clumping" partitions from the same topic onto the same consumer. 

> More optimally balanced partition assignment strategy (new consumer)
> 
>
> Key: KAFKA-3297
> URL: https://issues.apache.org/jira/browse/KAFKA-3297
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Olson
>Assignee: Andrew Olson
> Fix For: 0.10.2.0
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the new consumer. For the original high-level consumer, 
> see KAFKA-2435.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2172) Round-robin partition assignment strategy too restrictive

2016-12-21 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15766445#comment-15766445
 ] 

Jeff Widman commented on KAFKA-2172:


Should this be marked as resolved?

KAFKA-2196 added this to the new consumer quite a while ago. 

KAFKA-2434 has a patch available but appears dead in the water, so if it's not 
getting merged then there's nothing more to do on this ticket.

> Round-robin partition assignment strategy too restrictive
> -
>
> Key: KAFKA-2172
> URL: https://issues.apache.org/jira/browse/KAFKA-2172
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Rosenberg
>
> The round-robin partition assignment strategy was introduced for the 
> high-level consumer starting with 0.8.2.1.  This appears to be a very 
> attractive feature, but it has an unfortunate restriction which prevents it 
> from being easily utilized: it requires that all consumers in the consumer 
> group have identical topic regex selectors, and that they have the same 
> number of consumer threads.
> It turns out this is not always the case for our deployments.  It's not 
> unusual to run multiple consumers within a single process (with different 
> topic selectors), or we might have multiple processes dedicated to different 
> topic subsets.  Agreed, we could change these to have separate group ids for 
> each sub topic selector (but unfortunately, that's easier said than done).  
> In several cases, we do at least have separate client.ids set for each 
> sub-consumer, so it would be incrementally better if we could at least loosen 
> the requirement such that each set of topics selected by a groupid/clientid 
> pair is the same.
> But, if we want to do a rolling restart for a new version of a consumer 
> config, the cluster will likely be in a state where it's not possible to have 
> a single config until the full rolling restart completes across all nodes.  
> This results in a consumer outage while the rolling restart is happening.
> Finally, it's especially problematic if we want to canary a new version for a 
> period before rolling to the whole cluster.
> I'm not sure why this restriction should exist (as it obviously does not 
> exist for the 'range' assignment strategy).  It seems it could be made to 
> work reasonably well with heterogeneous topic selection and heterogeneous 
> thread counts.  The documentation states that "The round-robin partition 
> assignor lays out all the available partitions and all the available consumer 
> threads. It then proceeds to do a round-robin assignment from partition to 
> consumer thread."
> If the assignor can "lay out all the available partitions and all the 
> available consumer threads", it should be able to uniformly assign partitions 
> to the available threads.  In each case, if a thread belongs to a consumer 
> that doesn't have that partition selected, just move to the next available 
> thread that does have the selection, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2196) remove roundrobin identical topic constraint in consumer coordinator

2016-12-21 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15766440#comment-15766440
 ] 

Jeff Widman commented on KAFKA-2196:


What version did this land in?

> remove roundrobin identical topic constraint in consumer coordinator
> 
>
> Key: KAFKA-2196
> URL: https://issues.apache.org/jira/browse/KAFKA-2196
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Attachments: KAFKA-2196.patch
>
>
> roundrobin doesn't need to make all consumers have identical topic 
> subscriptions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-242) Subsequent calls of ConsumerConnector.createMessageStreams cause Consumer offset to be incorrect

2016-12-20 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15765835#comment-15765835
 ] 

Jeff Widman commented on KAFKA-242:
---

Does this bug still exist in 0.10?

> Subsequent calls of ConsumerConnector.createMessageStreams cause Consumer 
> offset to be incorrect
> 
>
> Key: KAFKA-242
> URL: https://issues.apache.org/jira/browse/KAFKA-242
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.7
>Reporter: David Arthur
> Attachments: kafka.log
>
>
> When calling ConsumerConnector.createMessageStreams in rapid succession, the 
> Consumer offset is incorrectly advanced causing the consumer to lose 
> messages. This seems to happen when createMessageStreams is called before the 
> rebalancing triggered by the previous call to createMessageStreams has 
> completed. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3853) Report offsets for empty groups in ConsumerGroupCommand

2016-12-18 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15759588#comment-15759588
 ] 

Jeff Widman commented on KAFKA-3853:


Did the KIP get the missing vote required to pass?

> Report offsets for empty groups in ConsumerGroupCommand
> ---
>
> Key: KAFKA-3853
> URL: https://issues.apache.org/jira/browse/KAFKA-3853
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
> Fix For: 0.10.2.0
>
>
> We ought to be able to display offsets for groups which either have no active 
> members or which are not using group management. The owner column can be left 
> empty or set to "N/A". If a group is active, I'm not sure it would make sense 
> to report all offsets, in particular when partitions are unassigned, but if 
> it seems problematic to do so, we could enable the behavior with a flag (e.g. 
> --include-unassigned).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3135) Unexpected delay before fetch response transmission

2016-12-14 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15750082#comment-15750082
 ] 

Jeff Widman commented on KAFKA-3135:


Can the tags on this issue be updated to note that it applies to 0.10.x 
versions as well? 

At least, that's what it seems from reading through the issue description.

> Unexpected delay before fetch response transmission
> ---
>
> Key: KAFKA-3135
> URL: https://issues.apache.org/jira/browse/KAFKA-3135
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 0.10.2.0
>
>
> From the user list, Krzysztof Ciesielski reports the following:
> {quote}
> Scenario description:
> First, a producer writes 50 elements into a topic
> Then, a consumer starts to read, polling in a loop.
> When "max.partition.fetch.bytes" is set to a relatively small value, each
> "consumer.poll()" returns a batch of messages.
> If this value is left as default, the output tends to look like this:
> Poll returned 13793 elements
> Poll returned 13793 elements
> Poll returned 13793 elements
> Poll returned 13793 elements
> Poll returned 0 elements
> Poll returned 0 elements
> Poll returned 0 elements
> Poll returned 0 elements
> Poll returned 13793 elements
> Poll returned 13793 elements
> As we can see, there are weird "gaps" when poll returns 0 elements for some
> time. What is the reason for that? Maybe there are some good practices
> about setting "max.partition.fetch.bytes" which I don't follow?
> {quote}
> The gist to reproduce this problem is here: 
> https://gist.github.com/kciesielski/054bb4359a318aa17561.
> After some initial investigation, the delay appears to be in the server's 
> networking layer. Basically I see a delay of 5 seconds from the time that 
> Selector.send() is invoked in SocketServer.Processor with the fetch response 
> to the time that the send is completed. Using netstat in the middle of the 
> delay shows the following output:
> {code}
> tcp4   0  0  10.191.0.30.55455  10.191.0.30.9092   ESTABLISHED
> tcp4   0 102400  10.191.0.30.9092   10.191.0.30.55454  ESTABLISHED
> {code}
> From this, it looks like the data reaches the send buffer, but needs to be 
> flushed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3853) Report offsets for empty groups in ConsumerGroupCommand

2016-12-09 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15736740#comment-15736740
 ] 

Jeff Widman commented on KAFKA-3853:


Any idea if this will land in 0.10.2?

> Report offsets for empty groups in ConsumerGroupCommand
> ---
>
> Key: KAFKA-3853
> URL: https://issues.apache.org/jira/browse/KAFKA-3853
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>
> We ought to be able to display offsets for groups which either have no active 
> members or which are not using group management. The owner column can be left 
> empty or set to "N/A". If a group is active, I'm not sure it would make sense 
> to report all offsets, in particular when partitions are unassigned, but if 
> it seems problematic to do so, we could enable the behavior with a flag (e.g. 
> --include-unassigned).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4519) Delete old unused branches in git repo

2016-12-09 Thread Jeff Widman (JIRA)
Jeff Widman created KAFKA-4519:
--

 Summary: Delete old unused branches in git repo
 Key: KAFKA-4519
 URL: https://issues.apache.org/jira/browse/KAFKA-4519
 Project: Kafka
  Issue Type: Task
Reporter: Jeff Widman
Priority: Trivial


Delete these old git branches, as they're quite outdated and no longer 
relevant to any version branches:
* consumer_redesign
* transactional_messaging
* 0.8.0-beta1-candidate1



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4517) Remove kafka-consumer-offset-checker.sh script since already deprecated in Kafka 9

2016-12-09 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-4517:
---
Summary: Remove kafka-consumer-offset-checker.sh script since already 
deprecated in Kafka 9  (was: Remove kafka-consumer-offset-checker.sh script 
since deprecated in Kafka 9)

> Remove kafka-consumer-offset-checker.sh script since already deprecated in 
> Kafka 9
> --
>
> Key: KAFKA-4517
> URL: https://issues.apache.org/jira/browse/KAFKA-4517
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 0.10.1.0, 0.10.0.0, 0.10.0.1
>Reporter: Jeff Widman
>Priority: Minor
>
> Kafka 9 deprecated kafka-consumer-offset-checker.sh 
> (kafka.tools.ConsumerOffsetChecker) in favor of kafka-consumer-groups.sh 
> (kafka.admin.ConsumerGroupCommand). 
> Since this was deprecated in 9, and the full functionality of the old script 
> appears to be available in the new script, can we remove the old shell script 
> in 10? 
> From an Ops perspective, it's confusing when, trying to check consumer 
> offsets, I open the bin directory and see a script that seems to do exactly 
> what I want, only to discover later that I'm not supposed to use it. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4517) Remove kafka-consumer-offset-checker.sh script since deprecated in Kafka 9

2016-12-09 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-4517:
---
Summary: Remove kafka-consumer-offset-checker.sh script since deprecated in 
Kafka 9  (was: Remove shell scripts deprecated in Kafka 9)

> Remove kafka-consumer-offset-checker.sh script since deprecated in Kafka 9
> --
>
> Key: KAFKA-4517
> URL: https://issues.apache.org/jira/browse/KAFKA-4517
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 0.10.1.0, 0.10.0.0, 0.10.0.1
>Reporter: Jeff Widman
>Priority: Minor
>
> Kafka 9 deprecated kafka-consumer-offset-checker.sh 
> (kafka.tools.ConsumerOffsetChecker) in favor of kafka-consumer-groups.sh 
> (kafka.admin.ConsumerGroupCommand). 
> Since this was deprecated in 9, and the full functionality of the old script 
> appears to be available in the new script, can we remove the old shell script 
> in 10? 
> From an Ops perspective, it's confusing when, trying to check consumer 
> offsets, I open the bin directory and see a script that seems to do exactly 
> what I want, only to discover later that I'm not supposed to use it. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4517) Remove shell scripts deprecated in Kafka 9

2016-12-09 Thread Jeff Widman (JIRA)
Jeff Widman created KAFKA-4517:
--

 Summary: Remove shell scripts deprecated in Kafka 9
 Key: KAFKA-4517
 URL: https://issues.apache.org/jira/browse/KAFKA-4517
 Project: Kafka
  Issue Type: Task
Affects Versions: 0.10.0.1, 0.10.0.0, 0.10.1.0
Reporter: Jeff Widman
Priority: Minor


Kafka 9 deprecated kafka-consumer-offset-checker.sh 
(kafka.tools.ConsumerOffsetChecker) in favor of kafka-consumer-groups.sh 
(kafka.admin.ConsumerGroupCommand). 

Since this was deprecated in 9, and the full functionality of the old script 
appears to be available in the new script, can we remove the old shell script 
in 10? 

From an Ops perspective, it's confusing when, trying to check consumer 
offsets, I open the bin directory and see a script that seems to do exactly 
what I want, only to discover later that I'm not supposed to use it. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1941) Timeout connections in the clients

2016-11-17 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15674679#comment-15674679
 ] 

Jeff Widman commented on KAFKA-1941:


Shouldn't this be closed? I think it was handled in KAFKA-1282, since the docs 
now show producer/consumer configs for idle timeouts.
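
For reference, the client-side setting in question; the value shown is, to the 
best of my knowledge, the documented default:
{code}
# producer.properties / consumer.properties
# close connections idle for more than 9 minutes
connections.max.idle.ms=540000
{code}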

> Timeout connections in the clients
> --
>
> Key: KAFKA-1941
> URL: https://issues.apache.org/jira/browse/KAFKA-1941
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>
> Currently the clients never close their connections due to inactivity. It 
> would be nice to have some idle time period after which the client would 
> close a connection.
> This should probably be implemented in Selector.java.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2076) Add an API to new consumer to allow user get high watermark of partitions.

2016-11-11 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15658812#comment-15658812
 ] 

Jeff Widman edited comment on KAFKA-2076 at 11/12/16 2:22 AM:
--

Thanks [~becket_qin], BTW I thought your talk at the Kafka meetup at LinkedIn a 
few weeks ago was very well presented.


was (Author: jeffwidman):
Thanks [~becket_qin], BTW you gave a great presentation at the Kafka meetup at 
LinkedIn a few weeks ago.

> Add an API to new consumer to allow user get high watermark of partitions.
> --
>
> Key: KAFKA-2076
> URL: https://issues.apache.org/jira/browse/KAFKA-2076
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.1.0
>
>
> We have a use case where a user wants to know how far behind it is on a 
> particular partition at startup. Currently, although each fetch response 
> carries the high watermark for each partition, we only keep a global max-lag 
> metric. It would be better to keep a record of the high watermark per 
> partition and update it on each fetch response. We can add a new API to let 
> the user query the high watermark.
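
For reference, a short sketch of computing per-partition lag with the 
Consumer#endOffsets API available in 0.10.1.0 clients; it assumes an 
already-subscribed consumer that has polled at least once, so positions and 
assignment are valid:
{code}
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class LagCheck {
    static void printLag(KafkaConsumer<?, ?> consumer) {
        Set<TopicPartition> assigned = consumer.assignment();
        // High watermark per partition, fetched on demand
        Map<TopicPartition, Long> highWatermarks = consumer.endOffsets(assigned);
        for (TopicPartition tp : assigned) {
            long lag = highWatermarks.get(tp) - consumer.position(tp);
            System.out.printf("%s lag=%d%n", tp, lag);
        }
    }
}
{code}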



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2076) Add an API to new consumer to allow user get high watermark of partitions.

2016-11-11 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15658812#comment-15658812
 ] 

Jeff Widman edited comment on KAFKA-2076 at 11/12/16 2:21 AM:
--

Thanks [~becket_qin], BTW you gave a great presentation at the Kafka meetup at 
LinkedIn a few weeks ago.


was (Author: jeffwidman):
Thanks [~becket_qin], BTW great presentation at the Kafka meetup at LinkedIn a 
few weeks ago.

> Add an API to new consumer to allow user get high watermark of partitions.
> --
>
> Key: KAFKA-2076
> URL: https://issues.apache.org/jira/browse/KAFKA-2076
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.1.0
>
>
> We have a use case where a user wants to know how far behind it is on a 
> particular partition at startup. Currently, although each fetch response 
> carries the high watermark for each partition, we only keep a global max-lag 
> metric. It would be better to keep a record of the high watermark per 
> partition and update it on each fetch response. We can add a new API to let 
> the user query the high watermark.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2076) Add an API to new consumer to allow user get high watermark of partitions.

2016-11-11 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15658812#comment-15658812
 ] 

Jeff Widman commented on KAFKA-2076:


Thanks [~becket_qin], BTW great presentation at the Kafka meetup at LinkedIn a 
few weeks ago.

> Add an API to new consumer to allow user get high watermark of partitions.
> --
>
> Key: KAFKA-2076
> URL: https://issues.apache.org/jira/browse/KAFKA-2076
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.1.0
>
>
> We have a use case where a user wants to know how far behind it is on a 
> particular partition at startup. Currently, although each fetch response 
> carries the high watermark for each partition, we only keep a global max-lag 
> metric. It would be better to keep a record of the high watermark per 
> partition and update it on each fetch response. We can add a new API to let 
> the user query the high watermark.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2076) Add an API to new consumer to allow user get high watermark of partitions.

2016-11-11 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15657823#comment-15657823
 ] 

Jeff Widman commented on KAFKA-2076:


It'd be great to see this move forward.

> Add an API to new consumer to allow user get high watermark of partitions.
> --
>
> Key: KAFKA-2076
> URL: https://issues.apache.org/jira/browse/KAFKA-2076
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>
> We have a use case where a user wants to know how far behind it is on a 
> particular partition at startup. Currently, although each fetch response 
> carries the high watermark for each partition, we only keep a global max-lag 
> metric. It would be better to keep a record of the high watermark per 
> partition and update it on each fetch response. We can add a new API to let 
> the user query the high watermark.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4254) Questionable handling of unknown partitions in KafkaProducer

2016-10-14 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15575850#comment-15575850
 ] 

Jeff Widman commented on KAFKA-4254:


Does this affect 10.0 or just 10.1?

> Questionable handling of unknown partitions in KafkaProducer
> 
>
> Key: KAFKA-4254
> URL: https://issues.apache.org/jira/browse/KAFKA-4254
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Jason Gustafson
>Assignee: Konstantine Karantasis
> Fix For: 0.10.1.0
>
>
> Currently the producer will raise an {{IllegalArgumentException}} if the user 
> attempts to write to a partition which has just been created. This is caused 
> by the fact that the producer does not attempt to refetch topic metadata in 
> this case, which means that its check for partition validity is based on 
> stale metadata.
> If the topic for the partition did not already exist, it works fine because 
> the producer will block until it has metadata for the topic, so this case is 
> primarily hit when the number of partitions is dynamically increased. 
> A couple options to fix this that come to mind:
> 1. We could treat unknown partitions just as we do unknown topics. If the 
> partition doesn't exist, we refetch metadata and try again (timing out when 
> max.block.ms is reached).
> 2. We can at least throw a more specific exception so that users can handle 
> the error. Raising {{IllegalArgumentException}} is not helpful in practice 
> because it can also be caused by other errors.
> My inclination is to do the first one, since it seems incorrect for the 
> producer to tell the user that the partition is invalid.
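
A hedged sketch of working around this client-side today, assuming (per the 
description) that the stale-metadata check surfaces synchronously as 
{{IllegalArgumentException}}; the topic name and partition number are made up:
{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendToNewPartition {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            int newPartition = 7;  // a partition added after the producer started
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("events", newPartition, "key", "value");
            try {
                producer.send(record);
            } catch (IllegalArgumentException stale) {
                // Force a metadata refresh (blocks up to max.block.ms), then retry
                producer.partitionsFor("events");
                producer.send(record);
            }
        }
    }
}
{code}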



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)