[jira] [Commented] (KAFKA-7786) Fast update of leader epoch may stall partition fetching due to FENCED_LEADER_EPOCH

2019-01-07 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16736415#comment-16736415
 ] 

Jun Rao commented on KAFKA-7786:


Great find, Anna. As for the fix, I am not sure that we need the protocol 
change. We know the leader epoch used in the OffsetsForLeaderEpoch request, so 
we can just pass it along with the OffsetsForLeaderEpoch response to 
maybeTruncate(). If the leaderEpoch in partitionStates has changed, we simply 
ignore the response and retry the OffsetsForLeaderEpoch request with the new 
leader epoch.
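
A minimal sketch of the check described above, written in Java for 
illustration only. The class name, the method names, and the plain map that 
stands in for partitionStates are all hypothetical; this is not the actual 
fetcher code.

```java
import java.util.HashMap;
import java.util.Map;

final class EpochCheckSketch {
    // Hypothetical stand-in for partitionStates: partition -> current leader epoch.
    private final Map<String, Integer> currentEpochs = new HashMap<>();

    // Called when a LeaderAndIsr update (re-)adds the partition with a new epoch.
    void updateLeaderEpoch(String partition, int leaderEpoch) {
        currentEpochs.put(partition, leaderEpoch);
    }

    // requestEpoch is the leader epoch that was used to build the request.
    // The response is applied only if the partition still has that epoch;
    // otherwise the caller should drop the response and retry with the new epoch.
    boolean shouldApplyResponse(String partition, int requestEpoch) {
        Integer current = currentEpochs.get(partition);
        return current != null && current == requestEpoch;
    }
}
```

With this shape, a FENCED_LEADER_EPOCH response for a request built with epoch 
6 would be ignored once the partition state has moved to epoch 7, instead of 
removing the partition from partitionStates.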

 

> Fast update of leader epoch may stall partition fetching due to 
> FENCED_LEADER_EPOCH
> ---
>
> Key: KAFKA-7786
> URL: https://issues.apache.org/jira/browse/KAFKA-7786
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Anna Povzner
>Assignee: Anna Povzner
>Priority: Critical
>
> KIP-320/KAFKA-7395 added a FENCED_LEADER_EPOCH error response to an 
> OffsetsForLeaderEpoch request if the epoch in the request is lower than the 
> broker's leader epoch. ReplicaFetcherThread builds an OffsetsForLeaderEpoch 
> request under _partitionMapLock_, sends the request outside the lock, and 
> then processes the response under _partitionMapLock_. The broker may receive 
> LeaderAndIsr with the same leader but with the next leader epoch, then remove 
> and re-add the partition to the fetcher thread (with partition state 
> reflecting the updated leader epoch), all while the OffsetsForLeaderEpoch 
> request (with the old leader epoch) is still outstanding or waiting for the 
> lock to process the OffsetsForLeaderEpoch response. As a result, the 
> partition gets removed from partitionStates and this broker will not fetch 
> for this partition until the next LeaderAndIsr, which may take a while. We 
> will see a log message like this:
> [2018-12-23 07:23:04,802] INFO [ReplicaFetcher replicaId=3, leaderId=2, 
> fetcherId=0] Partition test_topic-17 has an older epoch (7) than the current 
> leader. Will await the new LeaderAndIsr state before resuming fetching. 
> (kafka.server.ReplicaFetcherThread)
> We saw this happen with 
> kafkatest.tests.core.reassign_partitions_test.ReassignPartitionsTest.test_reassign_partitions.bounce_brokers=True.reassign_from_offset_zero=True.
> This test does partition re-assignment while bouncing 2 out of 4 total 
> brokers. When the failure happened, each bounced broker was also a controller. 
> Because of re-assignment, the controller updates the leader epoch without 
> updating the leader on controller change or on broker startup, so we see 
> several leader epoch changes without a leader change, which increases the 
> likelihood of the race condition described above.
> Here are the exact events that happen in this test (around the failure):
> We have 4 brokers: 1, 2, 3, 4. Partition re-assignment is started for 
> test_topic-17 [2, 4, 1] -> [3, 1, 2]. At time t0, the leader of test_topic-17 
> is broker 2.
>  # clean shutdown of broker 3, which is also a controller
>  # broker 4 becomes controller, continues re-assignment and updates leader 
> epoch for test_topic-17 to 6 (with same leader)
>  # broker 2 (leader of test_topic-17) receives new leader epoch: 
> “test_topic-17 starts at Leader Epoch 6 from offset 1388. Previous Leader 
> Epoch was: 5”
>  # broker 3 is started again after clean shutdown
>  # controller sees broker 3 startup, and sends LeaderAndIsr(leader epoch 6) 
> to broker 3
>  # controller updates leader epoch to 7
>  # broker 2 (leader of test_topic-17) receives LeaderAndIsr for leader epoch 
> 7: “test_topic-17 starts at Leader Epoch 7 from offset 1974. Previous Leader 
> Epoch was: 6”
>  # broker 3 receives LeaderAndIsr for test_topic-17 and leader epoch 6 from 
> controller: “Added fetcher to broker BrokerEndPoint(id=2) for leader epoch 6” 
> and sends OffsetsForLeaderEpoch request to broker 2
>  # broker 3 receives LeaderAndIsr for test_topic-17 and leader epoch 7 from 
> controller; removes fetcher thread and adds fetcher thread + executes 
> AbstractFetcherThread.addPartitions() which updates partition state with 
> leader epoch 7
>  # broker 3 receives FENCED_LEADER_EPOCH in response to 
> OffsetsForLeaderEpoch(leader epoch 6), because the leader received 
> LeaderAndIsr for leader epoch 7 before it got OffsetsForLeaderEpoch(leader 
> epoch 6) from broker 3. As a result, broker 3 removes the partition from 
> partitionStates and does not fetch until the controller updates the leader 
> epoch and sends LeaderAndIsr for this partition to broker 3. The test fails 
> because re-assignment does not finish on time (due to broker 3 not fetching).
>  
> One way to address this is possibly to add more state to PartitionFetchState. 
> However, we may introduce other race conditions. A cleaner way, I think, is to 
> return 

[jira] [Commented] (KAFKA-7297) Both read/write access to Log.segments should be protected by lock

2018-12-15 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722217#comment-16722217
 ] 

Jun Rao commented on KAFKA-7297:


[~lindong], good points. The current implementation is not causing severe 
issues, but it is not ideal. It seems it will be much easier to reason about 
this API if we simply return a non-overlapping, monotonically increasing, and 
immutable segment list. If we go this route, it will be useful to add some 
documentation so that we are aware of the potential performance impact for 
future usage.

> Both read/write access to Log.segments should be protected by lock
> --
>
> Key: KAFKA-7297
> URL: https://issues.apache.org/jira/browse/KAFKA-7297
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
>
> Log.replaceSegments() updates segments in two steps. It first adds new 
> segments and then removes old segments. Though this operation is protected by 
> a lock, other read accesses such as Log.logSegments do not grab the lock, and 
> thus these methods may return an inconsistent view of the segments.
> As an example, say Log.replaceSegments() intends to replace segments [0, 
> 100), [100, 200) with a new segment [0, 200). In this case, if Log.logSegments 
> is called right after the new segments are added, the method may return 
> segments [0, 200), [100, 200), and messages in the range [100, 200) may be 
> duplicated if the caller chooses to enumerate all messages in all segments 
> returned by the method.
> The solution is probably to protect read/write access to Log.segments with a 
> read/write lock.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7297) Both read/write access to Log.segments should be protected by lock

2018-12-14 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721960#comment-16721960
 ] 

Jun Rao commented on KAFKA-7297:


[~lindong], Ismael pointed out that ConcurrentSkipListMap's iterators are weakly 
consistent, which guarantees the following:

they are guaranteed to traverse elements as they existed upon construction 
exactly once, and may (but are not guaranteed to) reflect any modifications 
subsequent to construction

In the common case where we just add new segments to the end, iterating the 
segments w/o a lock seems ok.

The only problematic case is the segment replacement issue that you mentioned 
in the description. That may not be causing a problem now. For example, readers 
that iterate segments, such as flush() and offsetForTimestamp, seem to be ok 
with overlapping segments. So for now, maybe we should at least (1) make the 
implementation of both logSegments() consistent, i.e., not taking a lock to get 
the iterator, and (2) document the behavior of the returned iterator, i.e., the 
underlying elements could change and there could be potential segment overlap.
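
To make the potential overlap concrete, here is a small Java sketch that 
replays the two-step replace from the issue description on a 
ConcurrentSkipListMap and checks what an unlocked read would see between the 
two steps. The class and method names are hypothetical, and the map of base 
offset to end offset is only a simplified stand-in for Log.segments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

final class SegmentOverlapSketch {
    // Copies the map's current contents as {baseOffset, endOffset} pairs,
    // in ascending base-offset order. Each pair covers [baseOffset, endOffset).
    static List<long[]> view(ConcurrentSkipListMap<Long, Long> segments) {
        List<long[]> out = new ArrayList<>();
        for (Map.Entry<Long, Long> e : segments.entrySet()) {
            out.add(new long[] {e.getKey(), e.getValue()});
        }
        return out;
    }

    // True if any segment starts before the previous segment ends.
    static boolean hasOverlap(List<long[]> segments) {
        for (int i = 1; i < segments.size(); i++) {
            if (segments.get(i)[0] < segments.get(i - 1)[1]) {
                return true;
            }
        }
        return false;
    }

    // Replays the replace of [0, 100), [100, 200) with [0, 200) and reports
    // whether the view taken between the two steps overlaps while the final
    // view does not.
    static boolean midReplaceViewOverlaps() {
        ConcurrentSkipListMap<Long, Long> segments = new ConcurrentSkipListMap<>();
        segments.put(0L, 100L);
        segments.put(100L, 200L);

        segments.put(0L, 200L);                 // step 1: add new segment [0, 200)
        List<long[]> between = view(segments);  // an unlocked read landing here
        segments.remove(100L);                  // step 2: remove old [100, 200)

        return hasOverlap(between) && !hasOverlap(view(segments));
    }
}
```

The sketch is single-threaded on purpose: it only fixes the point at which the 
read happens, which in the real code would depend on thread timing.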

> Both read/write access to Log.segments should be protected by lock
> --
>
> Key: KAFKA-7297
> URL: https://issues.apache.org/jira/browse/KAFKA-7297
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major





[jira] [Commented] (KAFKA-7681) new metric for request thread utilization by request type

2018-12-14 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721607#comment-16721607
 ] 

Jun Rao commented on KAFKA-7681:


[~mgharat], Meter is calculated as (accumulated values / time of the 
accumulation). So, if you do 1), you essentially get the ratio of time that a 
particular request type spends on the request handler. We just need to name 
the metric accordingly.

> new metric for request thread utilization by request type
> -
>
> Key: KAFKA-7681
> URL: https://issues.apache.org/jira/browse/KAFKA-7681
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>Priority: Major
>
> When the request thread pool is saturated, it's often useful to know which 
> type of request is using the thread pool the most. It would be useful to add a 
> metric that tracks the fraction of request thread pool usage by request type. 
> This would be equivalent to (request rate) * (request local time ms) / 1000, 
> but will be more direct. This would require a new KIP.





[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2018-12-13 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720819#comment-16720819
 ] 

Jun Rao commented on KAFKA-2729:


[~murri71], my earlier comment referred to the following issue, which we fixed 
in KAFKA-7165. It's possible that this issue surfaces as "Cached zkVersion" in 
some scenarios. I am not sure if there are other issues that can still lead to 
"Cached zkVersion". So, my recommendation is to upgrade to 2.2 when it's 
released and file a separate Jira if the issue still exists.

 
[2018-10-05 21:08:28,025] ERROR Error while creating ephemeral at /controller, 
node already exists and owner '3703712903740981258' does not match current 
session '3775770497779040270' (kafka.zk.KafkaZkClient$CheckedEphemeral)
[2018-10-05 21:08:28,025] ERROR Error while creating ephemeral at /controller, 
node already exists and owner '3703712903740981258' does not match current 
session '3775770497779040270' (kafka.zk.KafkaZkClient$CheckedEphemeral)
[2018-10-05 21:08:28,025] INFO Result of znode creation at /controller is: 
NODEEXISTS (kafka.zk.KafkaZkClient)

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1, 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.11.0.0
>Reporter: Danil Serdyuchenko
>Assignee: Onur Karaman
>Priority: Major
> Fix For: 1.1.0
>
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of under-replicated partitions. The zookeeper 
> cluster recovered; however, we continued to see a large number of 
> under-replicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> This happened for all of the topics on the affected brokers. Both brokers only 
> recovered after a restart. Our own investigation yielded nothing, so I was 
> hoping you could shed some light on this issue. It is possibly related to 
> https://issues.apache.org/jira/browse/KAFKA-1382; however, we're using 
> 0.8.2.1.





[jira] [Commented] (KAFKA-7297) Both read/write access to Log.segments should be protected by lock

2018-12-13 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720710#comment-16720710
 ] 

Jun Rao commented on KAFKA-7297:


To follow up on that, another related issue is that currently, we are just 
returning a view from Log.logSegments. Even if we hold the lock there, when the 
caller iterates it, the iterated view is still undefined if the backing map 
is changing. So, it seems that Log.logSegments needs to hold the lock and 
return a materialized copy (using iterable.toBuffer). All callers of 
Log.logSegments seem to either hold the lock already or call it infrequently. 
So, the added toBuffer overhead is likely ok. 
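
A rough Java sketch of that idea follows: both the replace and the read take 
the same lock, and the read returns a materialized copy rather than a live 
view. The class, its methods, and the use of base offsets as the returned 
elements are hypothetical simplifications, not the actual Log code (which is 
Scala and would use toBuffer).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantLock;

final class LockedSegmentsSketch {
    private final ReentrantLock lock = new ReentrantLock();
    // baseOffset -> endOffset (exclusive); all writes and snapshot reads hold the lock.
    private final ConcurrentSkipListMap<Long, Long> segments = new ConcurrentSkipListMap<>();

    // Adds a segment covering [baseOffset, endOffset).
    void append(long baseOffset, long endOffset) {
        lock.lock();
        try {
            segments.put(baseOffset, endOffset);
        } finally {
            lock.unlock();
        }
    }

    // Replaces every segment whose base offset lies in [baseOffset, endOffset)
    // with a single segment. Both steps happen under the lock, so a snapshot
    // can never observe the intermediate state.
    void replace(long baseOffset, long endOffset) {
        lock.lock();
        try {
            segments.subMap(baseOffset, endOffset).clear();
            segments.put(baseOffset, endOffset);
        } finally {
            lock.unlock();
        }
    }

    // Returns an unmodifiable copy of the base offsets, taken under the lock.
    // Later changes to the backing map do not affect the returned list.
    List<Long> baseOffsets() {
        lock.lock();
        try {
            return Collections.unmodifiableList(new ArrayList<>(segments.keySet()));
        } finally {
            lock.unlock();
        }
    }
}
```

The copy costs O(number of segments) per call, which is the overhead the 
comment above refers to.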

 

[~lindong]: Do you still plan to work on that?

> Both read/write access to Log.segments should be protected by lock
> --
>
> Key: KAFKA-7297
> URL: https://issues.apache.org/jira/browse/KAFKA-7297
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major





[jira] [Commented] (KAFKA-5692) Refactor PreferredReplicaLeaderElectionCommand to use AdminClient

2018-12-12 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16719228#comment-16719228
 ] 

Jun Rao commented on KAFKA-5692:


[~tombentley], that's good to know. Thanks for your help.

> Refactor PreferredReplicaLeaderElectionCommand to use AdminClient
> -
>
> Key: KAFKA-5692
> URL: https://issues.apache.org/jira/browse/KAFKA-5692
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: kip, patch-available
> Fix For: 2.2.0
>
>
> The PreferredReplicaLeaderElectionCommand currently uses a direct connection 
> to zookeeper. The zookeeper dependency should be deprecated and an 
> AdminClient API created to be used instead. 
> This change will require a KIP.





[jira] [Commented] (KAFKA-7713) producer io-wait-ratio > 1

2018-12-11 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718002#comment-16718002
 ] 

Jun Rao commented on KAFKA-7713:


io-wait-ratio is calculated as the ratio of the accumulated selector waiting 
time over the length of the metric window. So, as the metric window rolls, the 
accumulated waiting time that falls into the rolled window can cause the metric 
to go above 1. I am not sure what the best way to address this is. One way is 
to treat the Rate/Meter metric specially: when we record the value, we split 
the value into the corresponding sample windows and bound the value by the 
window size.
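
One possible shape of that split, as a Java sketch. It assumes fixed-size 
windows aligned to multiples of the window length and a value recorded at the 
end of the wait; the class and method names are hypothetical, and this is not 
how the existing metrics code records samples.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class WindowSplitSketch {
    // Splits a wait of durationMs that ended at endMs across fixed windows of
    // windowMs each. Returns windowIndex -> milliseconds attributed to it.
    // Assumes windowMs > 0 and durationMs >= 0.
    static Map<Long, Long> split(long endMs, long durationMs, long windowMs) {
        Map<Long, Long> perWindow = new LinkedHashMap<>();
        long cursor = endMs - durationMs;  // when the wait started
        while (cursor < endMs) {
            long window = Math.floorDiv(cursor, windowMs);
            long windowEnd = (window + 1) * windowMs;
            // Attribute only the part of the wait that lies inside this window,
            // so a single slice can never exceed the window size.
            long slice = Math.min(endMs, windowEnd) - cursor;
            perWindow.merge(window, slice, Long::sum);
            cursor += slice;
        }
        return perWindow;
    }
}
```

For example, a 500 ms wait that ends at t=1200 ms with 1000 ms windows is 
attributed as 300 ms to window 0 and 200 ms to window 1, instead of 500 ms to 
window 1.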

> producer io-wait-ratio > 1
> --
>
> Key: KAFKA-7713
> URL: https://issues.apache.org/jira/browse/KAFKA-7713
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: dan norwood
>Priority: Major
>
> i am running a test on a streams application and gathering jmx measurements 
> to determine what is causing some lag. using `kafka.tools.JmxTool` i was 
> gathering the following attributes `'io-ratio', 'io-wait-ratio', 
> 'select-rate', 'batch-size-avg', 'compression-rate-avg', 'record-size-avg', 
> 'records-per-request-avg'` on my streams instances producers. i noticed that 
> i was getting `io-wait-ratio > 1`, but according to docs it is "The fraction 
> of time the I/O thread spent waiting." 
>  
> some example lines from jmxtool
> |StreamThread-8-producer:batch-size-avg|StreamThread-8-producer:compression-rate-avg|StreamThread-8-producer:io-ratio|*StreamThread-8-producer:io-wait-ratio*|StreamThread-8-producer:record-size-avg|StreamThread-8-producer:records-per-request-avg|StreamThread-8-producer:select-rate|
> |662.2613636|0.3425814926|1.01E-04|*1.178371974*|172.2045455|38.7167|3.855527588|





[jira] [Commented] (KAFKA-7680) fetching a refilled chunk of log can cause log divergence

2018-12-10 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716273#comment-16716273
 ] 

Jun Rao commented on KAFKA-7680:


[~NIzhikov], thanks for your interest. Feel free to assign the Jira to yourself.

> fetching a refilled chunk of log can cause log divergence
> -
>
> Key: KAFKA-7680
> URL: https://issues.apache.org/jira/browse/KAFKA-7680
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Priority: Major
>
> We use FileRecords.writeTo to send a fetch response for a follower. A log 
> could be truncated and refilled in the middle of the send process (due to 
> leader change). Then it's possible for the follower to append some 
> uncommitted messages followed by committed messages. Those uncommitted 
> messages may never be removed, causing log divergence.
>  





[jira] [Commented] (KAFKA-5692) Refactor PreferredReplicaLeaderElectionCommand to use AdminClient

2018-12-07 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16713395#comment-16713395
 ] 

Jun Rao commented on KAFKA-5692:


[~tombentley], thanks for contributing the PR. Would you still be interested in 
finishing up the PR? If not, perhaps we can unassign the Jira and see if 
someone else wants to finish this up.

> Refactor PreferredReplicaLeaderElectionCommand to use AdminClient
> -
>
> Key: KAFKA-5692
> URL: https://issues.apache.org/jira/browse/KAFKA-5692
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: kip, patch-available
> Fix For: 2.2.0





[jira] [Resolved] (KAFKA-7704) kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported incorrectly

2018-12-06 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7704.

   Resolution: Fixed
Fix Version/s: 2.1.1
   2.2.0

Merged to trunk and 2.1.

> kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported 
> incorrectly
> ---
>
> Key: KAFKA-7704
> URL: https://issues.apache.org/jira/browse/KAFKA-7704
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 2.1.0
>Reporter: Yu Yang
>Assignee: huxihx
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
> Attachments: Screen Shot 2018-12-03 at 4.33.35 PM.png, Screen Shot 
> 2018-12-05 at 10.13.09 PM.png
>
>
> We recently deployed kafka 2.1, and noticed a jump in the 
> kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
> there are no under-replicated partitions in the cluster. 
> The initial analysis shows that kafka 2.1.0 does not report the metric 
> correctly for topics that have no incoming traffic right now, but had traffic 
> earlier. For those topics, ReplicaFetcherManager will consider the maxLag to 
> be the latest offset. 
> For instance, we have a topic named `test_topic`: 
> {code}
> [root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
> total 8
> -rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
> -rw-rw-r-- 1 kafka kafka        0 Sep 23 03:01 099043947579.log
> -rw-rw-r-- 1 kafka kafka       10 Dec  4 00:13 099043947579.snapshot
> -rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
> -rw-rw-r-- 1 kafka kafka        4 Dec  4 00:13 leader-epoch-checkpoint
> {code}
> kafka reports ReplicaFetcherManager.MaxLag.Replica to be 99043947579
>  !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 





[jira] [Commented] (KAFKA-7704) kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported incorrectly

2018-12-04 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16709589#comment-16709589
 ] 

Jun Rao commented on KAFKA-7704:


[~yuyang08], could you try the PR and see if it fixes the issue? Thanks,

> kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported 
> incorrectly
> ---
>
> Key: KAFKA-7704
> URL: https://issues.apache.org/jira/browse/KAFKA-7704
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 2.1.0
>Reporter: Yu Yang
>Assignee: huxihx
>Priority: Major
> Attachments: Screen Shot 2018-12-03 at 4.33.35 PM.png





[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2018-12-04 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16709539#comment-16709539
 ] 

Jun Rao commented on KAFKA-2729:


In KAFKA-7165 (fixed in 2.2.0), we fixed another issue that can cause the 
re-creation of the broker registration in ZK to fail.

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1, 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.11.0.0
>Reporter: Danil Serdyuchenko
>Assignee: Onur Karaman
>Priority: Major
> Fix For: 1.1.0





[jira] [Resolved] (KAFKA-1120) Controller could miss a broker state change

2018-12-02 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-1120.

Resolution: Fixed
  Assignee: Zhanxiang (Patrick) Huang  (was: Mickael Maison)

This is fixed by KAFKA-7235.

> Controller could miss a broker state change 
> 
>
> Key: KAFKA-1120
> URL: https://issues.apache.org/jira/browse/KAFKA-1120
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
>  Labels: reliability
> Fix For: 2.2.0
>
>
> When the controller is in the middle of processing a task (e.g., preferred 
> leader election, broker change), it holds a controller lock. During this 
> time, a broker could have de-registered and re-registered itself in ZK. After 
> the controller finishes processing the current task, it will start processing 
> the logic in the broker change listener. However, it will see no broker 
> change and therefore won't do anything to the restarted broker. This broker 
> will be in a weird state since the controller doesn't inform it to become the 
> leader of any partition. Yet, the cached metadata in other brokers could 
> still list that broker as the leader for some partitions. Client requests 
> routed to that broker will then get a TopicOrPartitionNotExistException. This 
> broker will continue to be in this bad state until it's restarted again.





[jira] [Resolved] (KAFKA-7235) Use brokerZkNodeVersion to prevent broker from processing outdated controller request

2018-12-02 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7235.

   Resolution: Fixed
Fix Version/s: 2.2.0

Merged to trunk.

> Use brokerZkNodeVersion to prevent broker from processing outdated controller 
> request
> -
>
> Key: KAFKA-7235
> URL: https://issues.apache.org/jira/browse/KAFKA-7235
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
> Fix For: 2.2.0
>
>
> Currently a broker can process controller requests that are sent before the 
> broker is restarted. This could cause a few problems. Here is one example:
> Let's assume partitions p1 and p2 exist on broker1.
> 1) Controller generates LeaderAndIsrRequest with p1 to be sent to broker1.
> 2) Before controller sends the request, broker1 is quickly restarted.
> 3) The LeaderAndIsrRequest with p1 is delivered to broker1.
> 4) After processing the first LeaderAndIsrRequest, broker1 starts to 
> checkpoint high watermark for all partitions that it owns. Thus it may 
> overwrite high watermark checkpoint file with only the hw for partition p1. 
> The hw for partition p2 is now lost, which could be a problem.
> In general, the correctness of broker logic currently relies on a few 
> assumptions, e.g. the first LeaderAndIsrRequest received by a broker should 
> contain all partitions hosted by the broker, which could break if the broker 
> can receive controller requests that were generated before it restarts. 
> One reasonable solution to the problem is to include the 
> expectedBrokeNodeZkVersion in the controller requests. The broker should 
> remember the broker znode zkVersion after it registers itself in zookeeper. 
> Then the broker can reject those controller requests whose 
> expectedBrokeNodeZkVersion is different from its broker znode zkVersion.
>  
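
The version check proposed in the description can be sketched in Java as 
follows. This is an illustration under assumptions: the class name, the method 
names, and the -1 sentinel for "not registered yet" are hypothetical and do not 
reflect the merged implementation.

```java
final class BrokerEpochSketch {
    // zkVersion of this broker's znode; -1 until the broker has registered
    // (the sentinel is an assumption of this sketch).
    private int brokerZkVersion = -1;

    // Called after the broker (re-)registers itself in zookeeper.
    void onRegistered(int zkVersion) {
        brokerZkVersion = zkVersion;
    }

    // A controller request is accepted only if it was generated for the
    // current registration of this broker.
    boolean accept(int expectedZkVersion) {
        return brokerZkVersion >= 0 && expectedZkVersion == brokerZkVersion;
    }
}
```

In the example from the description, the LeaderAndIsrRequest generated before 
the restart would carry the old version and be rejected after broker1 
re-registers.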





[jira] [Resolved] (KAFKA-7449) Kafka console consumer is not sending topic to deserializer

2018-11-28 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7449.

   Resolution: Fixed
 Assignee: Mathieu Chataigner
Fix Version/s: 2.2.0

Merged to trunk.

> Kafka console consumer is not sending topic to deserializer
> ---
>
> Key: KAFKA-7449
> URL: https://issues.apache.org/jira/browse/KAFKA-7449
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Mathieu Chataigner
>Assignee: Mathieu Chataigner
>Priority: Major
>  Labels: easyfix, pull-request-available
> Fix For: 2.2.0
>
>
> We tried to create a custom Deserializer to consume some protobuf topics.
> We have a mechanism for getting the protobuf class from topic name however 
> the console consumer is not forwarding the topic of the console consumer 
> record down to the deserializer.
> Topic information is available in the ConsumerRecord.





[jira] [Commented] (KAFKA-7681) new metric for request thread utilization by request type

2018-11-28 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702176#comment-16702176
 ] 

Jun Rao commented on KAFKA-7681:


Hi, [~mgharat],

The Broker Topic metrics only collect the byte read/write rate. This is a bit 
limiting since it only covers produce/fetch requests. Sometimes, other types of 
requests (e.g. metadata, joinGroup) could be hogging the request handling 
threads. Also, the byte rate doesn't tell us how much the request handler 
thread is being used. For example, the serving of a fetch request is mostly 
done in the network thread, instead of the request handling thread.

What I am thinking is the following. Each request handler thread is being used 
from the time that it takes a request from the request queue, until the local 
processing of the request is done (KafkaApis.handle() returns), which is the 
request localTime. If we aggregate the localTime per request type (e.g. 
produce/fetch/metadata/joinGroup, etc.) and calculate a rate of that value, it 
gives us the fraction of request handler usage by request type. This will tell 
us which type of request is using the request handler threads the most and can 
be useful for debugging.
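
A rough sketch of that bookkeeping, for illustration only. The class and method names are hypothetical and not part of the Kafka code base, and dividing by the number of handler threads is an assumption made here to express the rate as a fraction of the whole pool.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: aggregates request localTime per request type.
final class HandlerUsageByRequestType {
    private final Map<String, Long> localTimeNanos = new HashMap<>();

    // Records the local time of one request, i.e. the time a handler thread
    // spent on it between dequeuing it and finishing local processing.
    synchronized void record(String requestType, long nanos) {
        localTimeNanos.merge(requestType, nanos, Long::sum);
    }

    // Fraction of the handler pool's capacity used by one request type over
    // an observation window. Assumes the totals were collected in that window.
    synchronized double fraction(String requestType, long windowNanos, int handlerThreads) {
        long used = localTimeNanos.getOrDefault(requestType, 0L);
        return (double) used / ((double) windowNanos * handlerThreads);
    }
}
```

With two handler threads and a one-second window, 750 ms of accumulated Produce localTime would show up as 0.375 of the pool.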

> new metric for request thread utilization by request type
> -
>
> Key: KAFKA-7681
> URL: https://issues.apache.org/jira/browse/KAFKA-7681
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Priority: Major
>
> When the request thread pool is saturated, it's often useful to know which 
> type of request is using the thread pool the most. It would be useful to add a 
> metric that tracks the fraction of request thread pool usage by request type. 
> This would be equivalent to (request rate) * (request local time ms) / 1000, 
> but will be more direct. This would require a new KIP.





[jira] [Created] (KAFKA-7682) turning on request logging for a subset of request types

2018-11-27 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7682:
--

 Summary: turning on request logging for a subset of request types
 Key: KAFKA-7682
 URL: https://issues.apache.org/jira/browse/KAFKA-7682
 Project: Kafka
  Issue Type: Improvement
Reporter: Jun Rao


Turning on request level logging can be useful for debugging. However, the 
request logging can be quite verbose. It would be useful to turn it on for a 
subset of the request types. We already have a jmx bean to turn on/off the 
request logging dynamically. We could add a new jmx bean to control the request 
types to be logged. This requires a KIP.
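
As a sketch of what such a control could look like (all names here are hypothetical, and the actual JMX wiring is left out), a filter holding the set of request types to log might be:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: decides per request type whether request logging applies.
final class RequestLogFilter {
    // Replaced wholesale on update so readers never see a half-updated set.
    private volatile Set<String> enabledTypes = Collections.emptySet();

    // In a real broker this would be invoked by a management operation,
    // for example an attribute setter on a JMX bean.
    void setEnabledTypes(Collection<String> types) {
        enabledTypes = Collections.unmodifiableSet(new HashSet<>(types));
    }

    // Called on the request path; cheap enough to run for every request.
    boolean shouldLog(String requestType) {
        return enabledTypes.contains(requestType);
    }
}
```

Starting with an empty set keeps the default behaviour quiet; enabling, say, only Metadata and JoinGroup avoids the volume of produce and fetch requests.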





[jira] [Created] (KAFKA-7681) new metric for request thread utilization by request type

2018-11-27 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7681:
--

 Summary: new metric for request thread utilization by request type
 Key: KAFKA-7681
 URL: https://issues.apache.org/jira/browse/KAFKA-7681
 Project: Kafka
  Issue Type: Improvement
Reporter: Jun Rao


When the request thread pool is saturated, it's often useful to know which type 
of request is using the thread pool the most. It would be useful to add a metric 
that tracks the fraction of request thread pool usage by request type. This 
would be equivalent to (request rate) * (request local time ms) / 1000, but 
will be more direct. This would require a new KIP.





[jira] [Commented] (KAFKA-7680) fetching a refilled chunk of log can cause log divergence

2018-11-27 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701084#comment-16701084
 ] 

Jun Rao commented on KAFKA-7680:


One way to fix this is the following. We already include the expected 
leaderEpoch for each partition in the fetch response. So, the follower 
shouldn't see message sets with a leaderEpoch higher than the expected one. If 
the follower does see such message sets, it means the log was refilled in the 
middle of a transfer, and the follower should just ignore this response and 
retry.
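
A minimal sketch of that check, assuming the follower can read the leader epoch of each fetched batch. The class and method names are hypothetical and do not correspond to the actual fetcher code.

```java
import java.util.List;

// Hypothetical sketch of the follower-side validation described above.
final class FetchEpochCheck {
    // Returns true if any fetched batch carries a leader epoch above the one
    // the follower expected, in which case the response should be dropped
    // and the fetch retried.
    static boolean shouldDiscard(int expectedLeaderEpoch, List<Integer> batchLeaderEpochs) {
        for (int epoch : batchLeaderEpochs) {
            if (epoch > expectedLeaderEpoch) {
                return true;
            }
        }
        return false;
    }
}
```

Batches at or below the expected epoch pass, since older epochs are normal when catching up on existing log data.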

cc [~hachikuji]

> fetching a refilled chunk of log can cause log divergence
> -
>
> Key: KAFKA-7680
> URL: https://issues.apache.org/jira/browse/KAFKA-7680
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Priority: Major
>
> We use FileRecords.writeTo to send a fetch response for a follower. A log 
> could be truncated and refilled in the middle of the send process (due to 
> leader change). Then it's possible for the follower to append some 
> uncommitted messages followed by committed messages. Those uncommitted 
> messages may never be removed, causing log divergence.
>  





[jira] [Created] (KAFKA-7680) fetching a refilled chunk of log can cause log divergence

2018-11-27 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7680:
--

 Summary: fetching a refilled chunk of log can cause log divergence
 Key: KAFKA-7680
 URL: https://issues.apache.org/jira/browse/KAFKA-7680
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao


We use FileRecords.writeTo to send a fetch response for a follower. A log could 
be truncated and refilled in the middle of the send process (due to leader 
change). Then it's possible for the follower to append some uncommitted 
messages followed by committed messages. Those uncommitted messages may never 
be removed, causing log divergence.

 





[jira] [Commented] (KAFKA-4277) creating ephemeral node already exist

2018-11-19 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692047#comment-16692047
 ] 

Jun Rao commented on KAFKA-4277:


[~birger] : Hmm, according to 
[http://mail-archives.apache.org/mod_mbox/zookeeper-user/201701.mbox/%3CB512F6DE-C0BF-45CE-8102-6F242988268E%40apache.org%3E], 
what you described shouldn't have happened in ZK. Was the restart after a hard 
broker failure? If so, did you wait longer than zookeeper.session.timeout.ms 
before restarting the failed broker?

> creating ephemeral node already exist
> -
>
> Key: KAFKA-4277
> URL: https://issues.apache.org/jira/browse/KAFKA-4277
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 0.10.0.0
>Reporter: Feixiang Yan
>Priority: Major
>
> I use zookeeper 3.4.6.
> The ZooKeeper session timed out and zkClient's reconnect attempt failed. When 
> it then re-established the session and re-registered the broker info in ZK, 
> a NODEEXISTS exception was thrown.
>  I think this is because the ephemeral node created by the old session has 
> not been removed. 
> I read the 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.8.1; createEphemeralPathExpectConflictHandleZKBug tries to create the 
> node in a while loop until creation succeeds. This can solve the issue. But in 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.10.0.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>   0.10.1 the function was removed.
> {noformat}
> [2016-10-07 19:00:32,562] INFO Socket connection established to 
> 10.191.155.238/10.191.155.238:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,564] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1576b11f9b201bd has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,564] INFO Initiating client connection, 
> connectString=10.191.155.237:21819,10.191.155.238:21819,10.191.155.239:21819/cluster2
>  sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@ae71be2 
> (org.apache.zookeeper.ZooKeeper)
> [2016-10-07 19:00:32,566] INFO Opening socket connection to server 
> 10.191.155.237/10.191.155.237:21819. Will not attempt to authenticate using 
> SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO Socket connection established to 
> 10.191.155.237/10.191.155.237:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO Session establishment complete on server 
> 10.191.155.237/10.191.155.237:21819, sessionid = 0x1579ecd39c20006, 
> negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,608] INFO re-registering broker info in ZK for broker 3 
> (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2016-10-07 19:00:32,610] INFO Creating /brokers/ids/3 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,611] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,614] ERROR Error handling event ZkEvent[New session 
> event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@324f1bc] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.RuntimeException: A broker is already registered on the path 
> /brokers/ids/3. This probably indicates that you either have configured a 
> brokerid that is already in use, or else you have shutdown this broker and 
> restarted it faster than the zookeeper timeout so it appears to be 
> re-registering.
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:305)
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:291)
> at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
> at 
> kafka.server.KafkaHealthcheck$SessionExpireListener.handleNewSession(KafkaHealthcheck.scala:104)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}





[jira] [Resolved] (KAFKA-7412) Bug prone response from producer.send(ProducerRecord, Callback) if Kafka broker is not running

2018-11-08 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7412.

   Resolution: Fixed
Fix Version/s: 2.2.0

Clarified the javadoc of the producer callback: metadata is not null when the 
exception is non-null.

Merged the PR to trunk.
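
For callers, the practical consequence is to decide success from the exception rather than from the metadata being non-null. A small sketch of that decision follows; SendOutcome is a hypothetical helper, the offset parameter stands in for RecordMetadata.offset(), and the -1 constant mirrors the ProduceResponse.INVALID_OFFSET value mentioned in the report below.

```java
// Hypothetical helper, not part of the Kafka client API.
final class SendOutcome {
    // Mirrors the invalid-offset value (-1) quoted in the bug report.
    static final long INVALID_OFFSET = -1L;

    // Classifies one callback invocation. A non-null exception always means
    // failure, even if metadata (and therefore an offset) is also present.
    static boolean succeeded(Long offsetOrNull, Exception exception) {
        if (exception != null) {
            return false;
        }
        // Defensive guard kept from the reporter's workaround.
        return offsetOrNull != null && offsetOrNull != INVALID_OFFSET;
    }
}
```

Inside a real callback this would be called as succeeded(metadata == null ? null : metadata.offset(), exception).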

> Bug prone response from producer.send(ProducerRecord, Callback) if Kafka 
> broker is not running
> --
>
> Key: KAFKA-7412
> URL: https://issues.apache.org/jira/browse/KAFKA-7412
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.0.0
>Reporter: Michal Turek
>Assignee: huxihx
>Priority: Major
> Fix For: 2.2.0
>
> Attachments: both_metadata_and_exception.png, 
> metadata_when_kafka_is_stopped.png
>
>
> Hi there, I have probably found a bug in Java Kafka producer client.
> Scenario & current behavior:
> - Start Kafka broker, single instance.
> - Start application that produces messages to Kafka.
> - Let the application load the partitions for a topic to warm up the producer, 
> e.g. send a message to Kafka. I'm not sure if this is a necessary step, but our 
> code does it.
> - Gracefully stop the Kafka broker.
> - The application logs now contain "org.apache.kafka.clients.NetworkClient: 
> [Producer clientId=...] Connection to node 0 could not be established. Broker 
> may not be available." so the client is aware of the Kafka unavailability.
> - Trigger the producer to send a message using 
> KafkaProducer.send(ProducerRecord, Callback) method.
> - The callback that notifies business code receives non-null RecordMetadata 
> and null Exception after request.timeout.ms. The metadata contains offset -1 
> which is the value of ProduceResponse.INVALID_OFFSET.
> Expected behavior:
> - If Kafka is not running and the message is not appended to the log, the 
> callback should contain null RecordMetadata and non-null Exception. At least 
> I subjectively understand the Javadoc this way, "exception on production 
> error" in simple words.
> - A developer who is not aware of this behavior and doesn't test for 
> offset -1 may consider the message as successfully sent and properly acked 
> by the broker.
> Known workaround
> - Together with checking for non-null exception in the callback, add another 
> condition for ProduceResponse.INVALID_OFFSET.
> {noformat}
> try {
>     producer.send(record, (metadata, exception) -> {
>         if (metadata != null) {
>             if (metadata.offset() != ProduceResponse.INVALID_OFFSET) {
>                 // Success
>             } else {
>                 // Failure
>             }
>         } else {
>             // Failure
>         }
>     });
> } catch (Exception e) {
>     // Failure
> }
> {noformat}
> Used setup
> - Latest Kafka 2.0.0 for both broker and Java client.
> - Originally found with broker 0.11.0.1 and client 2.0.0.
> - Code is analogy of the one in Javadoc of KafkaProducer.send().
> - Used producer configuration (others use defaults).
> {noformat}
> bootstrap.servers = "localhost:9092"
> client.id = "..."
> acks = "all"
> retries = 1
> linger.ms = "20"
> compression.type = "lz4"
> # The same behavior occurs with the default; this is to speed up the tests
> request.timeout.ms = 5000
> {noformat}





[jira] [Resolved] (KAFKA-7537) Only include live brokers in the UpdateMetadataRequest sent to existing brokers if there is no change in the partition states

2018-11-06 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7537.

   Resolution: Fixed
Fix Version/s: 2.2.0

Merged the PR to trunk.

> Only include live brokers in the UpdateMetadataRequest sent to existing 
> brokers if there is no change in the partition states
> -
>
> Key: KAFKA-7537
> URL: https://issues.apache.org/jira/browse/KAFKA-7537
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Reporter: Zhanxiang (Patrick) Huang
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
> Fix For: 2.2.0
>
>
> Currently, when brokers join/leave the cluster without any partition state 
> changes, the controller sends out UpdateMetadataRequests containing the 
> states of all partitions to all brokers. But for existing brokers in the 
> cluster, the metadata diff between controller and the broker should only be 
> the "live_brokers" info. Only the brokers with empty metadata cache need the 
> full UpdateMetadataRequest. Sending the full UpdateMetadataRequest to all 
> brokers can place nonnegligible memory pressure on the controller side.
> Let's say in total we have N brokers, M partitions in the cluster and we want 
> to add 1 brand new broker in the cluster. With RF=2, the memory footprint per 
> partition in the UpdateMetadataRequest is ~200 Bytes. In the current 
> controller implementation, if each of the N RequestSendThreads serializes and 
> sends out the UpdateMetadataRequest at roughly the same time (which is very 
> likely the case), we will end up using (N+1) * M * 200B. In a large Kafka 
> cluster, we can have:
> {noformat}
> N=99
> M=100k
> Memory usage to send out UpdateMetadataRequest to all brokers:
> 100 * 100K * 200B = 2G
> However, we only need to send out the full UpdateMetadataRequest to the newly 
> added broker. We only need to include live broker ids (4B * 100 brokers) in 
> the UpdateMetadataRequest sent to the existing 99 brokers. So the amount of 
> data that is actually needed will be:
> 1 * 100K * 200B + 99 * (100 * 4B) = ~20M
> We can potentially reduce the memory footprint, as well as the data 
> transferred over the network, by 2G / 20M = ~100x.{noformat}
>  
> This issue kind of hurts the scalability of a kafka cluster. KIP-380 and 
> KAFKA-7186 also help to further reduce the controller memory footprint.
>  
> In terms of implementation, we can keep some in-memory state in the 
> controller side to differentiate existing brokers and uninitialized brokers 
> (e.g. brand new brokers) so that if there is no change in partition states, 
> we only send out live brokers info to existing brokers.
>  
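
The estimate in the description can be reproduced with a few lines of arithmetic. This is only a sketch under the description's own assumptions (about 200 bytes per partition at RF=2, 4 bytes per live broker id); the class and method names are hypothetical.

```java
// Hypothetical sketch: back-of-the-envelope sizes for UpdateMetadataRequest fan-out.
final class UpdateMetadataFootprint {
    static final long BYTES_PER_PARTITION = 200L; // figure taken from the description (RF=2)
    static final long BYTES_PER_BROKER_ID = 4L;   // figure taken from the description

    // Every broker, existing ones plus the new one, gets all partition states.
    static long fullToAll(long existingBrokers, long partitions) {
        return (existingBrokers + 1) * partitions * BYTES_PER_PARTITION;
    }

    // Only the new broker gets all partition states; each existing broker
    // just gets the list of live broker ids.
    static long fullToNewOnly(long existingBrokers, long partitions) {
        long liveBrokers = existingBrokers + 1;
        return partitions * BYTES_PER_PARTITION
                + existingBrokers * liveBrokers * BYTES_PER_BROKER_ID;
    }

    public static void main(String[] args) {
        long before = fullToAll(99, 100_000);
        long after = fullToNewOnly(99, 100_000);
        System.out.println(before + " bytes before, " + after + " bytes after");
    }
}
```

For N=99 and M=100k this gives 2,000,000,000 bytes against 20,039,600 bytes, so roughly 2 GB versus 20 MB in decimal units, the same order of magnitude as the estimate in the description.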





[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-10-29 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667977#comment-16667977
 ] 

Jun Rao commented on KAFKA-7481:


I agree that introducing a new config probably needs more thought. Between 
option 1 and 2, option 2 doesn't seem to provide much more benefit than option 
1, and option 1 is simpler. So, I am in favor of option 1 too.

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?





[jira] [Commented] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID

2018-10-29 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667914#comment-16667914
 ] 

Jun Rao commented on KAFKA-7165:


[~pachilo], your PR actually looks reasonable for addressing the issue of 
creating the ephemeral node. I am not sure if it addresses the 
org.apache.zookeeper.KeeperException$SessionExpiredException in the isr 
expiration thread. The failure of the creation of the ephemeral node shouldn't 
prevent the new ZK session from being created. So, I am wondering why the 
SessionExpiredException continued for 6 hours. Was there an extended network 
issue with the ZK cluster?

> Error while creating ephemeral at /brokers/ids/BROKER_ID
> 
>
> Key: KAFKA-7165
> URL: https://issues.apache.org/jira/browse/KAFKA-7165
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 1.1.0
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Major
>
> Kafka version: 1.1.0
> Zookeeper version: 3.4.12
> 4 Kafka Brokers
> 4 Zookeeper servers
>  
> In one of the 4 brokers of the cluster, we detect the following error:
> [2018-07-14 04:38:23,784] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,509] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,510] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,513] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_2:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_2:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,954] INFO [Partition TOPIC_NAME-PARTITION-# broker=1|#* 
> broker=1] Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition)
>  [2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,445] INFO EventThread shut down for session: 
> 0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. 
> (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session 
> to 
> *ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*.
>  (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO Initiating client connection, 
> connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*
>  sessionTimeout=6000 
> watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 
> (org.apache.zookeeper.ZooKeeper)
>  [2018-07-14 04:38:26,465] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,477] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,484] INFO Session establishment complete on server 
> *ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout 
> = 6000 (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,496] *INFO Creating /brokers/ids/1* (is it secure? 
> false) (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,500] INFO Processing notification(s) to /config/changes 
> (kafka.common.ZkNodeChangeNotificationListener)
>  *[2018-07-14 04:38:26,547] ERROR Error while creating ephemeral at 
> /brokers/ids/1, node already exists and owner '216186131422332301' does not 
> match current session '288330817911521280' 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)*
>  [2018-07-14 04:38:26,547] *INFO Result of znode creation at /brokers/ids/1 
> is: NODEEXISTS* (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,559] ERROR Uncaught exception in scheduled task 
> 'isr-expiration' 

[jira] [Commented] (KAFKA-7557) optimize LogManager.truncateFullyAndStartAt()

2018-10-26 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16665737#comment-16665737
 ] 

Jun Rao commented on KAFKA-7557:


To improve this, instead of calling 
Log.deleteSnapshotsAfterRecoveryPointCheckpoint() on all logs, we could 
probably call it only on the log that's being truncated.

> optimize LogManager.truncateFullyAndStartAt()
> -
>
> Key: KAFKA-7557
> URL: https://issues.apache.org/jira/browse/KAFKA-7557
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Jun Rao
>Priority: Major
>
> When a ReplicaFetcherThread calls LogManager.truncateFullyAndStartAt() for a 
> partition, we call LogManager.checkpointLogRecoveryOffsetsInDir() and then 
> Log.deleteSnapshotsAfterRecoveryPointCheckpoint() on all the logs in that 
> directory. This requires listing all the files in each log dir to figure out 
> the snapshot files. If some logs have many log segment files, this could take 
> some time. This can potentially block a replica fetcher thread, which 
> indirectly causes the request handler threads to be blocked.
>  





[jira] [Created] (KAFKA-7557) optimize LogManager.truncateFullyAndStartAt()

2018-10-26 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7557:
--

 Summary: optimize LogManager.truncateFullyAndStartAt()
 Key: KAFKA-7557
 URL: https://issues.apache.org/jira/browse/KAFKA-7557
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.0, 2.1.0
Reporter: Jun Rao


When a ReplicaFetcherThread calls LogManager.truncateFullyAndStartAt() for a 
partition, we call LogManager.checkpointLogRecoveryOffsetsInDir() and then 
Log.deleteSnapshotsAfterRecoveryPointCheckpoint() on all the logs in that 
directory. This requires listing all the files in each log dir to figure out 
the snapshot files. If some logs have many log segment files, this could take 
some time. This can potentially block a replica fetcher thread, which indirectly 
causes the request handler threads to be blocked.

 





[jira] [Commented] (KAFKA-7538) Improve locking model used to update ISRs and HW

2018-10-24 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662538#comment-16662538
 ] 

Jun Rao commented on KAFKA-7538:


[~rsivaram], thanks for the analysis. I agree that approach 2) is probably the 
easiest fix at this moment. I am wondering how much this will help though. With 
the fix, other producers will be able to proceed to append to the log. If the 
log append is slow, then all those produce requests will be blocked too, tying 
up all the request handlers. So, it seems that we will still need to fix the 
root cause of the problem, which is the slow log append.

> Improve locking model used to update ISRs and HW
> 
>
> Key: KAFKA-7538
> URL: https://issues.apache.org/jira/browse/KAFKA-7538
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.2.0
>
>
> We currently use a ReadWriteLock in Partition to update ISRs and high water 
> mark for the partition. This can result in severe lock contention if there 
> are multiple producers writing a large amount of data into a single partition.
> The current locking model is:
>  # read lock while appending to log on every Produce request on the request 
> handler thread
>  # write lock on leader change, updating ISRs etc. on request handler or 
> scheduler thread
>  # write lock on every replica fetch request to check if ISRs need to be 
> updated and to update HW and ISR on the request handler thread
> 2) is infrequent, but 1) and 3) may be frequent and can result in lock 
> contention. If there are lots of produce requests to a partition from 
> multiple processes, on the leader broker we may see:
>  # one slow log append locks up one request thread for that produce while 
> holding onto the read lock
>  # (replicationFactor-1) request threads can be blocked waiting for write 
> lock to process replica fetch request
>  # potentially several other request threads processing Produce may be queued 
> up to acquire read lock because of the waiting writers.
> In a thread dump with this issue, we noticed several request threads blocked 
> waiting for the write lock, possibly due to replication fetch retries.
>  
> Possible fixes:
>  # Process `Partition#maybeExpandIsr` on a single scheduler thread similar to 
> `Partition#maybeShrinkIsr` so that only a single thread is blocked on the 
> write lock. But this will delay updating ISRs and HW.
>  # Change locking in `Partition#maybeExpandIsr` so that only read lock is 
> acquired to check if ISR needs updating and write lock is acquired only to 
> update ISRs. Also use a different lock for updating HW (perhaps just the 
> Partition object lock) so that typical replica fetch requests complete 
> without acquiring Partition write lock on the request handler thread.
> I will submit a PR for 2) , but other suggestions to fix this are welcome.
>  
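
A minimal sketch of the idea behind fix 2), for illustration only: check under the read lock and take the write lock only when an update is actually needed. IsrSketch and its fields are hypothetical and much simpler than Partition; the separate lock for the HW update is not shown.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of "read lock to check, write lock only to update".
final class IsrSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Set<Integer> isr = new HashSet<>();

    // Returns true if this call added the replica to the ISR.
    boolean maybeExpandIsr(int replicaId, boolean caughtUp) {
        // Common case: nothing to change, so only the read lock is taken
        // and concurrent log appends (also readers) are not blocked.
        lock.readLock().lock();
        try {
            if (!caughtUp || isr.contains(replicaId)) {
                return false;
            }
        } finally {
            lock.readLock().unlock();
        }
        // ReentrantReadWriteLock cannot upgrade a read lock to a write lock,
        // so the read lock is released first and the state is re-checked:
        // Set.add returns false if another thread added the replica meanwhile.
        lock.writeLock().lock();
        try {
            return isr.add(replicaId);
        } finally {
            lock.writeLock().unlock();
        }
    }

    int isrSize() {
        lock.readLock().lock();
        try {
            return isr.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

The re-check under the write lock is what keeps the two-step locking correct; without it, two fetch requests for the same replica could both believe they expanded the ISR.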





[jira] [Updated] (KAFKA-7299) batch LeaderAndIsr requests during auto preferred leader election

2018-10-22 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-7299:
---
Fix Version/s: 2.0.1
   1.1.2

Kevin, since this is a small patch, I merged it to both the 1.1 and 2.0 branches.

> batch LeaderAndIsr requests during auto preferred leader election
> -
>
> Key: KAFKA-7299
> URL: https://issues.apache.org/jira/browse/KAFKA-7299
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Jun Rao
>Assignee: huxihx
>Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Currently, in KafkaController.checkAndTriggerAutoLeaderRebalance(), we call 
> onPreferredReplicaElection() one partition at a time. This means that the 
> controller will be sending LeaderAndIsrRequest one partition at a time. It 
> would be more efficient to call onPreferredReplicaElection() for a batch of 
> partitions to reduce the number of LeaderAndIsrRequests.





[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread

2018-10-15 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650798#comment-16650798
 ] 

Jun Rao commented on KAFKA-7504:


The case that Allen described is when a follower is fetching both in-sync and 
out-of-sync partitions. If the fetching of the out-of-sync partition's data is 
taking long, it will delay the propagation of the in-sync partition's data, 
which can increase the producer latency. This can happen even when the 
replication quota is enabled. To improve this case, we can potentially extend 
the idea in Yuto's patch to the handling of replication fetch requests. 
Basically, we initiate the prefetching of a partition in a background thread. 
The fetch response can be sent when the prefetching of all partitions' data is 
completed or the timeout is reached. In the case of a timeout, we only include 
those partitions whose prefetching has completed.
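
A rough sketch of the "respond with whatever has been prefetched" step, assuming each partition's prefetch is represented by a CompletableFuture started elsewhere (for example with CompletableFuture.supplyAsync on a background executor). PrefetchSketch and collect() are hypothetical names, not broker code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: wait for all prefetches or a timeout, then keep the finished ones.
final class PrefetchSketch {
    static Map<String, byte[]> collect(Map<String, CompletableFuture<byte[]>> prefetches,
                                       long timeoutMs) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(prefetches.values().toArray(new CompletableFuture[0]));
        try {
            all.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            // Timed out or some prefetch failed: fall through and respond
            // with the partitions that did complete.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Map<String, byte[]> ready = new LinkedHashMap<>();
        for (Map.Entry<String, CompletableFuture<byte[]>> entry : prefetches.entrySet()) {
            CompletableFuture<byte[]> f = entry.getValue();
            if (f.isDone() && !f.isCompletedExceptionally()) {
                ready.put(entry.getKey(), f.join());
            }
        }
        return ready;
    }
}
```

With this shape, a slow out-of-sync partition delays the response by at most the timeout, and the in-sync partitions' data still goes out.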

> Broker performance degradation caused by call of sendfile reading disk in 
> network thread
> 
>
> Key: KAFKA-7504
> URL: https://issues.apache.org/jira/browse/KAFKA-7504
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.2.1
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
>Priority: Major
>  Labels: latency, performance
> Attachments: image-2018-10-14-14-18-38-149.png, 
> image-2018-10-14-14-18-57-429.png, image-2018-10-14-14-19-17-395.png, 
> image-2018-10-14-14-19-27-059.png, image-2018-10-14-14-19-41-397.png, 
> image-2018-10-14-14-19-51-823.png, image-2018-10-14-14-20-09-822.png, 
> image-2018-10-14-14-20-19-217.png, image-2018-10-14-14-20-33-500.png, 
> image-2018-10-14-14-20-46-566.png, image-2018-10-14-14-20-57-233.png
>
>
> h2. Environment
> OS: CentOS6
> Kernel version: 2.6.32-XX
>  Kafka version: 0.10.2.1, 0.11.1.2 (but reproduces with the latest build from 
> trunk (2.2.0-SNAPSHOT))
> h2. Phenomenon
> Response time of Produce requests (99th ~ 99.9th %ile) degrades to 50x ~ 100x 
> more than usual.
>  Normally the 99th %ile is lower than 20ms, but when this issue occurs it 
> reaches 50ms to 200ms.
> At the same time we could see two more things in metrics:
> 1. Coinciding disk reads from the volume assigned to log.dirs.
>  2. A rise in network thread utilization (by 
> `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent`)
> As we didn't see an increase of requests in metrics, we suspected blocking in 
> the event loop run by the network thread as the cause of the rising network 
> thread utilization.
>  Reading through the Kafka broker source code, we understand that the only 
> disk IO performed in the network thread is reading log data through calling 
> sendfile(2) (via FileChannel#transferTo).
>  To verify that the calls of sendfile(2) are blocking the network thread for 
> some moments, I ran the following SystemTap script to inspect the duration of 
> sendfile syscalls.
> {code:java}
> # Systemtap script to measure syscall duration
> global s
> global records
> probe syscall.$1 {
>     s[tid()] = gettimeofday_us()
> }
> probe syscall.$1.return {
>     elapsed = gettimeofday_us() - s[tid()]
>     delete s[tid()]
>     records <<< elapsed
> }
> probe end {
>     print(@hist_log(records))
> }{code}
> {code:java}
> $ stap -v syscall-duration.stp sendfile
> # value (us)
> value | count
>     0 |     0
>     1 |    71
>     2 |  6171
>    16 | 29472
>    32 |  3418
>  2048 |     0
>   ...
>  8192 |     3{code}
> As you can see, some calls took more than a few milliseconds, which implies 
> that sendfile blocks the network thread for that long and adds the same 
> latency to all other request/response processing.
> h2. Hypothesis
> Gathering the above observations, I made the following hypothesis.
> Let's say network-thread-1 is multiplexing 3 connections.
>  - producer-A
>  - follower-B (broker replica fetch)
>  - consumer-C
> The broker receives a request from each of those clients: [Produce, 
> FetchFollower, FetchConsumer].
> They are processed by request handler threads, and now the response queue 
> of the network thread contains 3 responses in the following order: 
> [FetchConsumer, Produce, FetchFollower].
> network-thread-1 takes the 3 responses and processes them sequentially 
> ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L632]).
>  Ideally, processing these 3 responses completes in microseconds, as it 
> just copies ready responses into the client socket's buffer 

[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread

2018-10-14 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16649546#comment-16649546
 ] 

Jun Rao commented on KAFKA-7504:


[~kawamuray], thanks for the jira. Great find. A simple yet effective 
solution. Very useful.

> Broker performance degradation caused by call of sendfile reading disk in 
> network thread
> 
>
> Key: KAFKA-7504
> URL: https://issues.apache.org/jira/browse/KAFKA-7504
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.2.1
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
>Priority: Major
>  Labels: latency, performance
> Attachments: image-2018-10-14-14-18-38-149.png, 
> image-2018-10-14-14-18-57-429.png, image-2018-10-14-14-19-17-395.png, 
> image-2018-10-14-14-19-27-059.png, image-2018-10-14-14-19-41-397.png, 
> image-2018-10-14-14-19-51-823.png, image-2018-10-14-14-20-09-822.png, 
> image-2018-10-14-14-20-19-217.png, image-2018-10-14-14-20-33-500.png, 
> image-2018-10-14-14-20-46-566.png, image-2018-10-14-14-20-57-233.png
>
>
> h2. Environment
> OS: CentOS6
> Kernel version: 2.6.32-XX
>  Kafka version: 0.10.2.1, 0.11.1.2 (but reproduces with the latest build from 
> trunk, 2.2.0-SNAPSHOT)
> h2. Phenomenon
> Response time of Produce requests (99th ~ 99.9th %ile) degrades to 50x ~ 100x 
> its usual value.
>  Normally the 99th %ile is lower than 20ms, but when this issue occurs it 
> reaches 50ms to 200ms.
> At the same time we could see two more things in metrics:
> 1. Disk reads occurring at the same time on the volume assigned to log.dirs.
>  2. A rise in network thread utilization (by 
> `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent`)
> As we didn't see an increase of requests in metrics, we suspected blocking in 
> the event loop run by the network thread as the cause of the rising network 
> thread utilization.
>  Reading through the Kafka broker source code, we understand that the only 
> disk IO performed in the network thread is reading log data by calling 
> sendfile(2) (via FileChannel#transferTo).
>  To check whether the calls of sendfile(2) block the network thread for some 
> moments, I ran the following SystemTap script to inspect the duration of 
> sendfile syscalls.
> {code:java}
> # Systemtap script to measure syscall duration
> global s
> global records
> probe syscall.$1 {
>   s[tid()] = gettimeofday_us()
> }
> probe syscall.$1.return {
>   elapsed = gettimeofday_us() - s[tid()]
>   delete s[tid()]
>   records <<< elapsed
> }
> probe end {
>   print(@hist_log(records))
> }{code}
> {code:java}
> $ stap -v syscall-duration.stp sendfile
> # value (us)
> value | count
> 0 | 0
> 1 |71
> 2 |@@@   6171
>16 |@@@  29472
>32 |@@@   3418
>  2048 | 0
> ...
>  8192 | 3{code}
> As you can see, some calls took more than a few milliseconds, which implies 
> that sendfile blocks the network thread for that long and adds the same 
> latency to all other request/response processing.
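
A similar measurement can be approximated from inside the JVM, around the FileChannel#transferTo call the description mentions. The following is a minimal sketch under stated assumptions, not broker code: TransferTimer, Sample, timeTransfer and demo are hypothetical names, and the sink here is an in-memory channel rather than a socket, so the JVM will most likely copy the bytes instead of issuing sendfile(2). It only shows where the timing would wrap the call.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper (not part of Kafka): times a full FileChannel#transferTo.
final class TransferTimer {
    /** Bytes moved and elapsed wall-clock time of one transfer. */
    static final class Sample {
        final long bytes;
        final long micros;
        Sample(long bytes, long micros) { this.bytes = bytes; this.micros = micros; }
    }

    static Sample timeTransfer(Path file, WritableByteChannel sink) throws IOException {
        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = in.size();
            long start = System.nanoTime();
            long sent = 0;
            // transferTo may move fewer bytes than requested, so loop until done.
            while (sent < size) {
                long n = in.transferTo(sent, size - sent, sink);
                if (n <= 0) break;
                sent += n;
            }
            return new Sample(sent, (System.nanoTime() - start) / 1_000);
        }
    }

    /** Writes a 4096-byte temp file and times its transfer into memory. */
    static Sample demo() {
        try {
            Path tmp = Files.createTempFile("transfer-timer", ".bin");
            try {
                Files.write(tmp, new byte[4096]);
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                return timeTransfer(tmp, Channels.newChannel(out));
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Collecting many such samples into a histogram would give a JVM-side view comparable to the SystemTap output, with the caveat that it also includes JVM overhead.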
> h2. Hypothesis
> Gathering the above observations, I made the following hypothesis.
> Let's say network-thread-1 is multiplexing 3 connections.
>  - producer-A
>  - follower-B (broker replica fetch)
>  - consumer-C
> The broker receives a request from each of those clients: [Produce, 
> FetchFollower, FetchConsumer].
> They are processed by request handler threads, and now the response queue 
> of the network thread contains 3 responses in the following order: 
> [FetchConsumer, Produce, FetchFollower].
> network-thread-1 takes the 3 responses and processes them sequentially 
> ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L632]).
>  Ideally, processing these 3 responses completes in microseconds, as it 
> just copies ready responses into the client socket's buffer in a 
> non-blocking manner.
>  However, Kafka uses sendfile(2) to transfer log data to client sockets. 
> The target data might be in the page cache, but old data that was written 
> long ago and has never been read since is likely not.
>  If the target data isn't in the page cache, the kernel first needs to load 
> the target page into the cache. This takes from a few milliseconds to tens 
> of milliseconds, depending on the disk hardware and the load currently 
> applied to it.
>  The Linux kernel doesn't consider the time spent loading data from disk 
> into the page cache as "blocked", hence it awaits completion of the data 
> loading rather than 

[jira] [Resolved] (KAFKA-7482) LeaderAndIsrRequest should be sent to the shutting down broker

2018-10-12 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7482.

   Resolution: Fixed
Fix Version/s: 2.1.0

merged to 2.1 and trunk

> LeaderAndIsrRequest should be sent to the shutting down broker
> --
>
> Key: KAFKA-7482
> URL: https://issues.apache.org/jira/browse/KAFKA-7482
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 2.0.0
>Reporter: Jun Rao
>Assignee: Jun Rao
>Priority: Major
> Fix For: 2.1.0
>
>
> We introduced a regression in KAFKA-5642 in 1.1. Before 1.1, during a 
> controlled shutdown, the LeaderAndIsrRequest was sent to the shutting down 
> broker to inform it that it was no longer the leader for partitions whose 
> leaders had been moved. Since 1.1, such a LeaderAndIsrRequest is no longer 
> sent to the shutting down broker. This can delay the time for the client to 
> find out the new leader.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3097) Acls for PrincipalType User are case sensitive

2018-10-09 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-3097.

   Resolution: Fixed
 Assignee: Manikumar  (was: Ashish Singh)
Fix Version/s: 2.1.0

Merged to 2.1 and trunk.

> Acls for PrincipalType User are case sensitive
> --
>
> Key: KAFKA-3097
> URL: https://issues.apache.org/jira/browse/KAFKA-3097
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Thomas Graves
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.1.0
>
>
> I gave a user ACLs for READ/WRITE, but when I actually went to write to the 
> topic it failed with an auth exception. I figured out it was due to me 
> specifying the user as user:tgraves rather than User:tgraves.
> It seems like it should either fail on assignment or be case insensitive.
> The principal type of User should also probably be documented.
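
The mismatch the reporter hit can be pictured with a small sketch. Everything here is an assumption made for illustration: PrincipalMatch and its methods are hypothetical names and not Kafka's principal parser, and the lenient variant is only one of the two remedies the reporter suggests, not necessarily what was merged.

```java
// Hypothetical illustration of exact vs. type-insensitive principal matching.
final class PrincipalMatch {
    /** Splits "Type:name" into {type, name}; rejects malformed input. */
    static String[] parse(String principal) {
        int idx = principal.indexOf(':');
        if (idx <= 0 || idx == principal.length() - 1) {
            throw new IllegalArgumentException("expected Type:name but got " + principal);
        }
        return new String[] { principal.substring(0, idx), principal.substring(idx + 1) };
    }

    /** Exact comparison: "user:tgraves" never matches "User:tgraves". */
    static boolean matchesExact(String aclPrincipal, String sessionPrincipal) {
        return aclPrincipal.equals(sessionPrincipal);
    }

    /** Lenient comparison: the type ignores case, the name stays case sensitive. */
    static boolean matchesIgnoringTypeCase(String aclPrincipal, String sessionPrincipal) {
        String[] acl = parse(aclPrincipal);
        String[] session = parse(sessionPrincipal);
        return acl[0].equalsIgnoreCase(session[0]) && acl[1].equals(session[1]);
    }
}
```

With exact matching, an ACL stored for user:tgraves silently never applies to a session authenticated as User:tgraves, which is the auth exception described above.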





[jira] [Resolved] (KAFKA-7366) topic level segment.bytes and segment.ms not taking effect immediately

2018-10-09 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7366.

   Resolution: Fixed
Fix Version/s: 2.1.0

Merged to 2.1 and trunk.

> topic level segment.bytes and segment.ms not taking effect immediately
> --
>
> Key: KAFKA-7366
> URL: https://issues.apache.org/jira/browse/KAFKA-7366
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 2.0.0
>Reporter: Jun Rao
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.1.0
>
>
> It used to be that topic level configs such as segment.bytes took effect 
> immediately. Because of KAFKA-6324 in 1.1, those configs now only take effect 
> after the active segment has rolled. The relevant part of KAFKA-6324 is that 
> in Log.maybeRoll, the check for segment rolling was moved to 
> LogSegment.shouldRoll().
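
One way such a regression can arise is when the roll check reads a limit captured at segment creation instead of the current topic config. The sketch below illustrates that general pattern only. SegmentRollSketch and its members are hypothetical, and it does not claim to reproduce the actual Log or LogSegment code.

```java
// Hypothetical sketch: a roll check using a cached limit vs. the current config.
final class SegmentRollSketch {
    /** A segment that remembers the size limit it was created with. */
    static final class CachedLimitSegment {
        private final long maxBytesAtCreation;
        private final long sizeBytes;

        CachedLimitSegment(long maxBytesAtCreation, long sizeBytes) {
            this.maxBytesAtCreation = maxBytesAtCreation;
            this.sizeBytes = sizeBytes;
        }

        /** Uses the stale limit, so a later config change is not seen. */
        boolean shouldRoll(int messageBytes) {
            return sizeBytes + messageBytes > maxBytesAtCreation;
        }
    }

    /** Uses the limit passed in at check time, so config changes apply at once. */
    static boolean shouldRollWithCurrentConfig(long sizeBytes, int messageBytes, long currentMaxBytes) {
        return sizeBytes + messageBytes > currentMaxBytes;
    }
}
```

For a 600-byte active segment created under a 1000-byte limit, lowering segment.bytes to 500 triggers a roll only in the second variant.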





[jira] [Resolved] (KAFKA-7215) Improve LogCleaner behavior on error

2018-10-08 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7215.

   Resolution: Fixed
Fix Version/s: 2.1.0

Merged to 2.1 and trunk.

> Improve LogCleaner behavior on error
> 
>
> Key: KAFKA-7215
> URL: https://issues.apache.org/jira/browse/KAFKA-7215
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
> Fix For: 2.1.0
>
>
> For more detailed information see 
> [KIP-346|https://cwiki.apache.org/confluence/display/KAFKA/KIP-346+-+Improve+LogCleaner+behavior+on+error]





[jira] [Created] (KAFKA-7482) LeaderAndIsrRequest should be sent to the shutting down broker

2018-10-04 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7482:
--

 Summary: LeaderAndIsrRequest should be sent to the shutting down 
broker
 Key: KAFKA-7482
 URL: https://issues.apache.org/jira/browse/KAFKA-7482
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.0, 1.1.0
Reporter: Jun Rao
Assignee: Jun Rao


We introduced a regression in KAFKA-5642 in 1.1. Before 1.1, during a 
controlled shutdown, the LeaderAndIsrRequest was sent to the shutting down 
broker to inform it that it was no longer the leader for partitions whose 
leaders had been moved. Since 1.1, such a LeaderAndIsrRequest is no longer sent 
to the shutting down broker. This can delay the time for the client to find out 
the new leader.





[jira] [Resolved] (KAFKA-7216) Exception while running kafka-acls.sh from 1.0 env on target Kafka env with 1.1.1

2018-09-25 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7216.

   Resolution: Fixed
Fix Version/s: 2.1.0
   2.0.1
   1.1.2
   1.0.3

Merged to trunk, 2.1, 1.1 and 1.0.

> Exception while running kafka-acls.sh from 1.0 env on target Kafka env with 
> 1.1.1
> -
>
> Key: KAFKA-7216
> URL: https://issues.apache.org/jira/browse/KAFKA-7216
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 1.1.0, 1.1.1, 2.0.0
>Reporter: Satish Duggana
>Assignee: Manikumar
>Priority: Major
> Fix For: 1.0.3, 1.1.2, 2.0.1, 2.1.0
>
>
> When `kafka-acls.sh` from a 1.0 environment is run with SimpleAclAuthorizer 
> against a target Kafka cluster on version 1.1.1, it throws the error below.
> {code:java}
> kafka.common.KafkaException: DelegationToken not a valid resourceType name. 
> The valid names are Topic,Group,Cluster,TransactionalId
>   at 
> kafka.security.auth.ResourceType$$anonfun$fromString$1.apply(ResourceType.scala:56)
>   at 
> kafka.security.auth.ResourceType$$anonfun$fromString$1.apply(ResourceType.scala:56)
>   at scala.Option.getOrElse(Option.scala:121)
>   at kafka.security.auth.ResourceType$.fromString(ResourceType.scala:56)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$loadCache$1$$anonfun$apply$mcV$sp$1.apply(SimpleAclAuthorizer.scala:233)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$loadCache$1$$anonfun$apply$mcV$sp$1.apply(SimpleAclAuthorizer.scala:232)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$loadCache$1.apply$mcV$sp(SimpleAclAuthorizer.scala:232)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$loadCache$1.apply(SimpleAclAuthorizer.scala:230)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$loadCache$1.apply(SimpleAclAuthorizer.scala:230)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:216)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:224)
>   at 
> kafka.security.auth.SimpleAclAuthorizer.loadCache(SimpleAclAuthorizer.scala:230)
>   at 
> kafka.security.auth.SimpleAclAuthorizer.configure(SimpleAclAuthorizer.scala:114)
>   at kafka.admin.AclCommand$.withAuthorizer(AclCommand.scala:83)
>   at kafka.admin.AclCommand$.addAcl(AclCommand.scala:93)
>   at kafka.admin.AclCommand$.main(AclCommand.scala:53)
>   at kafka.admin.AclCommand.main(AclCommand.scala)
> {code}
>  
>  This is because it tries to load all the resource types registered under 
> the ZK path, and it throws an error when the `DelegationToken` resource is 
> not defined in the `ResourceType` of the client's Kafka version (which is 
> earlier than 1.1.x).
>   
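
The failure mode can be sketched as a strict name lookup applied to every child found under the ACL path. The names below (ResourceTypeLookup, strictFromString, loadKnown) are hypothetical and the code is not the SimpleAclAuthorizer implementation. The tolerant loader is shown as one possible mitigation under that assumption, not as a description of the merged fix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: an older client meeting a resource type it does not know.
final class ResourceTypeLookup {
    /** Resource types known to the older client, per the error message above. */
    static final List<String> OLD_CLIENT_TYPES =
        Arrays.asList("Topic", "Group", "Cluster", "TransactionalId");

    /** Strict lookup: throws on any unknown name, aborting the whole cache load. */
    static String strictFromString(String name) {
        if (!OLD_CLIENT_TYPES.contains(name)) {
            throw new IllegalArgumentException(name + " not a valid resourceType name. "
                + "The valid names are " + String.join(",", OLD_CLIENT_TYPES));
        }
        return name;
    }

    /** Tolerant load: skips names this client version does not recognize. */
    static List<String> loadKnown(List<String> namesFoundInZk) {
        List<String> known = new ArrayList<>();
        for (String name : namesFoundInZk) {
            if (OLD_CLIENT_TYPES.contains(name)) {
                known.add(name);
            }
        }
        return known;
    }
}
```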





[jira] [Resolved] (KAFKA-7400) Compacted topic segments that precede the log start offset are not cleaned up

2018-09-21 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7400.

   Resolution: Fixed
Fix Version/s: 2.1.0

Merged the PR to trunk.

> Compacted topic segments that precede the log start offset are not cleaned up
> -
>
> Key: KAFKA-7400
> URL: https://issues.apache.org/jira/browse/KAFKA-7400
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 1.1.0, 1.1.1, 2.0.0
>Reporter: Bob Barrett
>Assignee: Bob Barrett
>Priority: Minor
> Fix For: 2.1.0
>
>
> LogManager.cleanupLogs currently checks if a topic is compacted, and skips 
> any deletion if it is. This means that if the log start offset increases, log 
> segments that precede the start offset will never be deleted. The log cleanup 
> logic should be improved to delete these segments even for compacted topics.
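
The selection the description asks for can be sketched as follows. This is a simplified model under stated assumptions: StartOffsetCleanup and deletableSegments are hypothetical names, segments are represented only by their sorted base offsets, and the last (active) segment is never considered for deletion.

```java
// Hypothetical sketch: find segments that lie entirely below the log start offset.
final class StartOffsetCleanup {
    /**
     * @param baseOffsets    segment base offsets, sorted ascending
     * @param logStartOffset current log start offset
     * @return number of leading segments whose records all precede logStartOffset
     */
    static int deletableSegments(long[] baseOffsets, long logStartOffset) {
        int count = 0;
        // A segment ends where the next one begins, so it is fully below the
        // start offset once the next base offset is at or below that offset.
        for (int i = 0; i + 1 < baseOffsets.length; i++) {
            if (baseOffsets[i + 1] <= logStartOffset) {
                count++;
            } else {
                break;
            }
        }
        return count;
    }
}
```

The point of the issue is that this check should run regardless of whether the topic's cleanup policy is compaction or deletion.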





[jira] [Commented] (KAFKA-7408) Truncate to LSO on unclean leader election

2018-09-14 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16615492#comment-16615492
 ] 

Jun Rao commented on KAFKA-7408:


It does seem that truncating to LSO handles unclean leader election better. 
About #1 above, it seems that there is no strong ordering between the leader 
and the transaction coordinator when writing the transaction marker. I am 
wondering if the following can happen. (1) Leader truncated to LSO and is about 
to write a commit marker for a producer id to the log. (2) The coordinator 
writes an abort marker to the log based on an ongoing transaction. (3) Leader 
writes the commit marker to the log. At this point, the commit marker has no 
effect because the abort marker was written ahead of it.

> Truncate to LSO on unclean leader election
> --
>
> Key: KAFKA-7408
> URL: https://issues.apache.org/jira/browse/KAFKA-7408
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> If an unclean leader is elected, we may lose committed transaction data. That 
> alone is expected, but what is worse is that a transaction which was 
> previously completed (either committed or aborted) may lose its marker and 
> become dangling. The transaction coordinator will not know about the unclean 
> leader election, so will not know to resend the transaction markers. 
> Consumers with read_committed isolation will be stuck because the LSO cannot 
> advance.
> To keep this scenario from occurring, it would be better to have the unclean 
> leader truncate to the LSO so that there are no dangling transactions. 
> Truncating to the LSO is not alone sufficient because the markers which 
> allowed the LSO advancement may be at higher offsets. What we can do is let 
> the newly elected leader truncate to the LSO and then rewrite all the markers 
> that followed it using its own leader epoch (to avoid divergence from 
> followers).
> The interesting cases when an unclean leader election occurs are when a 
> transaction is ongoing. 
> 1. If a producer is in the middle of a transaction commit, then the 
> coordinator may still attempt to write transaction markers. This will either 
> succeed or fail depending on the producer epoch in the unclean leader. If the 
> epoch matches, then the WriteTxnMarker call will succeed, which will simply 
> be ignored by the consumer. If the epoch doesn't match, the WriteTxnMarker 
> call will fail and the transaction coordinator can potentially remove the 
> partition from the transaction.
> 2. If a producer is still writing the transaction, then what happens depends 
> on the producer state in the unclean leader. If no producer state has been 
> lost, then the transaction can continue without impact. Otherwise, the 
> producer will likely fail with an OUT_OF_ORDER_SEQUENCE error, which will 
> cause the transaction to be aborted by the coordinator. That takes us back to 
> the first case.
> By truncating to the LSO, we ensure that transactions are either preserved in 
> whole or they are removed from the log in whole. For an unclean leader 
> election, that's probably as good as we can do. But we are assured that 
> consumers will not be blocked by dangling transactions. The only remaining 
> situation where a dangling transaction might be left is if one of the 
> transaction state partitions has an unclean leader election.
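
For readers less familiar with the term, the last stable offset (LSO) can be modelled as the smaller of the high watermark and the first offset of the earliest still-open transaction. The sketch below is a simplified model with hypothetical names (LastStableOffset, lso) and is not the broker's implementation.

```java
// Hypothetical, simplified model of the last stable offset (LSO).
final class LastStableOffset {
    /**
     * @param highWatermark         the partition's high watermark
     * @param firstOffsetsOfOpenTxns first offset of each transaction with no marker yet
     * @return the offset below which every transaction is already decided
     */
    static long lso(long highWatermark, long[] firstOffsetsOfOpenTxns) {
        long result = highWatermark;
        for (long first : firstOffsetsOfOpenTxns) {
            result = Math.min(result, first);
        }
        return result;
    }
}
```

In this model a dangling transaction pins the result at its first offset indefinitely, which is why read_committed consumers get stuck, and why truncating to the LSO removes exactly the undecided part of the log.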





[jira] [Resolved] (KAFKA-7117) Allow AclCommand to use AdminClient API

2018-09-07 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7117.

   Resolution: Fixed
Fix Version/s: 2.1.0

Merged the PR to trunk.

> Allow AclCommand to use AdminClient API
> ---
>
> Key: KAFKA-7117
> URL: https://issues.apache.org/jira/browse/KAFKA-7117
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.1.0
>
>
> Currently AclCommand (kafka-acls.sh) uses authorizer class (default 
> SimpleAclAuthorizer) to manage acls.
> We should also allow AclCommand to support AdminClient API based acl 
> management. This will allow kafka-acls.sh script users to manage acls without 
> interacting with ZooKeeper or the authorizer directly. 





[jira] [Resolved] (KAFKA-7287) Set open ACL permissions for old consumer znode path

2018-08-31 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7287.

   Resolution: Fixed
Fix Version/s: 2.1.0
   2.0.1
   1.1.2

Also merged the PR to 1.1 branch.

> Set open ACL permissions for old consumer znode path
> 
>
> Key: KAFKA-7287
> URL: https://issues.apache.org/jira/browse/KAFKA-7287
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> The old consumer znode path should have open ACL permissions in a kerberized 
> environment. This was missed in the KafkaZkClient changes.





[jira] [Commented] (KAFKA-7366) topic level segment.bytes and segment.ms not taking effect immediately

2018-08-31 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16599003#comment-16599003
 ] 

Jun Rao commented on KAFKA-7366:


[~ijuma], it might be debatable. However, if all other topic level configs take 
effect immediately, it seems that we should make that consistent for all 
configs.

> topic level segment.bytes and segment.ms not taking effect immediately
> --
>
> Key: KAFKA-7366
> URL: https://issues.apache.org/jira/browse/KAFKA-7366
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 2.0.0
>Reporter: Jun Rao
>Priority: Major
>
> It used to be that topic level configs such as segment.bytes took effect 
> immediately. Because of KAFKA-6324 in 1.1, those configs now only take effect 
> after the active segment has rolled. The relevant part of KAFKA-6324 is that 
> in Log.maybeRoll, the check for segment rolling was moved to 
> LogSegment.shouldRoll().





[jira] [Created] (KAFKA-7366) topic level segment.bytes and segment.ms not taking effect immediately

2018-08-31 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7366:
--

 Summary: topic level segment.bytes and segment.ms not taking 
effect immediately
 Key: KAFKA-7366
 URL: https://issues.apache.org/jira/browse/KAFKA-7366
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.1.0
Reporter: Jun Rao


It used to be that topic level configs such as segment.bytes took effect 
immediately. Because of KAFKA-6324 in 1.1, those configs now only take effect 
after the active segment has rolled. The relevant part of KAFKA-6324 is that in 
Log.maybeRoll, the check for segment rolling was moved to 
LogSegment.shouldRoll().





[jira] [Commented] (KAFKA-7287) Set open ACL permissions for old consumer znode path

2018-08-28 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595702#comment-16595702
 ] 

Jun Rao commented on KAFKA-7287:


[~omkreddy], thanks for the patch. I merged the PR to trunk and 2.0. It doesn't 
apply cleanly to 1.1. Do you think you could submit a separate PR for 1.1? 
Thanks.

> Set open ACL permissions for old consumer znode path
> 
>
> Key: KAFKA-7287
> URL: https://issues.apache.org/jira/browse/KAFKA-7287
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
>
> The old consumer znode path should have open ACL permissions in a kerberized 
> environment. This was missed in the KafkaZkClient changes.





[jira] [Resolved] (KAFKA-6343) OOM as the result of creation of 5k topics

2018-08-22 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6343.

   Resolution: Fixed
 Assignee: Alex Dunayevsky
Fix Version/s: 2.1.0

Merged the PR to trunk.

> OOM as the result of creation of 5k topics
> --
>
> Key: KAFKA-6343
> URL: https://issues.apache.org/jira/browse/KAFKA-6343
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.1, 0.10.2.0, 0.10.2.1, 0.11.0.1, 0.11.0.2, 1.0.0
> Environment: RHEL 7, RAM 755GB per host
>Reporter: Alex Dunayevsky
>Assignee: Alex Dunayevsky
>Priority: Major
> Fix For: 2.1.0
>
>
> *Reproducing*: Create 5k topics *from the code* quickly, without any delays. 
> Wait until the brokers finish loading them. This will actually never happen, 
> since all brokers will go down one by one after approx 10-15 minutes or more, 
> depending on the hardware.
> *Heap*: -Xmx/Xms: 5G, 10G, 50G, 256G, 512G
>  
> *Topology*: 3 brokers, 3 zk.
> *Code for 5k topic creation:*
> {code:java}
> package kafka
> import kafka.admin.AdminUtils
> import kafka.utils.{Logging, ZkUtils}
> object TestCreateTopics extends App with Logging {
>   val zkConnect = "grid978:2185"
>   val zkUtils = ZkUtils(zkConnect, 6000, 6000, isZkSecurityEnabled = false)
>   for (topic <- 1 to 5000) {
>     AdminUtils.createTopic(
>       topic             = topic.toString,
>       partitions        = 10,
>       replicationFactor = 2,
>       zkUtils           = zkUtils
>     )
>     logger.info(s"Created topic ${topic.toString}")
>   }
> }
> {code}
> *Cause of death:*
> {code:java}
> java.io.IOException: Map failed
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:920)
> at kafka.log.AbstractIndex.<init>(AbstractIndex.scala:61)
> at kafka.log.OffsetIndex.<init>(OffsetIndex.scala:52)
> at kafka.log.LogSegment.<init>(LogSegment.scala:67)
> at kafka.log.Log.loadSegments(Log.scala:255)
> at kafka.log.Log.<init>(Log.scala:108)
> at kafka.log.LogManager.createLog(LogManager.scala:362)
> at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
> at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
> at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
> at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
> at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
> at kafka.cluster.Partition.makeLeader(Partition.scala:168)
> at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:758)
> at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:757)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:757)
> at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:703)
> at 
> kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:82)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.OutOfMemoryError: Map failed
> at sun.nio.ch.FileChannelImpl.map0(Native Method)
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:917)
> ... 28 more
> {code}
> A broker restart results in the same OOM issue. None of the brokers is able 
> to start again. 
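
The description does not state a root cause, but a "Map failed" error from FileChannelImpl.map is consistent with the process running out of memory mappings rather than heap. The back-of-the-envelope sketch below rests on assumptions that are not in the report: replicas are spread evenly over the brokers, each partition replica has one segment with two memory-mapped index files (offset index and time index), and the kernel uses the Linux default vm.max_map_count of 65530. MmapEstimate is a hypothetical name.

```java
// Hypothetical estimate of memory-mapped index files per broker.
final class MmapEstimate {
    /** Default value of the Linux sysctl vm.max_map_count. */
    static final long DEFAULT_MAX_MAP_COUNT = 65_530L;

    static long mappedIndexFilesPerBroker(int topics, int partitionsPerTopic,
                                          int replicationFactor, int brokers,
                                          int indexFilesPerReplica) {
        long replicas = (long) topics * partitionsPerTopic * replicationFactor;
        // Ceiling division: assumes replicas are spread evenly over the brokers.
        long replicasPerBroker = (replicas + brokers - 1) / brokers;
        return replicasPerBroker * indexFilesPerReplica;
    }
}
```

With the numbers from the reproduction (5000 topics, 10 partitions, replication factor 2, 3 brokers), this estimate gives 66668 mappings per broker, slightly above the default limit, and that would also explain why increasing the heap size did not help.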





[jira] [Commented] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588220#comment-16588220
 ] 

Jun Rao commented on KAFKA-6753:


[~luwang], that sounds good too. To make tracking easier, perhaps it's better 
to close this jira with an updated description and create a separate (umbrella) 
jira to track the remaining work.

> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that maybe the logic for processing each single partition is not 
> optimized, we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops of collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of the partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when there is a high 
> number of events to be processed by the controller: once after processing 
> each event.
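
The two factors multiply: the work is roughly (number of events) x (number of partitions). The sketch below makes that visible by counting partition checks. It is an illustration with hypothetical names (MetricsUpdateCost, offlineCount) in which a negative leader id stands for an offline partition. It does not mirror the controller code or the merged change.

```java
// Hypothetical sketch: cost of recomputing a cluster-wide metric per event.
final class MetricsUpdateCost {
    private long partitionChecks = 0;

    /** Counts offline partitions by scanning every partition's leader id. */
    int offlineCount(int[] leaders) {
        int offline = 0;
        for (int leader : leaders) {
            partitionChecks++;
            if (leader < 0) {
                offline++;
            }
        }
        return offline;
    }

    /** Recomputes the metric after every event, as the description says. */
    static long checksPerEvent(int[] leaders, int events) {
        MetricsUpdateCost cost = new MetricsUpdateCost();
        for (int i = 0; i < events; i++) {
            cost.offlineCount(leaders);
        }
        return cost.partitionChecks;
    }

    /** Recomputes the metric a single time for the whole batch of events. */
    static long checksOnce(int[] leaders) {
        MetricsUpdateCost cost = new MetricsUpdateCost();
        cost.offlineCount(leaders);
        return cost.partitionChecks;
    }
}
```

For 1000 partitions and 50 events, the per-event variant performs 50000 partition checks while a single refresh performs 1000.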





[jira] [Commented] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588026#comment-16588026
 ] 

Jun Rao commented on KAFKA-6753:


[~luwang], we could consider removing this metric, but it probably requires a 
KIP. We could also potentially just piggyback the update of the metric every 
time the auto leader balancer runs. What do you think?

> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that maybe the logic for processing each single partition is not 
> optimized, we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops of collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of the partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when there is a high 
> number of events to be processed by the controller: once after processing 
> each event.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6753.

   Resolution: Fixed
Fix Version/s: 2.1.0

Merged the PR to trunk.

> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that the logic for processing each single partition is not 
> optimized. However, we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops of collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of the partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when there is a high 
> number of events to be processed by the controller, one invocation after 
> processing any event.
> The patch that will be submitted tries to fix bullet 2 above, i.e. reducing 
> the number of invocations to update metrics. Instead of updating the metrics 
> after processing every event, we only periodically check whether the metrics need 
> to be updated, i.e. once every second. 
> * If, after the previous invocation to update metrics, other types of events 
> have changed the controller’s state, then one second later the 
> metrics will be updated. 
> * If, after the previous invocation, there have been no other types of events, 
> then the call to update metrics can be bypassed.
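
The periodic check described above can be sketched roughly as follows. This is a minimal Python illustration rather than the controller's actual code; the names `MetricsUpdater`, `on_event` and `maybe_update` are hypothetical, and the expensive metrics computation is replaced by a no-op.

```python
class MetricsUpdater:
    """Hypothetical sketch: recompute expensive metrics only when state changed."""

    def __init__(self, compute_metrics):
        self._compute_metrics = compute_metrics  # expensive: scans all partitions
        self._dirty = False
        self.updates = 0

    def on_event(self, changes_state=True):
        # Called after each controller event; only records that state changed.
        if changes_state:
            self._dirty = True

    def maybe_update(self):
        # Called by a periodic tick (the description suggests once every second).
        if not self._dirty:
            return False  # nothing changed since the last update: bypass
        self._compute_metrics()
        self.updates += 1
        self._dirty = False
        return True


updater = MetricsUpdater(compute_metrics=lambda: None)
for _ in range(1000):        # a burst of 1000 events between two ticks
    updater.on_event()
first_tick = updater.maybe_update()   # one metrics update for the whole burst
second_tick = updater.maybe_update()  # no new events, so the update is skipped
```

With this shape, the cost of a burst of events is one metrics computation per tick instead of one per event.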





[jira] [Resolved] (KAFKA-6835) Enable topic unclean leader election to be enabled without controller change

2018-08-20 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6835.

Resolution: Fixed

Merged the PR to trunk.

> Enable topic unclean leader election to be enabled without controller change
> 
>
> Key: KAFKA-6835
> URL: https://issues.apache.org/jira/browse/KAFKA-6835
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.1.0
>
>
> Dynamic update of broker's default unclean.leader.election.enable will be 
> processed without controller change (KAFKA-6526). We should probably do the 
> same for topic overrides as well.





[jira] [Resolved] (KAFKA-7299) batch LeaderAndIsr requests during auto preferred leader election

2018-08-16 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7299.

   Resolution: Fixed
Fix Version/s: 2.1.0

Merged the PR to trunk.

> batch LeaderAndIsr requests during auto preferred leader election
> -
>
> Key: KAFKA-7299
> URL: https://issues.apache.org/jira/browse/KAFKA-7299
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Jun Rao
>Assignee: huxihx
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently, in KafkaController.checkAndTriggerAutoLeaderRebalance(), we call 
> onPreferredReplicaElection() one partition at a time. This means that the 
> controller will be sending LeaderAndIsrRequest one partition at a time. It 
> would be more efficient to call onPreferredReplicaElection() for a batch of 
> partitions to reduce the number of LeaderAndIsrRequests.
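
To illustrate why batching helps, here is a rough Python model of the request counts. It is a sketch under a simplifying assumption (each election sends one request to every replica broker of the affected partitions); the function names are hypothetical and do not correspond to controller methods.

```python
from collections import defaultdict


def requests_one_at_a_time(partition_replicas):
    # One election call per partition: each call sends one request
    # to every broker hosting a replica of that partition.
    return sum(len(set(brokers)) for brokers in partition_replicas.values())


def requests_batched(partition_replicas):
    # One election call for the whole batch: partition states are grouped
    # per broker, so each broker receives a single request.
    per_broker = defaultdict(list)
    for partition, brokers in partition_replicas.items():
        for broker in set(brokers):
            per_broker[broker].append(partition)
    return len(per_broker)


# 100 partitions of a topic "t", each replicated on brokers 1, 2 and 3.
replicas = {("t", p): [1, 2, 3] for p in range(100)}
unbatched_count = requests_one_at_a_time(replicas)
batched_count = requests_batched(replicas)
```

In this model the unbatched path sends 300 requests, while the batched path sends 3.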





[jira] [Created] (KAFKA-7299) batch LeaderAndIsr requests during auto preferred leader election

2018-08-15 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7299:
--

 Summary: batch LeaderAndIsr requests during auto preferred leader 
election
 Key: KAFKA-7299
 URL: https://issues.apache.org/jira/browse/KAFKA-7299
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 2.0.0
Reporter: Jun Rao


Currently, in KafkaController.checkAndTriggerAutoLeaderRebalance(), we call 
onPreferredReplicaElection() one partition at a time. This means that the 
controller will be sending LeaderAndIsrRequest one partition at a time. It 
would be more efficient to call onPreferredReplicaElection() for a batch of 
partitions to reduce the number of LeaderAndIsrRequests.





[jira] [Commented] (KAFKA-7152) replica should be in-sync if its LEO equals leader's LEO

2018-07-21 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551864#comment-16551864
 ] 

Jun Rao commented on KAFKA-7152:


Thanks for reporting the issue. A few things.
 # Not sure if the additional check on LEO is general enough. It handles the 
case when no data is produced to a partition, but not when a partition is being 
produced slowly. It also seems to expose a new potential issue: if the replica 
fetch thread dies unexpectedly, isr won't be shrunk.
 # Currently, we don't batch the update to ISR in ZK when expanding isr in a 
follower fetch request. So it doesn't benefit from the async ZK change. We 
punted on that since it requires a bit more work. Since this is a more general 
solution, perhaps we should look into that option more seriously.
 # Taking 20 secs to do 2K ZK updates seems high. In a healthy ZK cluster, each 
ZK update should probably take just 1-2 ms. Could you improve the ZK 
performance or increase the replica lag config for this particular case?
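
As a quick check of the numbers in the last point, the arithmetic can be written out as below. The sketch assumes the znode writes are issued strictly one after another; the helper name is hypothetical.

```python
def total_update_seconds(num_znodes, ms_per_write):
    # Assumes sequential writes (no batching or pipelining).
    return num_znodes * ms_per_write / 1000.0


reported = total_update_seconds(2000, 10)     # 10 ms per write, as in the report
healthy_low = total_update_seconds(2000, 1)   # 1 ms per write
healthy_high = total_update_seconds(2000, 2)  # 2 ms per write
```

At 1-2 ms per write, 2K updates take roughly 2-4 seconds, which is below the 10-second default of replica.lag.time.max.ms cited in the issue.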

> replica should be in-sync if its LEO equals leader's LEO
> 
>
> Key: KAFKA-7152
> URL: https://issues.apache.org/jira/browse/KAFKA-7152
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently a replica will be moved out of ISR if follower has not fetched from 
> leader for 10 sec (default replica.lag.time.max.ms). This causes a problem in 
> the following scenario:
> Say follower's ReplicaFetchThread needs to fetch 2k partitions from the 
> leader broker. Only 100 out of 2k partitions are actively being produced to 
> and therefore the total bytes in rate for those 2k partitions are small. The 
> following will happen:
>  
> 1) The follower's ReplicaFetcherThread sends FetchRequest for those 2k 
> partitions.
> 2) Because the total bytes-in-rate for those 2k partitions is very small, 
> follower is able to catch up and leader broker adds these 2k partitions to 
> ISR. Follower's lastCaughtUpTimeMs for all partitions is updated to the 
> current time T0.
> 3) Since follower has caught up for all 2k partitions, leader updates 2k 
> partition znodes to include the follower in the ISR. It may take 20 seconds 
> to write 2k partition znodes if each znode write operation takes 10 ms.
> 4) At T0 + 15, maybeShrinkIsr() is invoked on leader broker. Since there is 
> no FetchRequest from the follower for more than 10 seconds after T0, all those 
> 2k partitions will be considered out of sync and the follower will be 
> removed from ISR.
> 5) The follower receives FetchResponse at least 20 seconds after T0. That 
> means the next FetchRequest from follower to leader will be after T0 + 20.
> The sequence of events described above will loop over time. There will be 
> constant churn of URP in the cluster even if follower can catch up with 
> leader's byte-in-rate. This reduces the cluster availability.
>  
> In order to address this problem, one simple approach is to keep follower in 
> the ISR as long as follower's LEO equals leader's LEO regardless of 
> follower's lastCaughtUpTimeMs. This is particularly useful if there are a lot 
> of inactive partitions in the cluster.
>  
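
The difference between the current rule and the proposed one can be sketched as follows. This is a simplified Python model, not the broker's implementation; the function names are hypothetical, and only the 10-second default and the T0 + 15 timing come from the description above.

```python
REPLICA_LAG_TIME_MAX_MS = 10_000  # default replica.lag.time.max.ms cited above


def out_of_sync_current(now_ms, last_caught_up_ms):
    # Current rule as described: based only on the time since the follower
    # last caught up with the leader.
    return now_ms - last_caught_up_ms > REPLICA_LAG_TIME_MAX_MS


def out_of_sync_proposed(now_ms, last_caught_up_ms, follower_leo, leader_leo):
    # Proposed rule: a follower whose LEO equals the leader's LEO stays in
    # the ISR regardless of lastCaughtUpTimeMs.
    if follower_leo == leader_leo:
        return False
    return now_ms - last_caught_up_ms > REPLICA_LAG_TIME_MAX_MS


t0 = 0
now = t0 + 15_000  # maybeShrinkIsr() runs at T0 + 15 s in the scenario

idle_partition_current = out_of_sync_current(now, t0)
idle_partition_proposed = out_of_sync_proposed(now, t0, follower_leo=100, leader_leo=100)
lagging_partition_proposed = out_of_sync_proposed(now, t0, follower_leo=90, leader_leo=100)
```

Under the proposed rule the idle partition stays in the ISR, while a follower that is genuinely behind is still removed once the lag time is exceeded.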





[jira] [Resolved] (KAFKA-7064) "Unexpected resource type GROUP" when describing broker configs using latest admin client

2018-06-19 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7064.

Resolution: Fixed

Merged [https://github.com/apache/kafka/pull/5245] to trunk and 2.0 branch.

> "Unexpected resource type GROUP" when describing broker configs using latest 
> admin client
> -
>
> Key: KAFKA-7064
> URL: https://issues.apache.org/jira/browse/KAFKA-7064
> Project: Kafka
>  Issue Type: Bug
>Reporter: Rohan Desai
>Assignee: Andy Coates
>Priority: Blocker
> Fix For: 2.0.0
>
>
> I'm getting the following error when I try to describe broker configs using 
> the admin client:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource 
> type GROUP for resource 0{code}
> I think it's due to this commit: 
> [https://github.com/apache/kafka/commit/49db5a63c043b50c10c2dfd0648f8d74ee917b6a]
>  
> My guess at what's going on is that now that the client is using 
> ConfigResource instead of Resource it's sending a describe request for 
> resource type BROKER w/ id 3, while the broker associates id 3 w/ GROUP





[jira] [Resolved] (KAFKA-7010) Rename ResourceNameType.ANY to MATCH

2018-06-14 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7010.

Resolution: Fixed

merged the PR to trunk and 2.0 branch.

> Rename ResourceNameType.ANY to MATCH
> 
>
> Key: KAFKA-7010
> URL: https://issues.apache.org/jira/browse/KAFKA-7010
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, security
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
> Fix For: 2.0.0
>
>
> Following on from the PR 
> [#5117|[https://github.com/apache/kafka/pull/5117]...] and discussions with 
> Colin McCabe...
> The current ResourceNameType.ANY may be misleading as it performs pattern 
> matching for wildcard and prefixed bindings, whereas ResourceName.ANY just 
> brings back any resource name.
> Renaming to ResourceNameType.MATCH and adding more Java doc should clear this 
> up.
> Finally, ResourceNameType is no longer appropriate as the type is used in 
> ResourcePattern and ResourcePatternFilter. Hence rename to PatternType.
>  
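
The pattern matching that motivates the MATCH name can be sketched as below. This is a hypothetical Python simplification of the semantics described above (literal, wildcard and prefixed bindings), not the authorizer's code; `binding_matches` and the string constants are illustrative names only.

```python
WILDCARD = "*"


def binding_matches(resource_name, pattern_name, pattern_type):
    # A literal binding matches the exact name, or any name if it is the wildcard.
    if pattern_type == "LITERAL":
        return pattern_name == resource_name or pattern_name == WILDCARD
    # A prefixed binding matches every resource name that starts with the prefix.
    if pattern_type == "PREFIXED":
        return resource_name.startswith(pattern_name)
    raise ValueError(f"unknown pattern type: {pattern_type}")


bindings = [
    ("payments", "LITERAL"),
    ("*", "LITERAL"),
    ("pay", "PREFIXED"),
    ("orders", "LITERAL"),
]
# A MATCH-style lookup returns every binding that applies to the resource.
matched = [b for b in bindings if binding_matches("payments", *b)]
```

A MATCH-style filter therefore returns the bindings that would affect a given resource, which is different from returning bindings with any resource name.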





[jira] [Updated] (KAFKA-7007) Use JSON for /kafka-acl-extended-changes path

2018-06-12 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-7007:
---
Description: 
Relating to one of the outstanding work items in PR 
[#5117|[https://github.com/apache/kafka/pull/5117]...]

 

Keep Literal ACLs on the old paths, using the old formats, to maintain 
backwards compatibility.

Have Prefixed, and any later types, go on new paths using JSON (old brokers 
are not aware of them).

Add checks to reject any adminClient requests to add prefixed acls before the 
cluster is fully upgraded.

  was:
Relating to one of the outstanding work items in PR 
[#5117|[https://github.com/apache/kafka/pull/5117]...]

 

The above PR sees ACL change notifications come through two paths.  Change 
the code to use a single path, with a JSON value that defines the 
resource-name-type of the changed binding.


> Use JSON for /kafka-acl-extended-changes path
> -
>
> Key: KAFKA-7007
> URL: https://issues.apache.org/jira/browse/KAFKA-7007
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, security
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
> Fix For: 2.0.0
>
>
> Relating to one of the outstanding work items in PR 
> [#5117|[https://github.com/apache/kafka/pull/5117]...]
>  
> Keep Literal ACLs on the old paths, using the old formats, to maintain 
> backwards compatibility.
> Have Prefixed, and any later types, go on new paths using JSON (old 
> brokers are not aware of them).
> Add checks to reject any adminClient requests to add prefixed acls before the 
> cluster is fully upgraded.





[jira] [Resolved] (KAFKA-7007) Use JSON for /kafka-acl-extended-changes path

2018-06-12 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7007.

Resolution: Fixed

merged the PR to trunk and 2.0 branch.

> Use JSON for /kafka-acl-extended-changes path
> -
>
> Key: KAFKA-7007
> URL: https://issues.apache.org/jira/browse/KAFKA-7007
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, security
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
> Fix For: 2.0.0
>
>
> Relating to one of the outstanding work items in PR 
> [#5117|[https://github.com/apache/kafka/pull/5117]...]
>  
> Keep Literal ACLs on the old paths, using the old formats, to maintain 
> backwards compatibility.
> Have Prefixed, and any later types, go on new paths using JSON (old 
> brokers are not aware of them).
> Add checks to reject any adminClient requests to add prefixed acls before the 
> cluster is fully upgraded.





[jira] [Updated] (KAFKA-7007) Use JSON for /kafka-acl-extended-changes path

2018-06-12 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-7007:
---
Summary: Use JSON for /kafka-acl-extended-changes path  (was: All ACL 
changes should use single /kafka-acl-changes path )

> Use JSON for /kafka-acl-extended-changes path
> -
>
> Key: KAFKA-7007
> URL: https://issues.apache.org/jira/browse/KAFKA-7007
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, security
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
> Fix For: 2.0.0
>
>
> Relating to one of the outstanding work items in PR 
> [#5117|[https://github.com/apache/kafka/pull/5117]...]
>  
> The above PR sees ACL change notifications come through two paths.  Change 
> the code to use a single path, with a JSON value that defines the 
> resource-name-type of the changed binding.





[jira] [Resolved] (KAFKA-7005) Remove duplicate Java Resource class.

2018-06-11 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7005.

Resolution: Fixed

merged the PR to trunk and 2.0 branch.

> Remove duplicate Java Resource class.
> -
>
> Key: KAFKA-7005
> URL: https://issues.apache.org/jira/browse/KAFKA-7005
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, security
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
> Fix For: 2.0.0
>
>
> Relating to one of the outstanding work items in PR 
> [#5117|[https://github.com/apache/kafka/pull/5117]...]
> The o.a.k.c.request.Resource class could be dropped in favour of 
> o.a.k.c.config.ConfigResource.
> This will remove the duplication of `Resource` classes.





[jira] [Resolved] (KAFKA-7006) Remove duplicate Scala ResourceNameType class

2018-06-08 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7006.

Resolution: Fixed

merged the PR to trunk and 2.0 branch.

> Remove duplicate Scala ResourceNameType class
> -
>
> Key: KAFKA-7006
> URL: https://issues.apache.org/jira/browse/KAFKA-7006
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, security
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
> Fix For: 2.0.0
>
>
> Relating to one of the outstanding work items in PR 
> [#5117|[https://github.com/apache/kafka/pull/5117]...]
> The kafka.security.auth.ResourceNameType class should be dropped in favour of 
> the Java one.
>  





[jira] [Resolved] (KAFKA-7011) Investigate if it's possible to drop the ResourceNameType field from Java Resource class.

2018-06-07 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7011.

Resolution: Fixed

Merged the PR to trunk and 2.0 branch.

> Investigate if it's possible to drop the ResourceNameType field from Java 
> Resource class.
> 
>
> Key: KAFKA-7011
> URL: https://issues.apache.org/jira/browse/KAFKA-7011
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, security
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
> Fix For: 2.0.0
>
>
> Following on from the PR [#5117|https://github.com/apache/kafka/pull/5117] 
> and discussions with Colin McCabe...
>  
> Current placement of ResourceNameType as field in Resource class is ... less 
> than ideal. A Resource should be a concrete resource.  Look to resolve this.
>  
> Thoughts...
> A. I guess you could subclass Resource and have ResourcePrefix - but there is 
> no 'is-a' relationship here and it would still allow 
> authorise(ResourcePrefix())
> B. You could move ResourceNameType into AccessControllEntryData - possible.
> C. Move ResourceNameType directly into AclBinding / AclBindingFilter - 
> possible
>  





[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2018-05-30 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16495306#comment-16495306
 ] 

Jun Rao commented on KAFKA-3042:


Interesting. Do you have the controller and the state-change log around the 
time when the issue occurred? From the log, we want to see when the controller 
changed the zk version and whether that's propagated properly to the replicas.

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.10.0.0
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
>Assignee: Dong Lin
>Priority: Major
>  Labels: reliability
> Fix For: 2.0.0
>
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> Sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact it's 
> a follower.
> So after several failed tries, it needs to find out who the leader is.
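
The failure mode in the log message can be modelled with a versioned, compare-and-set style update. The sketch below is a hypothetical Python stand-in for a znode, not ZooKeeper's or Kafka's API; the class name, field names and data layout are assumptions made for illustration.

```python
class VersionedZnode:
    """Hypothetical stand-in for a znode that supports conditional updates."""

    def __init__(self, data, version):
        self.data = data
        self.version = version

    def conditional_set(self, data, expected_version):
        # The write succeeds only if the caller's version matches the stored one.
        if expected_version != self.version:
            return False
        self.data = data
        self.version += 1
        return True


znode = VersionedZnode(data={"leader": 1, "isr": [1, 2]}, version=54)
cached_version = 54  # what the stale broker remembers

# Another writer (for example the controller) updates the znode: version is now 55.
znode.conditional_set({"leader": 2, "isr": [2]}, expected_version=54)

# The stale broker keeps retrying with its cached version and fails every time.
attempts = [
    znode.conditional_set({"leader": 1, "isr": [1]}, cached_version)
    for _ in range(3)
]
```

Retrying with the same cached version can never succeed, which is why the description suggests re-reading the leadership state after a few failures.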





[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2018-05-29 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493976#comment-16493976
 ] 

Jun Rao commented on KAFKA-3042:


In 1.1.0 we fixed KAFKA-2729, which could lead to the "Cached zkVersion" message.

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.10.0.0
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
>Assignee: Dong Lin
>Priority: Major
>  Labels: reliability
> Fix For: 2.0.0
>
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> Sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact it's 
> a follower.
> So after several failed tries, it needs to find out who the leader is.





[jira] [Commented] (KAFKA-6952) LogCleaner stopped due to org.apache.kafka.common.errors.CorruptRecordException

2018-05-26 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491723#comment-16491723
 ] 

Jun Rao commented on KAFKA-6952:


Hmm, could you run bin/kafka-run-class.sh kafka.tools.DumpLogSegments on those 
log segments in the offset range [0, 6919353) and see if there is any message 
corruption? Alternatively, could you attach those log segments to the jira? 
Thanks.

> LogCleaner stopped due to 
> org.apache.kafka.common.errors.CorruptRecordException
> ---
>
> Key: KAFKA-6952
> URL: https://issues.apache.org/jira/browse/KAFKA-6952
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.2
>Reporter: evan einst
>Priority: Major
>
> This error occurred while upgrading our production cluster from 0.10.2.1 to 
> 0.11.0.2. KAFKA-5431 (https://issues.apache.org/jira/browse/KAFKA-5431)
> says that this error is fixed in 0.11.0.0, so we decided to upgrade to 
> 0.11.0.2, but after upgrading the code of one server, the problem 
> still occurred.
> 
> {{[2018-05-26 13:23:58,029] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-42. (kafka.log.LogCleaner)
> [2018-05-26 13:23:58,029] INFO Cleaner 0: Building offset map for __consumer_offsets-42... (kafka.log.LogCleaner)
> [2018-05-26 13:23:58,050] INFO Cleaner 0: Building offset map for log __consumer_offsets-42 for 19 segments in offset range [0, 6919353). (kafka.log.LogCleaner)
> [2018-05-26 13:23:58,300] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)
> org.apache.kafka.common.errors.CorruptRecordException: Record size is less than the minimum record overhead (14)
> [2018-05-26 13:23:58,301] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)}}
> 
> Please help me resolve this problem, thank you very much!





[jira] [Resolved] (KAFKA-6937) In-sync replica delayed during fetch if replica throttle is exceeded

2018-05-25 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6937.

   Resolution: Fixed
Fix Version/s: 1.1.1, 1.0.2, 2.0.0

Merged the PR.

> In-sync replica delayed during fetch if replica throttle is exceeded
> 
>
> Key: KAFKA-6937
> URL: https://issues.apache.org/jira/browse/KAFKA-6937
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.1, 1.1.0, 1.0.1
>Reporter: Jun Rao
>Assignee: Jun Rao
>Priority: Major
> Fix For: 2.0.0, 1.0.2, 1.1.1
>
>
> When replication throttling is enabled, in-sync replica's traffic should 
> never be throttled. However, in DelayedFetch.tryComplete(), we incorrectly 
> delay the completion of an in-sync replica fetch request if replication 
> throttling is engaged. 
> The impact is that the producer may see increased latency if acks = all. The 
> delivery of the message to the consumer may also be delayed.
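
The intended rule can be sketched as a small predicate. This is a simplified Python illustration under the assumption that throttling depends on three conditions (the partition is configured as throttled, the quota is exceeded, and the fetching replica is not in sync); the function names are hypothetical and not the broker's methods.

```python
def should_leader_throttle(partition_throttled, quota_exceeded, replica_in_sync):
    # In-sync replicas are never throttled, even when the quota is exceeded.
    return partition_throttled and quota_exceeded and not replica_in_sync


def can_complete_fetch(has_enough_bytes, partition_throttled, quota_exceeded, replica_in_sync):
    # A delayed fetch should only be held back by throttling when the
    # throttle actually applies to this replica.
    throttled = should_leader_throttle(partition_throttled, quota_exceeded, replica_in_sync)
    return has_enough_bytes and not throttled


in_sync_fetch = can_complete_fetch(True, True, True, replica_in_sync=True)
catching_up_fetch = can_complete_fetch(True, True, True, replica_in_sync=False)
```

With this rule an in-sync follower's fetch completes as soon as data is available, so acks = all producers are not delayed by the throttle.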





[jira] [Commented] (KAFKA-6937) In-sync replica delayed during fetch if replica throttle is exceeded

2018-05-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491376#comment-16491376
 ] 

Jun Rao commented on KAFKA-6937:


[~klafferty], thanks for testing this out. Yes, the reason for including the 
in-sync replica traffic in the quota is to bound the total replication traffic. 
Otherwise, the total replication traffic will be in-sync replication traffic + 
replication quota, which can vary as out of sync replicas become in-sync.

> In-sync replica delayed during fetch if replica throttle is exceeded
> 
>
> Key: KAFKA-6937
> URL: https://issues.apache.org/jira/browse/KAFKA-6937
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.1, 1.1.0, 1.0.1
>Reporter: Jun Rao
>Assignee: Jun Rao
>Priority: Major
>
> When replication throttling is enabled, in-sync replica's traffic should 
> never be throttled. However, in DelayedFetch.tryComplete(), we incorrectly 
> delay the completion of an in-sync replica fetch request if replication 
> throttling is engaged. 
> The impact is that the producer may see increased latency if acks = all. The 
> delivery of the message to the consumer may also be delayed.





[jira] [Updated] (KAFKA-6937) In-sync replica delayed during fetch if replica throttle is exceeded

2018-05-23 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-6937:
---
Component/s: core

> In-sync replica delayed during fetch if replica throttle is exceeded
> 
>
> Key: KAFKA-6937
> URL: https://issues.apache.org/jira/browse/KAFKA-6937
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.1, 1.1.0, 1.0.1
>Reporter: Jun Rao
>Assignee: Jun Rao
>Priority: Major
>
> When replication throttling is enabled, in-sync replica's traffic should 
> never be throttled. However, in DelayedFetch.tryComplete(), we incorrectly 
> delay the completion of an in-sync replica fetch request if replication 
> throttling is engaged. 
> The impact is that the producer may see increased latency if acks = all. The 
> delivery of the message to the consumer may also be delayed.





[jira] [Updated] (KAFKA-6937) In-sync replica delayed during fetch if replica throttle is exceeded

2018-05-23 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-6937:
---
Affects Version/s: 0.11.0.1, 1.1.0, 1.0.1

> In-sync replica delayed during fetch if replica throttle is exceeded
> 
>
> Key: KAFKA-6937
> URL: https://issues.apache.org/jira/browse/KAFKA-6937
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1, 1.1.0, 1.0.1
>Reporter: Jun Rao
>Assignee: Jun Rao
>Priority: Major
>
> When replication throttling is enabled, in-sync replica's traffic should 
> never be throttled. However, in DelayedFetch.tryComplete(), we incorrectly 
> delay the completion of an in-sync replica fetch request if replication 
> throttling is engaged. 
> The impact is that the producer may see increased latency if acks = all. The 
> delivery of the message to the consumer may also be delayed.





[jira] [Updated] (KAFKA-6937) In-sync replica delayed during fetch if replica throttle is exceeded

2018-05-23 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-6937:
---
Description: 
When replication throttling is enabled, in-sync replica's traffic should never 
be throttled. However, in DelayedFetch.tryComplete(), we incorrectly delay the 
completion of an in-sync replica fetch request if replication throttling is 
engaged. 

The impact is that the producer may see increased latency if acks = all. The 
delivery of the message to the consumer may also be delayed.

  was:When replication throttling is enabled, in-sync replica's traffic should 
never be throttled. However, in DelayedFetch.tryComplete(), we incorrectly 
delay the completion of an in-sync replica fetch request if replication 
throttling is engaged. 


> In-sync replica delayed during fetch if replica throttle is exceeded
> 
>
> Key: KAFKA-6937
> URL: https://issues.apache.org/jira/browse/KAFKA-6937
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Assignee: Jun Rao
>Priority: Major
>
> When replication throttling is enabled, in-sync replica's traffic should 
> never be throttled. However, in DelayedFetch.tryComplete(), we incorrectly 
> delay the completion of an in-sync replica fetch request if replication 
> throttling is engaged. 
> The impact is that the producer may see increased latency if acks = all. The 
> delivery of the message to the consumer may also be delayed.





[jira] [Created] (KAFKA-6937) In-sync replica delayed during fetch if replica throttle is exceeded

2018-05-23 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-6937:
--

 Summary: In-sync replica delayed during fetch if replica throttle 
is exceeded
 Key: KAFKA-6937
 URL: https://issues.apache.org/jira/browse/KAFKA-6937
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao
Assignee: Jun Rao


When replication throttling is enabled, in-sync replica's traffic should never 
be throttled. However, in DelayedFetch.tryComplete(), we incorrectly delay the 
completion of an in-sync replica fetch request if replication throttling is 
engaged. 





[jira] [Resolved] (KAFKA-6361) Fast leader fail over can lead to log divergence between leader and follower

2018-05-09 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6361.

   Resolution: Fixed
Fix Version/s: 2.0.0

Merged the PR to trunk.

> Fast leader fail over can lead to log divergence between leader and follower
> 
>
> Key: KAFKA-6361
> URL: https://issues.apache.org/jira/browse/KAFKA-6361
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Anna Povzner
>Priority: Major
>  Labels: reliability
> Fix For: 2.0.0
>
>
> We have observed an edge case in the replication failover logic which can 
> cause a replica to permanently fall out of sync with the leader or, in the 
> worst case, actually have localized divergence between logs. This occurs in 
> spite of the improved truncation logic from KIP-101. 
> Suppose we have brokers A and B. Initially A is the leader in epoch 1. It 
> appends two batches: one in the range (0, 10) and the other in the range (11, 
> 20). The first one successfully replicates to B, but the second one does not. 
> In other words, the logs on the brokers look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> {code}
> Broker A then has a zk session expiration and broker B is elected with epoch 
> 2. It appends a new batch with offsets (11, n) to its local log. So we now 
> have this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Normally we expect broker A to truncate to offset 11 on becoming the 
> follower, but before it is able to do so, broker B has its own zk session 
> expiration and broker A again becomes leader, now with epoch 3. It then 
> appends a new entry in the range (21, 30). The updated logs look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> 2: offsets: [21, 30], leader epoch: 3
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Now what happens next depends on the last offset of the batch appended in 
> epoch 2. On becoming follower, broker B will send an OffsetForLeaderEpoch 
> request to broker A with epoch 2. Broker A will respond that epoch 2 ends at 
> offset 21. There are three cases:
> 1) n < 20: In this case, broker B will not do any truncation. It will begin 
> fetching from offset n, which will ultimately cause an out of order offset 
> error because broker A will return the full batch beginning from offset 11 
> which broker B will be unable to append.
> 2) n == 20: Again broker B does not truncate. It will fetch from offset 21 
> and everything will appear fine though the logs have actually diverged.
> 3) n > 20: Broker B will attempt to truncate to offset 21. Since this is in 
> the middle of the batch, it will truncate all the way to offset 10. It can 
> begin fetching from offset 11 and everything is fine.
> The case we have actually seen is the first one. The second one would likely 
> go unnoticed in practice and everything is fine in the third case. To 
> work around the issue, we deleted the active segment on the replica, which 
> allowed it to re-replicate consistently from the leader.
> I'm not sure of the best solution for this scenario. Maybe if the leader isn't 
> aware of an epoch, it should always respond with {{UNDEFINED_EPOCH_OFFSET}} 
> instead of using the offset of the next highest epoch. That would cause the 
> follower to truncate using its high watermark. Or perhaps instead of doing 
> so, it could send another OffsetForLeaderEpoch request at the next previous 
> cached epoch and then truncate using that. 
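
The three cases in the quoted description can be sketched as a small classification. This is only an illustration under stated assumptions: the names (EpochTruncationSketch, classify, Outcome) are hypothetical, and the follower's log end offset is assumed to be n + 1, where n is the last offset of the batch it appended in epoch 2.

```java
// Hypothetical sketch of the three cases; not Kafka's actual truncation code.
final class EpochTruncationSketch {
    enum Outcome { FETCH_FAILS_OUT_OF_ORDER, SILENT_DIVERGENCE, TRUNCATES_WHOLE_BATCH }

    /**
     * @param n                    last offset of the batch the follower appended in epoch 2
     * @param leaderEpochEndOffset end offset the leader reports for epoch 2 (21 in the example)
     */
    static Outcome classify(long n, long leaderEpochEndOffset) {
        long followerLogEndOffset = n + 1; // assumed: next offset the follower would fetch
        if (followerLogEndOffset < leaderEpochEndOffset) {
            return Outcome.FETCH_FAILS_OUT_OF_ORDER; // case 1: no truncation, append later fails
        } else if (followerLogEndOffset == leaderEpochEndOffset) {
            return Outcome.SILENT_DIVERGENCE;        // case 2: no truncation, logs differ silently
        }
        return Outcome.TRUNCATES_WHOLE_BATCH;        // case 3: truncation point falls mid-batch
    }
}
```

With the reported end offset of 21, n = 15 falls into case 1, n = 20 into case 2, and n = 25 into case 3.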





[jira] [Created] (KAFKA-6858) Log.truncateTo() may truncate to an earlier offset than requested

2018-05-03 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-6858:
--

 Summary: Log.truncateTo() may truncate to an earlier offset than 
requested
 Key: KAFKA-6858
 URL: https://issues.apache.org/jira/browse/KAFKA-6858
 Project: Kafka
  Issue Type: Improvement
Reporter: Jun Rao


In Log.truncateTo(), if the truncation point is in the middle of a message set, 
we will actually be truncating to the first offset of the message set. In that 
case, the replica fetcher thread should adjust the fetch offset to the actual 
truncated offset. Typically, the truncation point should never be in the middle 
of a message set. However, this could potentially happen during message format 
upgrade.
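
A minimal sketch of the adjustment, assuming a simplified in-memory model of the log: the names (TruncateSketch, Batch, effectiveTruncationOffset) are hypothetical and do not correspond to the real Log.truncateTo() signature.

```java
import java.util.List;

// Hypothetical model: each batch (message set) covers offsets [baseOffset, lastOffset].
final class TruncateSketch {
    static final class Batch {
        final long baseOffset;
        final long lastOffset;
        Batch(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }
    }

    /**
     * Returns the offset the log actually ends at after truncating to targetOffset.
     * If the target falls inside a batch, the whole batch is dropped, so the
     * effective offset is that batch's base offset.
     */
    static long effectiveTruncationOffset(List<Batch> batches, long targetOffset) {
        for (Batch b : batches) {
            if (targetOffset > b.baseOffset && targetOffset <= b.lastOffset) {
                return b.baseOffset;
            }
        }
        return targetOffset; // target is on a batch boundary or past the end
    }
}
```

The replica fetcher would then resume fetching from the returned offset rather than from the requested one.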





[jira] [Created] (KAFKA-6857) LeaderEpochFileCache.endOffsetFor() should check for UNDEFINED_EPOCH explicitly

2018-05-03 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-6857:
--

 Summary: LeaderEpochFileCache.endOffsetFor() should check for 
UNDEFINED_EPOCH explicitly
 Key: KAFKA-6857
 URL: https://issues.apache.org/jira/browse/KAFKA-6857
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.11.0.0
Reporter: Jun Rao


In LeaderEpochFileCache.endOffsetFor(), we have the following code.

{code:java}
if (requestedEpoch == latestEpoch) {
 leo().messageOffset
{code}

In the case when the requestedEpoch is UNDEFINED_EPOCH and latestEpoch is also 
UNDEFINED_EPOCH, we return leo. This will cause the follower to truncate to a 
wrong offset. If requestedEpoch is UNDEFINED_EPOCH, we need to return 
UNDEFINED_EPOCH_OFFSET.
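
A rough sketch of the explicit check, written in Java for illustration. The class and method signature are hypothetical, the sentinel values of -1 are assumed here, and the lookup of older epochs is deliberately omitted.

```java
// Hypothetical sketch; not the actual LeaderEpochFileCache implementation.
final class EpochEndOffsetSketch {
    static final int UNDEFINED_EPOCH = -1;          // assumed sentinel value
    static final long UNDEFINED_EPOCH_OFFSET = -1L; // assumed sentinel value

    static long endOffsetFor(int requestedEpoch, int latestEpoch, long logEndOffset) {
        // Check the undefined epoch first, so it never matches an undefined latestEpoch.
        if (requestedEpoch == UNDEFINED_EPOCH) {
            return UNDEFINED_EPOCH_OFFSET;
        }
        if (requestedEpoch == latestEpoch) {
            return logEndOffset;
        }
        // Lookup for older epochs is omitted in this sketch.
        return UNDEFINED_EPOCH_OFFSET;
    }
}
```

The ordering of the two checks is the point: with the equality test first, an undefined request against an empty cache would return the log end offset.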





[jira] [Commented] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size

2018-04-27 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457130#comment-16457130
 ] 

Jun Rao commented on KAFKA-6834:


To fix this, we need to allow the cleaner buffer to grow up to the size of a 
single message set in the log.
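
As a rough sketch of that idea (CleanerBufferSketch and ensureCapacity are hypothetical names, not the log cleaner's actual code), the buffer is sized by the message set being read rather than by the per-topic message size:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch; not the actual log cleaner implementation.
final class CleanerBufferSketch {
    /**
     * Returns a buffer that can hold a message set of the given size.
     * The existing buffer is reused when it is already large enough;
     * otherwise a new one is allocated, regardless of the max message size.
     */
    static ByteBuffer ensureCapacity(ByteBuffer buffer, int messageSetSize) {
        if (buffer.capacity() >= messageSetSize) {
            return buffer;
        }
        return ByteBuffer.allocate(messageSetSize);
    }
}
```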

> log cleaner should handle the case when the size of a message set is larger 
> than the max message size
> -
>
> Key: KAFKA-6834
> URL: https://issues.apache.org/jira/browse/KAFKA-6834
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Priority: Major
>
> In KAFKA-5316, we added the logic to allow a message (set) larger than the 
> per topic message size to be written to the log during log cleaning. However, 
> the buffer size in the log cleaner is still bounded by the per topic message 
> size. This can cause the log cleaner to die and cause the broker to run out 
> of disk space.
>  





[jira] [Created] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size

2018-04-27 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-6834:
--

 Summary: log cleaner should handle the case when the size of a 
message set is larger than the max message size
 Key: KAFKA-6834
 URL: https://issues.apache.org/jira/browse/KAFKA-6834
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao


In KAFKA-5316, we added the logic to allow a message (set) larger than the per 
topic message size to be written to the log during log cleaning. However, the 
buffer size in the log cleaner is still bounded by the per topic message size. 
This can cause the log cleaner to die and cause the broker to run out of disk 
space.

 





[jira] [Resolved] (KAFKA-6650) The controller should be able to handle a partially deleted topic

2018-04-16 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6650.

   Resolution: Fixed
Fix Version/s: 2.0.0

Merged the PR to trunk.

> The controller should be able to handle a partially deleted topic
> -
>
> Key: KAFKA-6650
> URL: https://issues.apache.org/jira/browse/KAFKA-6650
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.0.0
>
>
> A previous controller could have deleted some partitions of a topic from ZK, 
> but not all partitions, and then died.
> In that case, the new controller should be able to handle the partially 
> deleted topic, and finish the deletion.
> In the current code base, if there is no leadership info for a replica's 
> partition, the transition to OfflineReplica state for the replica will fail. 
> Afterwards the transition to ReplicaDeletionStarted will fail as well since 
> the only valid previous state for ReplicaDeletionStarted is OfflineReplica. 
> Furthermore, it means the topic deletion will never finish.





[jira] [Commented] (KAFKA-6780) log cleaner shouldn't clean messages beyond high watermark

2018-04-11 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16434646#comment-16434646
 ] 

Jun Rao commented on KAFKA-6780:


The fix is probably to further bound firstUncleanableDirtyOffset by the high 
watermark.
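
In its simplest form the extra bound is just a minimum. The sketch below uses hypothetical names (CleanableRangeSketch and its parameters) and ignores any other limits the cleaner applies.

```java
// Hypothetical sketch; not the actual log cleaner code.
final class CleanableRangeSketch {
    /** Cleaning stops at the active segment or the high watermark, whichever comes first. */
    static long firstUncleanableDirtyOffset(long activeSegmentBaseOffset, long highWatermark) {
        return Math.min(activeSegmentBaseOffset, highWatermark);
    }
}
```

For example, with an active segment starting at offset 100 and a high watermark of 80, cleaning would stop at 80, so uncommitted records never take part in compaction.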

> log cleaner shouldn't clean messages beyond high watermark
> --
>
> Key: KAFKA-6780
> URL: https://issues.apache.org/jira/browse/KAFKA-6780
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Jun Rao
>Priority: Major
>
> Currently, the firstUncleanableDirtyOffset computed by the log cleaner is 
> bounded by the first offset in the active segment. It's possible for the high 
> watermark to be smaller than that. This may cause a committed record to be 
> removed because of an uncommitted record.





[jira] [Created] (KAFKA-6780) log cleaner shouldn't clean messages beyond high watermark

2018-04-11 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-6780:
--

 Summary: log cleaner shouldn't clean messages beyond high watermark
 Key: KAFKA-6780
 URL: https://issues.apache.org/jira/browse/KAFKA-6780
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.1.0
Reporter: Jun Rao


Currently, the firstUncleanableDirtyOffset computed by the log cleaner is 
bounded by the first offset in the active segment. It's possible for the high 
watermark to be smaller than that. This may cause a committed record to be 
removed because of an uncommitted record.





[jira] [Resolved] (KAFKA-6447) Add Delegation Token Operations to KafkaAdminClient

2018-04-11 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6447.

   Resolution: Fixed
Fix Version/s: 2.0.0

Merged the PR to trunk.

> Add Delegation Token Operations to KafkaAdminClient
> ---
>
> Key: KAFKA-6447
> URL: https://issues.apache.org/jira/browse/KAFKA-6447
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.0.0
>
>
> This JIRA is about adding delegation token operations to the new Admin Client 
> API.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-249%3A+Add+Delegation+Token+Operations+to+KafkaAdminClient





[jira] [Resolved] (KAFKA-6752) Unclean leader election metric no longer working

2018-04-11 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6752.

   Resolution: Fixed
Fix Version/s: 1.1.1
   2.0.0

Merged to trunk and 1.1.

> Unclean leader election metric no longer working
> 
>
> Key: KAFKA-6752
> URL: https://issues.apache.org/jira/browse/KAFKA-6752
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 1.1.0
>Reporter: Jason Gustafson
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> Happened to notice that the unclean leader election meter is no longer being 
> updated. This was probably lost during the controller overhaul.





[jira] [Resolved] (KAFKA-5706) log the name of the error instead of the error code in response objects

2018-04-10 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-5706.

Resolution: Cannot Reproduce

[~manasvigupta], this doesn't seem to be an issue any more now that we have 
converted all requests/responses to java objects using Error enum. Closing the 
jira.

> log the name of the error instead of the error code in response objects
> ---
>
> Key: KAFKA-5706
> URL: https://issues.apache.org/jira/browse/KAFKA-5706
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 0.11.0.0
>Reporter: Jun Rao
>Assignee: Manasvi Gupta
>Priority: Major
>  Labels: newbie
>
> Currently, when logging the error code in the response objects, we simply log 
> response.toString(), which contains the error code. It will be useful to log 
> the name of the corresponding exception for the error, which is more 
> meaningful than an error code.





[jira] [Resolved] (KAFKA-3827) log.message.format.version should default to inter.broker.protocol.version

2018-04-10 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-3827.

Resolution: Won't Fix

The current implementation is easier to understand than what I proposed. 
Closing the jira.

> log.message.format.version should default to inter.broker.protocol.version
> --
>
> Key: KAFKA-3827
> URL: https://issues.apache.org/jira/browse/KAFKA-3827
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.0.0
>Reporter: Jun Rao
>Assignee: Manasvi Gupta
>Priority: Major
>  Labels: newbie
>
> Currently, if one sets inter.broker.protocol.version to 0.9.0 and restarts 
> the broker, one will get the following exception since 
> log.message.format.version defaults to 0.10.0. It will be more intuitive if 
> log.message.format.version defaults to the value of 
> inter.broker.protocol.version.
> java.lang.IllegalArgumentException: requirement failed: 
> log.message.format.version 0.10.0-IV1 cannot be used when 
> inter.broker.protocol.version is set to 0.9.0.1
>   at scala.Predef$.require(Predef.scala:233)
>   at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1023)
>   at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:994)
>   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:743)
>   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:740)
>   at 
> kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
>   at kafka.Kafka$.main(Kafka.scala:58)
>   at kafka.Kafka.main(Kafka.scala)





[jira] [Created] (KAFKA-6760) responses not logged properly in controller

2018-04-06 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-6760:
--

 Summary: responses not logged properly in controller
 Key: KAFKA-6760
 URL: https://issues.apache.org/jira/browse/KAFKA-6760
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 1.1.0
Reporter: Jun Rao


Saw the following logging in controller.log. We need to log the 
StopReplicaResponse properly in KafkaController.

[2018-04-05 14:38:41,878] DEBUG [Controller id=0] Delete topic callback invoked 
for org.apache.kafka.common.requests.StopReplicaResponse@263d40c 
(kafka.controller.KafkaController)

It seems that the same issue exists for LeaderAndIsrResponse as well.
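
The ClassName@hash form in the log above is what Java's default Object.toString() produces. A minimal sketch of one possible remedy, using a hypothetical stand-in class (ResponseLoggingSketch with made-up fields, not the real StopReplicaResponse), is to override toString():

```java
// Hypothetical stand-in for a response object; fields are illustrative only.
final class ResponseLoggingSketch {
    private final String error;
    private final int partitionCount;

    ResponseLoggingSketch(String error, int partitionCount) {
        this.error = error;
        this.partitionCount = partitionCount;
    }

    // Without this override, logging the object prints ClassName@hashCode.
    @Override
    public String toString() {
        return "ResponseLoggingSketch(error=" + error
            + ", partitionCount=" + partitionCount + ")";
    }
}
```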





[jira] [Resolved] (KAFKA-6624) log segment deletion could cause a disk to be marked offline incorrectly

2018-03-12 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6624.

   Resolution: Fixed
Fix Version/s: 1.1.0

> log segment deletion could cause a disk to be marked offline incorrectly
> 
>
> Key: KAFKA-6624
> URL: https://issues.apache.org/jira/browse/KAFKA-6624
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Jun Rao
>Assignee: Dong Lin
>Priority: Major
> Fix For: 1.1.0
>
>
> Saw the following log.
> [2018-03-06 23:12:20,721] ERROR Error while flushing log for topic1-0 in dir 
> /data01/kafka-logs with offset 80993 (kafka.server.LogDirFailureChannel)
> java.nio.channels.ClosedChannelException
>         at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
>         at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:379)
>         at 
> org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:163)
>         at 
> kafka.log.LogSegment$$anonfun$flush$1.apply$mcV$sp(LogSegment.scala:375)
>         at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
>         at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
>         at kafka.log.LogSegment.flush(LogSegment.scala:374)
>         at 
> kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1374)
>         at 
> kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1373)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.log.Log$$anonfun$flush$1.apply$mcV$sp(Log.scala:1373)
>         at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
>         at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>         at kafka.log.Log.flush(Log.scala:1368)
>         at 
> kafka.log.Log$$anonfun$roll$2$$anonfun$apply$1.apply$mcV$sp(Log.scala:1343)
>         at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
>         at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2018-03-06 23:12:20,722] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir /data01/kafka-logs (kafka.server.ReplicaManager)
> It seems that topic1 was being deleted around the time when flushing was 
> called. Then flushing hit an IOException, which caused the disk to be marked 
> offline incorrectly.





[jira] [Commented] (KAFKA-6624) log segment deletion could cause a disk to be marked offline incorrectly

2018-03-07 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390727#comment-16390727
 ] 

Jun Rao commented on KAFKA-6624:


[~lindong], do you think this is an issue? Thanks.

> log segment deletion could cause a disk to be marked offline incorrectly
> 
>
> Key: KAFKA-6624
> URL: https://issues.apache.org/jira/browse/KAFKA-6624
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Jun Rao
>Priority: Major
>
> Saw the following log.
> [2018-03-06 23:12:20,721] ERROR Error while flushing log for topic1-0 in dir 
> /data01/kafka-logs with offset 80993 (kafka.server.LogDirFailureChannel)
> java.nio.channels.ClosedChannelException
>         at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
>         at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:379)
>         at 
> org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:163)
>         at 
> kafka.log.LogSegment$$anonfun$flush$1.apply$mcV$sp(LogSegment.scala:375)
>         at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
>         at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
>         at kafka.log.LogSegment.flush(LogSegment.scala:374)
>         at 
> kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1374)
>         at 
> kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1373)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.log.Log$$anonfun$flush$1.apply$mcV$sp(Log.scala:1373)
>         at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
>         at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>         at kafka.log.Log.flush(Log.scala:1368)
>         at 
> kafka.log.Log$$anonfun$roll$2$$anonfun$apply$1.apply$mcV$sp(Log.scala:1343)
>         at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
>         at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2018-03-06 23:12:20,722] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir /data01/kafka-logs (kafka.server.ReplicaManager)
> It seems that topic1 was being deleted around the time when flushing was 
> called. Then flushing hit an IOException, which caused the disk to be marked 
> offline incorrectly.





[jira] [Commented] (KAFKA-6624) log segment deletion could cause a disk to be marked offline incorrectly

2018-03-07 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390572#comment-16390572
 ] 

Jun Rao commented on KAFKA-6624:


One way to fix this is to add a flag for a log segment to be deleted. Then, if 
we hit an IOException during flushing, we could check if the deletion flag of 
the segment is set and if so, just ignore the IOException.
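
A minimal sketch of that approach, assuming hypothetical names throughout (SegmentFlushSketch, FlushAction, markForDeletion); it is not the actual LogSegment code:

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; not the actual LogSegment implementation.
final class SegmentFlushSketch {
    /** Stand-in for the underlying file channel flush. */
    interface FlushAction {
        void run() throws IOException;
    }

    private final AtomicBoolean markedForDeletion = new AtomicBoolean(false);
    private final FlushAction flushAction;

    SegmentFlushSketch(FlushAction flushAction) {
        this.flushAction = flushAction;
    }

    /** Set before the segment's files are closed and deleted. */
    void markForDeletion() {
        markedForDeletion.set(true);
    }

    /**
     * Returns true if the flush succeeded, false if it failed on a segment that is
     * being deleted. Any other IOException is rethrown to the caller.
     */
    boolean flush() throws IOException {
        try {
            flushAction.run();
            return true;
        } catch (IOException e) {
            if (markedForDeletion.get()) {
                return false; // expected race with deletion: do not fail the log dir
            }
            throw e;
        }
    }
}
```

Since ClosedChannelException is a subclass of IOException, the case in the log below would be covered by the catch block.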

> log segment deletion could cause a disk to be marked offline incorrectly
> 
>
> Key: KAFKA-6624
> URL: https://issues.apache.org/jira/browse/KAFKA-6624
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Jun Rao
>Priority: Major
>
> Saw the following log.
> [2018-03-06 23:12:20,721] ERROR Error while flushing log for topic1-0 in dir 
> /data01/kafka-logs with offset 80993 (kafka.server.LogDirFailureChannel)
> java.nio.channels.ClosedChannelException
>         at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
>         at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:379)
>         at 
> org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:163)
>         at 
> kafka.log.LogSegment$$anonfun$flush$1.apply$mcV$sp(LogSegment.scala:375)
>         at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
>         at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
>         at kafka.log.LogSegment.flush(LogSegment.scala:374)
>         at 
> kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1374)
>         at 
> kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1373)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.log.Log$$anonfun$flush$1.apply$mcV$sp(Log.scala:1373)
>         at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
>         at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>         at kafka.log.Log.flush(Log.scala:1368)
>         at 
> kafka.log.Log$$anonfun$roll$2$$anonfun$apply$1.apply$mcV$sp(Log.scala:1343)
>         at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
>         at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2018-03-06 23:12:20,722] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir /data01/kafka-logs (kafka.server.ReplicaManager)
> It seems that topic1 was being deleted around the time when flushing was 
> called. Then flushing hit an IOException, which caused the disk to be marked 
> offline incorrectly.





[jira] [Created] (KAFKA-6624) log segment deletion could cause a disk to be marked offline incorrectly

2018-03-07 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-6624:
--

 Summary: log segment deletion could cause a disk to be marked 
offline incorrectly
 Key: KAFKA-6624
 URL: https://issues.apache.org/jira/browse/KAFKA-6624
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.1.0
Reporter: Jun Rao


Saw the following log.

[2018-03-06 23:12:20,721] ERROR Error while flushing log for topic1-0 in dir 
/data01/kafka-logs with offset 80993 (kafka.server.LogDirFailureChannel)
java.nio.channels.ClosedChannelException
        at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
        at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:379)
        at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:163)
        at kafka.log.LogSegment$$anonfun$flush$1.apply$mcV$sp(LogSegment.scala:375)
        at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
        at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
        at kafka.log.LogSegment.flush(LogSegment.scala:374)
        at kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1374)
        at kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1373)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.log.Log$$anonfun$flush$1.apply$mcV$sp(Log.scala:1373)
        at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
        at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
        at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
        at kafka.log.Log.flush(Log.scala:1368)
        at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$1.apply$mcV$sp(Log.scala:1343)
        at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
        at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2018-03-06 23:12:20,722] INFO [ReplicaManager broker=0] Stopping serving 
replicas in dir /data01/kafka-logs (kafka.server.ReplicaManager)

It seems that topic1 was being deleted around the time when flushing was 
called. Then flushing hit an IOException, which caused the disk to be marked 
offline incorrectly.





[jira] [Commented] (KAFKA-3978) Cannot truncate to a negative offset (-1) exception at broker startup

2018-03-05 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387125#comment-16387125
 ] 

Jun Rao commented on KAFKA-3978:


Agreed. For 1) we could add an assertion. For 2), if we detect that we are 
truncating to -1, we could log a warning and just truncate the whole local log.
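
A rough sketch of option 2), with hypothetical names (NegativeTruncationSketch, resolveTruncationOffset) and the assumption that truncating to offset 0 stands in for truncating the whole local log:

```java
import java.util.logging.Logger;

// Hypothetical sketch; not the actual Log.truncateTo() code.
final class NegativeTruncationSketch {
    private static final Logger LOG = Logger.getLogger("NegativeTruncationSketch");

    /** Maps a negative truncation target to a full truncation instead of throwing. */
    static long resolveTruncationOffset(long requestedOffset) {
        if (requestedOffset < 0) {
            LOG.warning("Asked to truncate to negative offset " + requestedOffset
                + "; truncating the whole local log instead");
            return 0L; // assumed here to mean: drop everything in the local log
        }
        return requestedOffset;
    }
}
```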

> Cannot truncate to a negative offset (-1) exception at broker startup
> -
>
> Key: KAFKA-3978
> URL: https://issues.apache.org/jira/browse/KAFKA-3978
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
> Environment: 3.13.0-87-generic 
>Reporter: Juho Mäkinen
>Priority: Critical
>  Labels: reliability, startup
>
> During broker startup sequence the broker server.log has this exception. 
> Problem persists after multiple restarts and also on another broker in the 
> cluster.
> {code}
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: 
> PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils)
> INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: 
> PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils)
> INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser)
> INFO Kafka commitId : b8642491e78c5a13 
> (org.apache.kafka.common.utils.AppInfoParser)
> INFO [Kafka Server 1002], started (kafka.server.KafkaServer)
> INFO [Kafka Server 1002], started (kafka.server.KafkaServer)
> Error when handling request 
> {controller_id=1004,controller_epoch=1,partition_states=[..REALLY LONG OUTPUT 
> SNIPPED AWAY..], 
> live_leaders=[{id=1004,host=172.16.6.187,port=9092},{id=1003,host=172.16.2.21,port=9092}]}
>  (kafka.server.KafkaApis)
> ERROR java.lang.IllegalArgumentException: Cannot truncate to a negative 
> offset (-1).
> at kafka.log.Log.truncateTo(Log.scala:731)
> at 
> kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:288)
> at 
> kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:280)
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> at 
> 

[jira] [Commented] (KAFKA-4277) creating ephemeral node already exist

2018-02-15 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366546#comment-16366546
 ] 

Jun Rao commented on KAFKA-4277:


[~nico.meyer], do you know if the conflicting ephemeral node was there 
temporarily or permanently? From the ZK server commit log, was there any 
indication that the ephemeral node was removed after session expiration, and if 
so, when?

> creating ephemeral node already exist
> -
>
> Key: KAFKA-4277
> URL: https://issues.apache.org/jira/browse/KAFKA-4277
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Feixiang Yan
>Priority: Major
>
> I use zookeeper 3.4.6.
> The ZooKeeper session timed out and zkClient's attempt to reconnect failed. It 
> then re-established the session, and re-registering the broker info in ZK 
> threw a NODEEXISTS exception.
>  I think this is because the ephemeral node created by the old session had 
> not yet been removed. 
> I read the 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.8.1, where createEphemeralPathExpectConflictHandleZKBug tries to create 
> the node in a while loop until creation succeeds. This would solve the issue, but in 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.10.0.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>   of 0.10.1 the function has been removed.
> {noformat}
> [2016-10-07 19:00:32,562] INFO Socket connection established to 
> 10.191.155.238/10.191.155.238:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,564] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1576b11f9b201bd has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,564] INFO Initiating client connection, 
> connectString=10.191.155.237:21819,10.191.155.238:21819,10.191.155.239:21819/cluster2
>  sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@ae71be2 
> (org.apache.zookeeper.ZooKeeper)
> [2016-10-07 19:00:32,566] INFO Opening socket connection to server 
> 10.191.155.237/10.191.155.237:21819. Will not attempt to authenticate using 
> SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO Socket connection established to 
> 10.191.155.237/10.191.155.237:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO Session establishment complete on server 
> 10.191.155.237/10.191.155.237:21819, sessionid = 0x1579ecd39c20006, 
> negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,608] INFO re-registering broker info in ZK for broker 3 
> (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2016-10-07 19:00:32,610] INFO Creating /brokers/ids/3 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,611] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,614] ERROR Error handling event ZkEvent[New session 
> event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@324f1bc] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.RuntimeException: A broker is already registered on the path 
> /brokers/ids/3. This probably indicates that you either have configured a 
> brokerid that is already in use, or else you have shutdown this broker and 
> restarted it faster than the zookeeper timeout so it appears to be 
> re-registering.
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:305)
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:291)
> at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
> at 
> kafka.server.KafkaHealthcheck$SessionExpireListener.handleNewSession(KafkaHealthcheck.scala:104)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}
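
The retry-until-created behaviour that the report attributes to the 0.8.1 helper can be sketched roughly as below. This is a minimal in-memory simulation, not ZkUtils code: FakeZk, create_ephemeral_with_retry, the session ids and the on_conflict callback are all hypothetical names introduced for illustration, and real code would sleep or back off between attempts.

```python
class NodeExistsError(Exception):
    """Raised when an ephemeral node already exists at the path."""


class FakeZk:
    """Hypothetical in-memory stand-in for ZooKeeper (illustration only)."""

    def __init__(self):
        self.nodes = {}  # path -> id of the session that owns the node

    def create_ephemeral(self, path, session_id):
        if path in self.nodes:
            raise NodeExistsError(path)
        self.nodes[path] = session_id

    def expire_session(self, session_id):
        # Expiring a session deletes every ephemeral node it owns.
        self.nodes = {p: s for p, s in self.nodes.items() if s != session_id}


def create_ephemeral_with_retry(zk, path, session_id, max_attempts, on_conflict):
    """Retry creation while a node owned by another session still exists.

    on_conflict(attempt) runs between attempts; real code would back off there.
    Returns the number of attempts used.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            zk.create_ephemeral(path, session_id)
            return attempt
        except NodeExistsError:
            if zk.nodes.get(path) == session_id:
                return attempt  # the node is already ours, nothing to do
            on_conflict(attempt)
    raise NodeExistsError(f"{path} still exists after {max_attempts} attempts")


# The old session's node is still present when the new session registers.
zk = FakeZk()
zk.create_ephemeral("/brokers/ids/3", session_id="old")


def server_cleans_up(attempt):
    # Assumption for the demo: the server removes the stale node after attempt 2.
    if attempt == 2:
        zk.expire_session("old")


attempts = create_ephemeral_with_retry(zk, "/brokers/ids/3", "new", 5, server_cleans_up)
```

A bounded number of attempts is used here so that a node which is never removed surfaces as an error instead of looping forever.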



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6549) Deadlock while processing Controller Events

2018-02-14 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364973#comment-16364973
 ] 

Jun Rao commented on KAFKA-6549:


We can fix this issue with the updated version in 
https://github.com/apache/kafka/pull/4551

> Deadlock while processing Controller Events
> ---
>
> Key: KAFKA-6549
> URL: https://issues.apache.org/jira/browse/KAFKA-6549
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Blocker
> Fix For: 1.1.0
>
> Attachments: td.txt
>
>
> Stack traces from a single node test cluster that was deadlocked while 
> processing controller Reelect and Expire events. Attached stack-trace.
> {quote}
> "main-EventThread" #18 daemon prio=5 os_prio=31 tid=0x7f83e4285800 
> nid=0x7d03 waiting on condition [0x7278b000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0007bccadf30> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessed(KafkaController.scala:1505)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:163)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply$mcV$sp(ZooKeeperClient.scala:365)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:363)
>  at 
> org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
>  at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
> Locked ownable synchronizers:
>  - <0x000780054860> (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>  
> "controller-event-thread" #42 prio=5 os_prio=31 tid=0x7f83e4293800 
> nid=0xad03 waiting on condition [0x73fd3000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0007bcc584a0> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:148)
>  at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1439)
>  at 
> kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1432)
>  at 
> kafka.zk.KafkaZkClient.registerZNodeChangeHandlerAndCheckExistence(KafkaZkClient.scala:1171)
>  at 
> kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1475)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
>  at 
> 

[jira] [Resolved] (KAFKA-6184) report a metric of the lag between the consumer offset and the start offset of the log

2018-02-06 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6184.

   Resolution: Fixed
Fix Version/s: 1.2.0

The PR is merged to trunk.

> report a metric of the lag between the consumer offset and the start offset 
> of the log
> --
>
> Key: KAFKA-6184
> URL: https://issues.apache.org/jira/browse/KAFKA-6184
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 1.0.0
>Reporter: Jun Rao
>Assignee: huxihx
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> Currently, the consumer reports a metric of the lag between the high 
> watermark of a log and the consumer offset. It will be useful to report a 
> similar lag metric between the consumer offset and the start offset of the 
> log. If this latter lag gets close to 0, it's an indication that the consumer 
> may lose data soon.
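
The two lag figures described above can be sketched as simple differences. The function names, the threshold parameter and the offsets in the example are assumptions made for illustration; they are not the metric names used by the consumer.

```python
def lag_to_high_watermark(high_watermark, consumer_offset):
    """Existing metric: records between the consumer position and the high watermark."""
    return high_watermark - consumer_offset


def lag_to_log_start(consumer_offset, log_start_offset):
    """Proposed metric: records between the log start offset and the consumer position."""
    return consumer_offset - log_start_offset


def may_lose_data_soon(consumer_offset, log_start_offset, threshold):
    """True when the consumer is within `threshold` records of the log start offset."""
    return lag_to_log_start(consumer_offset, log_start_offset) <= threshold


# Made-up offsets: log start at 900, consumer at 905, high watermark at 1000.
behind_by = lag_to_high_watermark(1000, 905)
ahead_of_start_by = lag_to_log_start(905, 900)
at_risk = may_lose_data_soon(905, 900, threshold=10)
```

A consumer in this state is far behind the high watermark and only a few records ahead of the point where retention will delete data, which is the situation the new metric is meant to expose.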





[jira] [Resolved] (KAFKA-6452) Add documentation for delegation token authentication mechanism

2018-02-05 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6452.

   Resolution: Fixed
Fix Version/s: (was: 1.2.0)
   1.1.0

The PR is merged to 1.1 and trunk.

> Add documentation for delegation token authentication mechanism
> ---
>
> Key: KAFKA-6452
> URL: https://issues.apache.org/jira/browse/KAFKA-6452
> Project: Kafka
>  Issue Type: Sub-task
>  Components: documentation
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
> Fix For: 1.1.0
>
>






[jira] [Commented] (KAFKA-6254) Introduce Incremental FetchRequests to Increase Partition Scalability

2018-02-05 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352719#comment-16352719
 ] 

Jun Rao commented on KAFKA-6254:


Some perf results from [~afalko]. 

Without patch @46k partitions: consume latency - 4 ms; commit latency - 46 ms
With patch @68k: consume - 2 ms; commit - 33 ms
With patch @46k: consume - 2 ms; commit - 21 ms

The improvement in commit latency is due to the improvement in replicating the offset topic.
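
As a quick worked check of the numbers above, the relative reduction at 46k partitions can be computed as follows. The helper name is hypothetical, and the 68k run is left out because it uses a different partition count and has no unpatched baseline to compare against.

```python
def percent_reduction(before_ms, after_ms):
    """Relative latency reduction, as a percentage of the 'before' value."""
    return 100.0 * (before_ms - after_ms) / before_ms


# Figures at 46k partitions, taken from the results quoted above.
consume_reduction = percent_reduction(4, 2)
commit_reduction = round(percent_reduction(46, 21), 1)
```

On these figures, consume latency is halved and commit latency drops by a little over half.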

> Introduce Incremental FetchRequests to Increase Partition Scalability
> -
>
> Key: KAFKA-6254
> URL: https://issues.apache.org/jira/browse/KAFKA-6254
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
> Fix For: 1.1.0
>
>
> Introduce Incremental FetchRequests to Increase Partition Scalability.  See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability





[jira] [Resolved] (KAFKA-6254) Introduce Incremental FetchRequests to Increase Partition Scalability

2018-02-05 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6254.

   Resolution: Fixed
Fix Version/s: 1.1.0

The PR is merged.

> Introduce Incremental FetchRequests to Increase Partition Scalability
> -
>
> Key: KAFKA-6254
> URL: https://issues.apache.org/jira/browse/KAFKA-6254
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
> Fix For: 1.1.0
>
>
> Introduce Incremental FetchRequests to Increase Partition Scalability.  See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability





[jira] [Commented] (KAFKA-6469) ISR change notification queue can prevent controller from making progress

2018-02-05 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352667#comment-16352667
 ] 

Jun Rao commented on KAFKA-6469:


[~ambroff], thanks for reporting this. Is the # of children in the isr_change 
path a problem? Currently, each broker batches the isr changes when writing to 
ZK. So, the number of child nodes under isr_change should be proportional to 
the # brokers. If one follows the best practice by waiting for all replicas to 
be in sync before restarting the next broker, there should only be one child 
node in the isr_change typically.

> ISR change notification queue can prevent controller from making progress
> -
>
> Key: KAFKA-6469
> URL: https://issues.apache.org/jira/browse/KAFKA-6469
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Assignee: Kyle Ambroff-Kao
>Priority: Major
>
> When writes to /isr_change_notification in ZooKeeper (which is effectively a 
> queue of ISR change events for the controller) happen at a rate high enough 
> that the node with a watch can't dequeue them, the trouble starts.
> The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
> controller when a new entry is written to /isr_change_notification, and the 
> zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
> znodes.
> We've seen failures in one of our test clusters as the partition count started to 
> climb north of 60k per broker. We had brokers writing child nodes under 
> /isr_change_notification that were larger than the jute.maxbuffer size in 
> ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's 
> session, effectively bricking the cluster.
> This can be partially mitigated by chunking ISR notifications to increase the 
> maximum number of partitions a broker can host.
>  
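
The chunking mitigation mentioned at the end of the description could look roughly like the sketch below: one large notification is split into several, each kept under a size limit. The payload layout, the function names and the 1024-byte limit in the example are assumptions for illustration only and are not taken from the Kafka code base.

```python
import json


def encode(partitions):
    """Serialize one notification; the payload shape here is an assumed example."""
    return json.dumps({"partitions": partitions}).encode("utf-8")


def chunk_isr_changes(partitions, max_bytes):
    """Split partitions into groups whose encoded size is at most max_bytes.

    A single entry that is larger than max_bytes on its own is emitted alone.
    Re-encoding on every step is quadratic; acceptable for a sketch.
    """
    chunks, current = [], []
    for partition in partitions:
        candidate = current + [partition]
        if current and len(encode(candidate)) > max_bytes:
            chunks.append(current)
            current = [partition]
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks


# 1000 hypothetical partition entries, split into notifications of at most 1 KiB.
partitions = [{"topic": "t", "partition": i} for i in range(1000)]
chunks = chunk_isr_changes(partitions, max_bytes=1024)
```

Each chunk would then be written as its own child znode, so no single write approaches the jute.maxbuffer limit, at the cost of more children under the notification path.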





[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2018-01-26 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341863#comment-16341863
 ] 

Jun Rao commented on KAFKA-2729:


The problem that we fixed related to this jira is KAFKA-5642. Previously, when 
the controller's ZK session expired and it lost its controllership, it was 
possible for this zombie controller to continue updating ZK and/or sending 
LeaderAndIsrRequests to the brokers for a short period of time. When this 
happened, the broker might not have the most up-to-date information about leader 
and isr, which could lead to subsequent ZK failures when the isr needed to be updated. 
KAFKA-5642 fixes the issue by handling the ZK session expiration event properly.
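
To illustrate why a broker with stale information keeps failing its ISR updates, here is a minimal sketch of a versioned conditional write in the style of ZooKeeper's setData with an expected version. VersionedNode and try_update_isr are hypothetical names and the data layout is made up; this is not the broker's code.

```python
class BadVersionError(Exception):
    """Raised when the expected version does not match the stored version."""


class VersionedNode:
    """Hypothetical stand-in for a znode with a data version."""

    def __init__(self, data):
        self.data = data
        self.version = 0

    def conditional_set(self, data, expected_version):
        if expected_version != self.version:
            raise BadVersionError(f"expected {expected_version}, found {self.version}")
        self.data = data
        self.version += 1
        return self.version


def try_update_isr(node, cached_version, new_state):
    """Attempt the update; return (updated, version the caller should cache)."""
    try:
        return True, node.conditional_set(new_state, cached_version)
    except BadVersionError:
        # Skip the update and keep the (stale) cached version, as in the log message.
        return False, cached_version


node = VersionedNode({"leader": 5, "isr": [6, 5]})
cached_version = node.version

# A write the broker never learns about bumps the version in the store.
node.conditional_set({"leader": 5, "isr": [5, 6]}, expected_version=0)

# The broker's shrink attempt now fails, and will keep failing with the same cache.
updated, cached_version = try_update_isr(node, cached_version, {"leader": 5, "isr": [5]})

# Only after refreshing the cached version does the update go through.
updated_after_refresh, refreshed_version = try_update_isr(
    node, node.version, {"leader": 5, "isr": [5]}
)
```

Until something refreshes the cached version, every retry fails in the same way, which matches the repeated "skip updating ISR" messages in the report.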

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1, 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.11.0.0
>Reporter: Danil Serdyuchenko
>Assignee: Onur Karaman
>Priority: Major
> Fix For: 1.1.0
>
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of under-replicated partitions. The zookeeper 
> cluster recovered; however, we continued to see a large number of 
> under-replicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> This happened for all of the topics on the affected brokers. Both brokers only 
> recovered after a restart. Our own investigation yielded nothing; I was hoping 
> you could shed some light on this issue. Possibly it's related to 
> https://issues.apache.org/jira/browse/KAFKA-1382 ; however, we're using 
> 0.8.2.1.





[jira] [Commented] (KAFKA-5886) Introduce delivery.timeout.ms producer config (KIP-91)

2018-01-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340295#comment-16340295
 ] 

Jun Rao commented on KAFKA-5886:


https://github.com/apache/kafka/pull/3849

> Introduce delivery.timeout.ms producer config (KIP-91)
> --
>
> Key: KAFKA-5886
> URL: https://issues.apache.org/jira/browse/KAFKA-5886
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
>Priority: Major
> Fix For: 1.1.0
>
>
> We propose adding a new timeout delivery.timeout.ms. The window of 
> enforcement includes batching in the accumulator, retries, and the inflight 
> segments of the batch. With this config, the user has a guaranteed upper 
> bound on when a record will either get sent, fail or expire from the point 
> when send returns. In other words we no longer overload request.timeout.ms to 
> act as a weak proxy for accumulator timeout and instead introduce an explicit 
> timeout that users can rely on without exposing any internals of the producer 
> such as the accumulator. 
> See 
> [KIP-91|https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer]
>  for more details.
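
The idea of one upper bound covering batching, retries and in-flight time can be sketched as a running total checked against a single timeout. The stage names and durations are made up for illustration, and the function is a hypothetical model of the proposal, not the producer's implementation.

```python
def delivery_outcome(stage_durations_ms, delivery_timeout_ms):
    """Walk the stages a record passes through after send returns.

    stage_durations_ms: ordered (stage_name, duration_ms) pairs.
    Returns "sent" if the total stays within the timeout, otherwise the
    stage during which the record expires.
    """
    elapsed = 0
    for stage, duration in stage_durations_ms:
        elapsed += duration
        if elapsed > delivery_timeout_ms:
            return f"expired during {stage}"
    return "sent"


# Hypothetical timeline: batching, a failed attempt, a backoff, then a retry.
stages = [
    ("accumulator", 50),
    ("in flight", 30000),
    ("retry backoff", 100),
    ("in flight (retry)", 30000),
]

with_generous_timeout = delivery_outcome(stages, delivery_timeout_ms=120000)
with_tight_timeout = delivery_outcome(stages, delivery_timeout_ms=45000)
```

The point of the single timeout is that the caller reasons about one number measured from when send returns, without needing to know how the time is split between the accumulator and the network.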





[jira] [Commented] (KAFKA-6254) Introduce Incremental FetchRequests to Increase Partition Scalability

2018-01-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339830#comment-16339830
 ] 

Jun Rao commented on KAFKA-6254:


https://github.com/apache/kafka/pull/4418

> Introduce Incremental FetchRequests to Increase Partition Scalability
> -
>
> Key: KAFKA-6254
> URL: https://issues.apache.org/jira/browse/KAFKA-6254
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
>
> Introduce Incremental FetchRequests to Increase Partition Scalability.  See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability





[jira] [Commented] (KAFKA-3410) Unclean leader election and "Halting because log truncation is not allowed"

2017-12-15 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293473#comment-16293473
 ] 

Jun Rao commented on KAFKA-3410:


[~wushujames], yes, KAFKA-1211 should help address this. Do you want to try 
your set up again now that KAFKA-1211 is fixed?

> Unclean leader election and "Halting because log truncation is not allowed"
> ---
>
> Key: KAFKA-3410
> URL: https://issues.apache.org/jira/browse/KAFKA-3410
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>  Labels: reliability
>
> I ran into a scenario where one of my brokers would continually shut down, 
> with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I managed to reproduce it with the following scenario:
> 1. Start broker1, with unclean.leader.election.enable=false
> 2. Start broker2, with unclean.leader.election.enable=false
> 3. Create topic, single partition, with replication-factor 2.
> 4. Write data to the topic.
> 5. At this point, both brokers are in the ISR. Broker1 is the partition 
> leader.
> 6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2 gets 
> dropped out of ISR. Broker1 is still the leader. I can still write data to 
> the partition.
> 7. Shut down Broker1. Hard or controlled, doesn't matter.
> 8. rm -rf the log directory of broker1. (This simulates a disk replacement or 
> full hardware replacement)
> 9. Resume broker2. It attempts to connect to broker1, but doesn't succeed 
> because broker1 is down. At this point, the partition is offline. Can't write 
> to it.
> 10. Resume broker1. Broker1 resumes leadership of the topic. Broker2 attempts 
> to join ISR, and immediately halts with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I am able to recover by setting unclean.leader.election.enable=true on my 
> brokers.
> I'm trying to understand a couple things:
> * In step 10, why is broker1 allowed to resume leadership even though it has 
> no data?
> * In step 10, why is it necessary to stop the entire broker due to one 
> partition that is in this state? Wouldn't it be possible for the broker to 
> continue to serve traffic for all the other topics, and just mark this one as 
> unavailable?
> * Would it make sense to allow an operator to manually specify which broker 
> they want to become the new master? This would give me more control over how 
> much data loss I am willing to handle. In this case, I would want broker2 to 
> become the new master. Or, is that possible and I just don't know how to do 
> it?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6361) Fast leader fail over can lead to log divergence between leader and follower

2017-12-14 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291812#comment-16291812
 ] 

Jun Rao commented on KAFKA-6361:


[~hachikuji], thanks for the info. The problem in the description can indeed 
happen if the first leader change is due to preferred leader election, in which 
case, the ISR won't change.

To address this issue, we could send another OffsetForLeaderEpoch request with 
the previous leader epoch as you suggested. This may require multiple rounds of 
OffsetForLeaderEpoch requests. Another way is to change the OffsetForLeaderEpoch 
request to send a sequence of (leader epoch, start offset) pairs for the epochs 
between the follower's HW and LEO. On the leader side, we find the longest 
consecutive sequence of leader epochs whose start offsets match the leader's. 
We then return the end offset of the last matching leader epoch.
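
The leader-side matching in the second approach can be sketched as below. This is one interpretation, not an implementation from the code base: "longest consecutive sequence" is read as the longest run that matches pair by pair, starting from the follower's first epoch; the end offset of an epoch is taken to be the start offset of the leader's next epoch (or the leader's log end offset for its latest epoch); and the function name is hypothetical.

```python
def end_offset_of_last_matching_epoch(follower_epochs, leader_epochs, leader_log_end_offset):
    """Both epoch lists hold (epoch, start_offset) tuples in ascending epoch order.

    Returns the leader's end offset (exclusive) for the last epoch in the
    matching run, or None when not even the follower's first epoch matches.
    """
    if not follower_epochs:
        return None
    leader_index = {epoch: i for i, (epoch, _) in enumerate(leader_epochs)}
    start = leader_index.get(follower_epochs[0][0])
    if start is None:
        return None

    # Compare pair by pair so an epoch missing on either side ends the run.
    matched = 0
    for follower_entry, leader_entry in zip(follower_epochs, leader_epochs[start:]):
        if follower_entry != leader_entry:
            break
        matched += 1
    if matched == 0:
        return None

    last = start + matched - 1
    if last + 1 < len(leader_epochs):
        return leader_epochs[last + 1][1]  # start of the leader's next epoch
    return leader_log_end_offset


# Epoch layout from the description: A leads with epochs 1 and 3, B has 1 and 2.
leader_a = [(1, 0), (3, 21)]
follower_b = [(1, 0), (2, 11)]
result = end_offset_of_last_matching_epoch(follower_b, leader_a, leader_log_end_offset=31)
```

With the logs from the description, only epoch 1 matches, so the leader answers with the end of epoch 1 in its own log. How the follower combines that answer with its own epoch boundaries is not spelled out in the comment and is left out of the sketch.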

The above approach doesn't fully fix the issue for a compacted topic. When all 
messages for a leader epoch are removed, we may lose the leader epoch. Thus, 
the leader epochs between the follower and the leader may not perfectly match. 
One way to address this issue is to preserve the offset of the first message in 
a leader epoch during log cleaning. This probably can be done separately since 
it rarely causes problems.

> Fast leader fail over can lead to log divergence between leader and follower
> 
>
> Key: KAFKA-6361
> URL: https://issues.apache.org/jira/browse/KAFKA-6361
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>  Labels: reliability
>
> We have observed an edge case in the replication failover logic which can 
> cause a replica to permanently fall out of sync with the leader or, in the 
> worst case, actually have localized divergence between logs. This occurs in 
> spite of the improved truncation logic from KIP-101. 
> Suppose we have brokers A and B. Initially A is the leader in epoch 1. It 
> appends two batches: one in the range (0, 10) and the other in the range (11, 
> 20). The first one successfully replicates to B, but the second one does not. 
> In other words, the logs on the brokers look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> {code}
> Broker A then has a zk session expiration and broker B is elected with epoch 
> 2. It appends a new batch with offsets (11, n) to its local log. So we now 
> have this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Normally we expect broker A to truncate to offset 11 on becoming the 
> follower, but before it is able to do so, broker B has its own zk session 
> expiration and broker A again becomes leader, now with epoch 3. It then 
> appends a new entry in the range (21, 30). The updated logs look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> 2: offsets: [21, 30], leader epoch: 3
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Now what happens next depends on the last offset of the batch appended in 
> epoch 2. On becoming follower, broker B will send an OffsetForLeaderEpoch 
> request to broker A with epoch 2. Broker A will respond that epoch 2 ends at 
> offset 21. There are three cases:
> 1) n < 20: In this case, broker B will not do any truncation. It will begin 
> fetching from offset n, which will ultimately cause an out of order offset 
> error because broker A will return the full batch beginning from offset 11 
> which broker B will be unable to append.
> 2) n == 20: Again broker B does not truncate. It will fetch from offset 21 
> and everything will appear fine though the logs have actually diverged.
> 3) n > 20: Broker B will attempt to truncate to offset 21. Since this is in 
> the middle of the batch, it will truncate all the way to offset 10. It can 
> begin fetching from offset 11 and everything is fine.
> The case we have actually seen is the first one. The second one would likely 
> go unnoticed in practice and everything is fine in the third case. To 
> work around the issue, we deleted the active segment on the replica, which 
> allowed it to re-replicate consistently from the leader.
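
The three cases above can be reproduced with a small model of the behaviour the description refers to: for an epoch it does not know, the leader answers with the start offset of the next highest epoch, and the follower truncates only if its log extends past that offset. The function names are hypothetical, and batch-boundary effects (truncating back to offset 10 in the third case) are not modelled.

```python
def leader_end_offset_for(requested_epoch, leader_epochs, leader_log_end_offset):
    """leader_epochs: (epoch, start_offset) tuples in ascending epoch order.

    Answer with the start offset of the first epoch above the requested one,
    or the leader's log end offset if there is none.
    """
    for epoch, start_offset in leader_epochs:
        if epoch > requested_epoch:
            return start_offset
    return leader_log_end_offset


def follower_action(follower_log_end_offset, leader_end_offset):
    """Truncate only when the follower's log extends past the leader's answer."""
    if follower_log_end_offset > leader_end_offset:
        return ("truncate", leader_end_offset)
    return ("fetch", follower_log_end_offset)


# Broker A's epochs from the description; broker B asks about epoch 2.
answer = leader_end_offset_for(2, [(1, 0), (3, 21)], leader_log_end_offset=31)

# Broker B's log end offset is n + 1 for a last appended offset n.
case_n_below_20 = follower_action(16, answer)   # n = 15
case_n_equals_20 = follower_action(21, answer)  # n = 20
case_n_above_20 = follower_action(26, answer)   # n = 25
```

In the first two cases the follower keeps the records it appended in epoch 2 even though the leader never had them, which is the divergence the report describes.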
> I'm not sure of the best solution for this scenario. Maybe if the leader isn't 
> aware of an epoch, it should always respond with {{UNDEFINED_EPOCH_OFFSET}} 
> instead of using the offset of the next highest epoch. That would cause the 
> follower to truncate using its high watermark. Or perhaps instead of doing 
> so, it could send another 
