[jira] [Updated] (KAFKA-6146) minimize the number of triggers enqueuing PreferredReplicaLeaderElection events

2017-11-07 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-6146:

Summary: minimize the number of triggers enqueuing 
PreferredReplicaLeaderElection events  (was: re-register the exist watch on 
PreferredReplicaElectionZNode only after the preferred leader election 
completes )

> minimize the number of triggers enqueuing PreferredReplicaLeaderElection 
> events
> ---
>
> Key: KAFKA-6146
> URL: https://issues.apache.org/jira/browse/KAFKA-6146
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 1.1.0
>Reporter: Jun Rao
> Fix For: 1.1.0
>
>
> We currently enqueue a PreferredReplicaLeaderElection controller event in 
> PreferredReplicaElectionHandler's handleCreation, handleDeletion, and 
> handleDataChange. We really only need to enqueue the event and re-register 
> the exist watch on PreferredReplicaElectionZNode after preferred replica 
> leader election completes.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6146) re-register the exist watch on PreferredReplicaElectionZNode only after the preferred leader election completes

2017-11-07 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-6146:

Summary: re-register the exist watch on PreferredReplicaElectionZNode only 
after the preferred leader election completes   (was: re-register the exist 
watch on PreferredReplicaElectionZNode after the preferred leader election 
completes )

> re-register the exist watch on PreferredReplicaElectionZNode only after the 
> preferred leader election completes 
> 
>
> Key: KAFKA-6146
> URL: https://issues.apache.org/jira/browse/KAFKA-6146
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 1.1.0
>Reporter: Jun Rao
> Fix For: 1.1.0
>
>
> We currently enqueue a PreferredReplicaLeaderElection controller event in 
> PreferredReplicaElectionHandler's handleCreation, handleDeletion, and 
> handleDataChange. We really only need to enqueue the event and re-register 
> the exist watch on PreferredReplicaElectionZNode after preferred replica 
> leader election completes.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-6146) minimize the number of triggers enqueuing PreferredReplicaLeaderElection events

2017-11-07 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman reassigned KAFKA-6146:
---

Assignee: Onur Karaman

> minimize the number of triggers enqueuing PreferredReplicaLeaderElection 
> events
> ---
>
> Key: KAFKA-6146
> URL: https://issues.apache.org/jira/browse/KAFKA-6146
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 1.1.0
>Reporter: Jun Rao
>Assignee: Onur Karaman
> Fix For: 1.1.0
>
>
> We currently enqueue a PreferredReplicaLeaderElection controller event in 
> PreferredReplicaElectionHandler's handleCreation, handleDeletion, and 
> handleDataChange. We can just enqueue the event upon znode creation and after 
> preferred replica leader election completes. The processing of this latter 
> enqueue will register the exist watch on PreferredReplicaElectionZNode and 
> perform any pending preferred replica leader election that may have occurred 
> between completion and registration.
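A minimal sketch of that scheme, with stand-in types for the controller's event plumbing (the handler and event-manager names below are modeled on the names in the description, not the actual controller code):
{code}
// Stand-ins so the sketch is self-contained; the real controller's types differ.
sealed trait ControllerEvent
case object PreferredReplicaLeaderElection extends ControllerEvent
trait ControllerEventManager { def put(event: ControllerEvent): Unit }

// Of the three znode notifications, only creation enqueues an election event under this scheme.
class PreferredReplicaElectionHandler(eventManager: ControllerEventManager) {
  def handleCreation(): Unit = eventManager.put(PreferredReplicaLeaderElection)
  def handleDeletion(): Unit = ()    // no-op under this scheme
  def handleDataChange(): Unit = ()  // no-op under this scheme
}

object ElectionCompletion {
  // Once an election finishes, the controller enqueues the event one more time; processing that
  // enqueue re-registers the exists watch on PreferredReplicaElectionZNode and handles any
  // election request written between completion and re-registration.
  def afterElectionCompletes(eventManager: ControllerEventManager): Unit =
    eventManager.put(PreferredReplicaLeaderElection)
}
{code}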



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6146) minimize the number of triggers enqueuing PreferredReplicaLeaderElection events

2017-11-07 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-6146:

Description: We currently enqueue a PreferredReplicaLeaderElection 
controller event in PreferredReplicaElectionHandler's handleCreation, 
handleDeletion, and handleDataChange. We can just enqueue the event upon znode 
creation and after preferred replica leader election completes. The processing 
of this latter enqueue will register the exist watch on 
PreferredReplicaElectionZNode and perform any pending preferred replica leader 
election that may have occurred between completion and registration.  (was: We 
currently enqueue a PreferredReplicaLeaderElection controller event in 
PreferredReplicaElectionHandler's handleCreation, handleDeletion, and 
handleDataChange. We really only need to enqueue the event and re-register the 
exist watch on PreferredReplicaElectionZNode after preferred replica leader 
election completes.)

> minimize the number of triggers enqueuing PreferredReplicaLeaderElection 
> events
> ---
>
> Key: KAFKA-6146
> URL: https://issues.apache.org/jira/browse/KAFKA-6146
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 1.1.0
>Reporter: Jun Rao
> Fix For: 1.1.0
>
>
> We currently enqueue a PreferredReplicaLeaderElection controller event in 
> PreferredReplicaElectionHandler's handleCreation, handleDeletion, and 
> handleDataChange. We can just enqueue the event upon znode creation and after 
> preferred replica leader election completes. The processing of this latter 
> enqueue will register the exist watch on PreferredReplicaElectionZNode and 
> perform any pending preferred replica leader election that may have occurred 
> between completion and registration.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6146) re-register the exist watch on PreferredReplicaElectionZNode after the preferred leader election completes

2017-11-07 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-6146:

Description: We currently enqueue a PreferredReplicaLeaderElection 
controller event in PreferredReplicaElectionHandler's handleCreation, 
handleDeletion, and handleDataChange. We really only need to enqueue the event 
and re-register the exist watch on PreferredReplicaElectionZNode after 
preferred replica leader election completes.  (was: We currently enqueue a 
PreferredReplicaLeaderElection controller event in 
PreferredReplicaElectionHandler's handleCreation, handleDeletion, and 
handleDataChange. We really only need to enqueue the event and re-register the 
exist watch on after preferred replica leader election completes.)

> re-register the exist watch on PreferredReplicaElectionZNode after the 
> preferred leader election completes 
> ---
>
> Key: KAFKA-6146
> URL: https://issues.apache.org/jira/browse/KAFKA-6146
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 1.1.0
>Reporter: Jun Rao
> Fix For: 1.1.0
>
>
> We currently enqueue a PreferredReplicaLeaderElection controller event in 
> PreferredReplicaElectionHandler's handleCreation, handleDeletion, and 
> handleDataChange. We really only need to enqueue the event and re-register 
> the exist watch on PreferredReplicaElectionZNode after preferred replica 
> leader election completes.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5894) add the notion of max inflight requests to async ZookeeperClient

2017-11-01 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16234885#comment-16234885
 ] 

Onur Karaman commented on KAFKA-5894:
-

This went through 
[KIP-214|https://cwiki.apache.org/confluence/display/KAFKA/KIP-214%3A+Add+zookeeper.max.in.flight.requests+config+to+the+broker]
 and the KIP has been accepted.

> add the notion of max inflight requests to async ZookeeperClient
> 
>
> Key: KAFKA-5894
> URL: https://issues.apache.org/jira/browse/KAFKA-5894
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>Priority: Major
> Fix For: 1.1.0
>
>
> ZookeeperClient is a zookeeper client that encourages pipelined requests to 
> zookeeper. We want to add the notion of max inflight requests to the client 
> for several reasons:
> # to bound memory overhead associated with async requests on the client.
> # to not overwhelm the zookeeper ensemble with a burst of requests.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-6134) High memory usage on controller during partition reassignment

2017-10-26 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221443#comment-16221443
 ] 

Onur Karaman edited comment on KAFKA-6134 at 10/26/17 11:28 PM:


If you want to port a fix to 1.0 without pulling in all of KAFKA-5642, I think 
you can just lazily read the reassignment state upon actually processing the 
PartitionReassignment instead of providing one as part of the 
PartitionReassignment instance so that you'd only have one partition 
reassignment mapping allocated at any point in time.


was (Author: onurkaraman):
If you want to port a fix to 1.0 without pulling in all of KAFKA-5642, I think 
you can just lazily read the reassignment state upon actually processing the 
PartitionReassignment so that you'd only have one partition reassignment 
mapping allocated at any point in time.

> High memory usage on controller during partition reassignment
> -
>
> Key: KAFKA-6134
> URL: https://issues.apache.org/jira/browse/KAFKA-6134
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Jason Gustafson
>Priority: Critical
> Attachments: Screen Shot 2017-10-26 at 3.05.40 PM.png
>
>
> We've had a couple users reporting spikes in memory usage when the controller 
> is performing partition reassignment in 0.11. After investigation, we found 
> that the controller event queue was using most of the retained memory. In 
> particular, we found several thousand {{PartitionReassignment}} objects, each 
> one containing one fewer partition than the previous one (see the attached 
> image).
> From the code, it seems clear why this is happening. We have a watch on the 
> partition reassignment path which adds the {{PartitionReassignment}} object 
> to the event queue:
> {code}
>   override def handleDataChange(dataPath: String, data: Any): Unit = {
>     val partitionReassignment = ZkUtils.parsePartitionReassignmentData(data.toString)
>     eventManager.put(controller.PartitionReassignment(partitionReassignment))
>   }
> {code}
> In the {{PartitionReassignment}} event handler, we iterate through all of the 
> partitions in the reassignment. After we complete reassignment for each 
> partition, we remove that partition and update the node in zookeeper. 
> {code}
>     // remove this partition from that list
>     val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
>     // write the new list to zookeeper
>     zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
> {code}
> This triggers the handler above which adds a new event in the queue. So what 
> you get is an n^2 increase in memory where n is the number of partitions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6134) High memory usage on controller during partition reassignment

2017-10-26 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221443#comment-16221443
 ] 

Onur Karaman commented on KAFKA-6134:
-

If you want to port a fix to 1.0 without pulling in all of KAFKA-5642, I think 
you can just lazily read the reassignment state upon actually processing the 
PartitionReassignment so that you'd only have one partition reassignment 
mapping allocated at any point in time.
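A rough sketch of that lazy-read shape (the trait and readReassignmentFromZk below are stand-ins for illustration, not the 1.0 code): the queued event carries no mapping, and the reassignment state is fetched from ZooKeeper only when the event is processed.
{code}
// Stand-in event trait so the sketch compiles on its own.
trait ControllerEvent { def process(): Unit }

case object PartitionReassignment extends ControllerEvent {
  override def process(): Unit = {
    // Read the mapping only here, at processing time, so at most one copy of the
    // reassignment state is retained at any point in time regardless of how many
    // watch firings queued this event.
    val partitionsBeingReassigned = readReassignmentFromZk()
    // ... trigger reassignment for partitionsBeingReassigned ...
  }

  // Placeholder for a ZooKeeper read of /admin/reassign_partitions.
  private def readReassignmentFromZk(): Map[String, Seq[Int]] = Map.empty
}
{code}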

> High memory usage on controller during partition reassignment
> -
>
> Key: KAFKA-6134
> URL: https://issues.apache.org/jira/browse/KAFKA-6134
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Jason Gustafson
>Priority: Critical
> Attachments: Screen Shot 2017-10-26 at 3.05.40 PM.png
>
>
> We've had a couple users reporting spikes in memory usage when the controller 
> is performing partition reassignment in 0.11. After investigation, we found 
> that the controller event queue was using most of the retained memory. In 
> particular, we found several thousand {{PartitionReassignment}} objects, each 
> one containing one fewer partition than the previous one (see the attached 
> image).
> From the code, it seems clear why this is happening. We have a watch on the 
> partition reassignment path which adds the {{PartitionReassignment}} object 
> to the event queue:
> {code}
>   override def handleDataChange(dataPath: String, data: Any): Unit = {
>     val partitionReassignment = ZkUtils.parsePartitionReassignmentData(data.toString)
>     eventManager.put(controller.PartitionReassignment(partitionReassignment))
>   }
> {code}
> In the {{PartitionReassignment}} event handler, we iterate through all of the 
> partitions in the reassignment. After we complete reassignment for each 
> partition, we remove that partition and update the node in zookeeper. 
> {code}
>     // remove this partition from that list
>     val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
>     // write the new list to zookeeper
>     zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
> {code}
> This triggers the handler above which adds a new event in the queue. So what 
> you get is an n^2 increase in memory where n is the number of partitions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6134) High memory usage on controller during partition reassignment

2017-10-26 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221392#comment-16221392
 ] 

Onur Karaman commented on KAFKA-6134:
-

Yes I had noticed the O(N^2) behavior a while ago. I believe this should be 
mitigated after KAFKA-5642 since PartitionReassignment is now a case object 
instead of a case class containing the remaining reassignment mapping.
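Roughly, the difference being described (hypothetical declarations with Old/New suffixes, for illustration only):
{code}
// Stand-ins so the snippet is self-contained.
trait ControllerEvent
case class TopicAndPartition(topic: String, partition: Int)

// Pre-KAFKA-5642 shape: each queued event retains its own copy of the remaining mapping,
// which is what accumulates in the controller event queue during a large reassignment.
case class PartitionReassignmentOld(partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]) extends ControllerEvent

// Post-KAFKA-5642 shape: the queued event is a bare marker; the reassignment state is read
// from ZooKeeper only when the event is actually processed.
case object PartitionReassignmentNew extends ControllerEvent
{code}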

> High memory usage on controller during partition reassignment
> -
>
> Key: KAFKA-6134
> URL: https://issues.apache.org/jira/browse/KAFKA-6134
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Jason Gustafson
>Priority: Critical
> Attachments: Screen Shot 2017-10-26 at 3.05.40 PM.png
>
>
> We've had a couple users reporting spikes in memory usage when the controller 
> is performing partition reassignment in 0.11. After investigation, we found 
> that the controller event queue was using most of the retained memory. In 
> particular, we found several thousand {{PartitionReassignment}} objects, each 
> one containing one fewer partition than the previous one (see the attached 
> image).
> From the code, it seems clear why this is happening. We have a watch on the 
> partition reassignment path which adds the {{PartitionReassignment}} object 
> to the event queue:
> {code}
>   override def handleDataChange(dataPath: String, data: Any): Unit = {
>     val partitionReassignment = ZkUtils.parsePartitionReassignmentData(data.toString)
>     eventManager.put(controller.PartitionReassignment(partitionReassignment))
>   }
> {code}
> In the {{PartitionReassignment}} event handler, we iterate through all of the 
> partitions in the reassignment. After we complete reassignment for each 
> partition, we remove that partition and update the node in zookeeper. 
> {code}
>     // remove this partition from that list
>     val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
>     // write the new list to zookeeper
>     zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
> {code}
> This triggers the handler above which adds a new event in the queue. So what 
> you get is an n^2 increase in memory where n is the number of partitions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6081) response error code checking

2017-10-18 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209903#comment-16209903
 ] 

Onur Karaman commented on KAFKA-6081:
-

[~ijuma] There are some scenarios where we only want to begin certain actions 
after the previous action actually completes. The reassignment comment above is 
one example but I think there are others. Figuring out these scenarios is in 
the scope of this ticket.

> response error code checking
> 
>
> Key: KAFKA-6081
> URL: https://issues.apache.org/jira/browse/KAFKA-6081
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>
> In most cases in the controller, we assume that requests succeed. We should 
> instead check for their responses.
> Example: partition reassignment has the following todo:
> {code}
> // TODO: Eventually partition reassignment could use a callback that does 
> retries if deletion failed
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6082) consider fencing zookeeper updates with controller epoch zkVersion

2017-10-18 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-6082:
---

 Summary: consider fencing zookeeper updates with controller epoch 
zkVersion
 Key: KAFKA-6082
 URL: https://issues.apache.org/jira/browse/KAFKA-6082
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman


If we want, we can use multi-op to fence zookeeper updates with the controller 
epoch's zkVersion.
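A sketch of the fencing idea with the raw ZooKeeper multi API (this is not Kafka's implementation; the path and version plumbing are assumptions): the write only goes through if the controller epoch znode still has the zkVersion this controller observed when it won the election.
{code}
import java.util.Arrays
import org.apache.zookeeper.{Op, ZooKeeper}

object FencedZkWrite {
  def fencedSetData(zk: ZooKeeper, path: String, data: Array[Byte], controllerEpochZkVersion: Int): Unit = {
    zk.multi(Arrays.asList(
      Op.check("/controller_epoch", controllerEpochZkVersion), // fence: fails if another controller bumped the epoch
      Op.setData(path, data, -1)                               // -1 skips the per-path data version check
    ))
    // A stale controller's write fails the check, the multi-op throws, and nothing is written.
  }
}
{code}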



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5029) cleanup javadocs and logging

2017-10-18 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5029:

Description: Remove state change logger, splitting it up into the 
controller logs or broker logs.

> cleanup javadocs and logging
> 
>
> Key: KAFKA-5029
> URL: https://issues.apache.org/jira/browse/KAFKA-5029
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> Remove state change logger, splitting it up into the controller logs or 
> broker logs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6081) response error code checking

2017-10-18 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-6081:
---

 Summary: response error code checking
 Key: KAFKA-6081
 URL: https://issues.apache.org/jira/browse/KAFKA-6081
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman


In most cases in the controller, we assume that requests succeed. We should 
instead check for their responses.

Example: partition reassignment has the following todo:
{code}
// TODO: Eventually partition reassignment could use a callback that does 
retries if deletion failed
{code}
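A minimal sketch of what response checking could look like for the deletion case (errorsByPartition and retryDeletion are hypothetical names for illustration; the real controller callback types differ): inspect the per-partition error codes instead of assuming success, and re-drive the partitions that failed.
{code}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors

object StopReplicaResponseChecking {
  def onStopReplicaResponse(errorsByPartition: Map[TopicPartition, Errors],
                            retryDeletion: Set[TopicPartition] => Unit): Unit = {
    // Collect the partitions whose deletion did not succeed rather than assuming success.
    val failed = errorsByPartition.collect { case (tp, error) if error != Errors.NONE => tp }.toSet
    if (failed.nonEmpty)
      retryDeletion(failed) // e.g. re-enqueue a deletion retry instead of silently moving on
  }
}
{code}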



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-1120) Controller could miss a broker state change

2017-10-18 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-1120:

Issue Type: Sub-task  (was: Bug)
Parent: KAFKA-5027

> Controller could miss a broker state change 
> 
>
> Key: KAFKA-1120
> URL: https://issues.apache.org/jira/browse/KAFKA-1120
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>  Labels: reliability
> Fix For: 1.1.0
>
>
> When the controller is in the middle of processing a task (e.g., preferred 
> leader election, broker change), it holds a controller lock. During this 
> time, a broker could have de-registered and re-registered itself in ZK. After 
> the controller finishes processing the current task, it will start processing 
> the logic in the broker change listener. However, it will see no broker 
> change and therefore won't do anything to the restarted broker. This broker 
> will be in a weird state since the controller doesn't inform it to become the 
> leader of any partition. Yet, the cached metadata in other brokers could 
> still list that broker as the leader for some partitions. Client requests 
> routed to that broker will then get a TopicOrPartitionNotExistException. This 
> broker will continue to be in this bad state until it's restarted again.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6029) Controller should wait for the leader migration to finish before ack a ControlledShutdownRequest

2017-10-18 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-6029:

Issue Type: Sub-task  (was: Improvement)
Parent: KAFKA-5027

> Controller should wait for the leader migration to finish before ack a 
> ControlledShutdownRequest
> 
>
> Key: KAFKA-6029
> URL: https://issues.apache.org/jira/browse/KAFKA-6029
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
> Fix For: 1.1.0
>
>
> In the controlled shutdown process, the controller will return the 
> ControlledShutdownResponse immediately after the state machine is updated. 
> Because the LeaderAndIsrRequests and UpdateMetadataRequests may not have been 
> successfully processed by the brokers, the leader migration and active ISR 
> shrink may not have completed when the shutting down broker proceeds to shut down. 
> This will cause some of the leaders to take up to replica.lag.time.max.ms to 
> kick the broker out of ISR. Meanwhile the produce purgatory size will grow.
> Ideally, the controller should wait until all the LeaderAndIsrRequests and 
> UpdateMetadataRequests have been acked before sending back the 
> ControlledShutdownResponse.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5083) always leave the last surviving member of the ISR in ZK

2017-10-18 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman resolved KAFKA-5083.
-
Resolution: Fixed

This has been fixed in KAFKA-5642.

> always leave the last surviving member of the ISR in ZK
> ---
>
> Key: KAFKA-5083
> URL: https://issues.apache.org/jira/browse/KAFKA-5083
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> Currently we erase ISR membership if the replica to be removed from the ISR 
> is the last surviving member of the ISR and unclean leader election is 
> enabled for the corresponding topic.
> We should investigate leaving the last replica in ISR in ZK, independent of 
> whether unclean leader election is enabled or not. That way, if people 
> re-disabled unclean leader election, we can still try to elect the leader 
> from the last in-sync replica.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6065) Add zookeeper metrics to ZookeeperClient as in KIP-188

2017-10-16 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-6065:
---

 Summary: Add zookeeper metrics to ZookeeperClient as in KIP-188
 Key: KAFKA-6065
 URL: https://issues.apache.org/jira/browse/KAFKA-6065
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman


Among other things, KIP-188 added latency metrics to ZkUtils. We should add the 
same metrics to ZookeeperClient.
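The shape is essentially a timer around each ZooKeeper round trip; a minimal sketch, assuming a metrics Sensor is already wired up (the wiring here is an assumption, not the KIP-188 code):
{code}
import org.apache.kafka.common.metrics.Sensor

object ZkLatencyTiming {
  // Time a ZooKeeper call and record the latency (in milliseconds) in the given sensor.
  def timed[T](latencySensor: Sensor)(zkCall: => T): T = {
    val startNs = System.nanoTime()
    try zkCall
    finally latencySensor.record((System.nanoTime() - startNs) / 1e6)
  }
}
{code}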



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6014) new consumer mirror maker halts after committing offsets to a deleted topic

2017-10-04 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-6014:
---

 Summary: new consumer mirror maker halts after committing offsets 
to a deleted topic
 Key: KAFKA-6014
 URL: https://issues.apache.org/jira/browse/KAFKA-6014
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman


New consumer throws an unexpected KafkaException when trying to commit to a 
topic that has been deleted. MirrorMaker.commitOffsets doesn't attempt to catch 
the KafkaException and just kills the process. We didn't see this in the old 
consumer because the old consumer silently drops failed offset commits.
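For illustration, the defensive shape being described would look something like the sketch below (this is not MirrorMaker's actual code; it just shows catching the KafkaException instead of letting it kill the process):
{code}
import java.util.{Map => JMap}
import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata}
import org.apache.kafka.common.{KafkaException, TopicPartition}

object SafeCommit {
  def commitOffsetsSafely(consumer: Consumer[Array[Byte], Array[Byte]],
                          offsets: JMap[TopicPartition, OffsetAndMetadata]): Unit = {
    try consumer.commitSync(offsets)
    catch {
      case e: KafkaException =>
        // e.g. the topic was deleted between consuming and committing; log and keep running
        System.err.println(s"Offset commit failed, skipping: ${e.getMessage}")
    }
  }
}
{code}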

I ran a quick experiment locally to prove the behavior. The experiment:
1. start up a single broker
2. create a single-partition topic t
3. create a new consumer that consumes topic t
4. make the consumer commit every few seconds
5. delete topic t
6. expect: KafkaException that kills the process.

Here's my script:
{code}
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class OffsetCommitTopicDeletionTest {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
        TopicPartition partition = new TopicPartition("t", 0);
        List<TopicPartition> partitions = Collections.singletonList(partition);
        kafkaConsumer.assign(partitions);
        while (true) {
            kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(0, "")));
            Thread.sleep(1000);
        }
    }
}
{code}

Here are the other commands:
{code}
> rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> ./gradlew clean jar
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t 
> --partitions 1 --replication-factor 1
> ./bin/kafka-run-class.sh 
> org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t
{code}

Here is the output:
{code}
[2017-10-04 20:00:14,451] ERROR [Consumer clientId=consumer-1, groupId=g] 
Offset commit failed on partition t-0 at offset 0: This server does not host 
this topic-partition. 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Exception in thread "main" org.apache.kafka.common.KafkaException: Partition 
t-0 may not exist or user may not have Describe access to topic
  at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:789)
  at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734)
  at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
  at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
  at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
  at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
  at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
  at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
  at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
  at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
  at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
  at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
  at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1231)
  at 
org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest.main(OffsetCommitTopicDeletionTest.java:22)
{code}

[jira] [Created] (KAFKA-5894) add the notion of max inflight requests to async ZookeeperClient

2017-09-14 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5894:
---

 Summary: add the notion of max inflight requests to async 
ZookeeperClient
 Key: KAFKA-5894
 URL: https://issues.apache.org/jira/browse/KAFKA-5894
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


ZookeeperClient is a zookeeper client that encourages pipelined requests to 
zookeeper. We want to add the notion of max inflight requests to the client for 
several reasons:
# to bound memory overhead associated with async requests on the client.
# to not overwhelm the zookeeper ensemble with a burst of requests.
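A minimal sketch of the bounding idea only (the real ZookeeperClient API and internals differ): acquire a permit before sending an async request and release it when the response is handled, so at most maxInFlightRequests are outstanding at once.
{code}
import java.util.concurrent.Semaphore

// Caps client-side memory held for pending responses and avoids hitting the ensemble
// with an unbounded burst of pipelined requests.
class InflightBoundedSender(maxInFlightRequests: Int) {
  private val inFlight = new Semaphore(maxInFlightRequests)

  // `asyncRequest` stands in for any fire-and-callback zookeeper operation.
  def send[R](asyncRequest: (R => Unit) => Unit)(callback: R => Unit): Unit = {
    inFlight.acquire()             // blocks the caller once the bound is reached
    asyncRequest { response =>
      try callback(response)
      finally inFlight.release()   // free the slot once the response is handled
    }
  }
}
{code}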



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-4747) add metrics for KafkaConsumer.poll

2017-09-11 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman resolved KAFKA-4747.
-
Resolution: Won't Fix

[~junrao] pointed out that the distinction between time-in-poll and 
time-in-application can be effectively computed as 1 - (io-ratio) - 
(io-wait-ratio). If this value is close to 1, then time is mostly being spent 
on the application-side. Otherwise if this value is close to 0, then time is 
mostly being spent on the client-side.

Here's a simple experiment I ran to verify:
{code}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class SlowKafkaConsumer {
    public static void main(String[] args) throws InterruptedException {
        long pollTimeout = Long.valueOf(args[0]);
        long sleepDuration = Long.valueOf(args[1]);
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "onur");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.assign(Collections.singletonList(new TopicPartition("t", 0)));
        kafkaConsumer.seekToBeginning(Collections.singletonList(new TopicPartition("t", 0)));
        while (true) {
            kafkaConsumer.poll(pollTimeout);
            Thread.sleep(sleepDuration);
        }
    }
}
{code}

{code}
no data
===
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 
> 2000 0
io-ratio ~ 0
io-wait-ratio ~ 0.99

> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 
> 2000 1
io-ratio ~ 0
io-wait-ratio ~ [0.1, 0.2]

> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 
> 2000 2
io-ratio ~ 0
io-wait-ratio ~ [0.05, 0.12]

with data
=
> ./bin/kafka-producer-perf-test.sh --producer-props 
> bootstrap.servers=localhost:9090 --topic t --throughput -1 --num-records 
> 1 --record-size 1000

> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 
> 2000 0
io-ratio ~ 0.06
io-wait-ratio ~ 0.8

> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 
> 2000 1
io-ratio ~ 0
io-wait-ratio ~ [0.05, 0.1]

> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 
> 2000 2
io-ratio ~ 0
io-wait-ratio ~ [0, 0.03]
{code}

> add metrics for KafkaConsumer.poll
> --
>
> Key: KAFKA-4747
> URL: https://issues.apache.org/jira/browse/KAFKA-4747
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> KafkaConsumer heavily depends on KafkaConsumer.poll yet we don't have metrics 
> directly associated with it.
> We probably want to add two metrics:
> 1. time spent in KafkaConsumer.poll
> 2. time since last KafkaConsumer.poll (measured as now - endTimeOfLastPoll)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5857) Excessive heap usage on controller node during reassignment

2017-09-08 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158933#comment-16158933
 ] 

Onur Karaman commented on KAFKA-5857:
-

I wouldn't be surprised if there were no attempts so far at making the 
controller memory-efficient.

There's a slight chance I may have coincidentally run into the same issue 
yesterday while preparing for an upcoming talk. I tried timing how long it 
takes to complete a reassignment with many empty partitions and noticed that 
progress eventually halted and the controller hit OOM.

Here's my setup on my laptop:
{code}
> rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> ./gradlew clean jar
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t 
> --partitions 5000 --replication-factor 1
> export LOG_DIR=logs1 && ./bin/kafka-server-start.sh config/server1.properties
> python
import json
with open("reassignment.txt", "w") as f:
  reassignment = {"version":1, "partitions": [{"topic": "t", "partition": 
partition, "replicas": [0, 1]} for partition in range(5000)]}
  json.dump(reassignment, f, separators=(',',':'))
> ./zkCli.sh -server localhost:2181
> create /admin/reassign_partitions 
{code}

Note that I had to use the zkCli.sh that comes with zookeeper just to write the 
reassignment into zk. Kafka's kafka-reassign-partitions.sh gets stuck before 
writing to zookeeper and zookeeper-shell.sh seems to hang while copying the 
reassignment into the command.

Below are my broker configs:
{code}
> cat config/server0.properties
broker.id=0
listeners=PLAINTEXT://localhost:9090
log.dirs=/tmp/kafka-logs0
zookeeper.connect=127.0.0.1:2181
auto.leader.rebalance.enable=false
unclean.leader.election.enable=false
delete.topic.enable=true
log.index.size.max.bytes=1024
zookeeper.session.timeout.ms=6
replica.lag.time.max.ms=10
[09:57:16] okaraman@okaraman-mn3:~/code/kafka
> cat config/server1.properties
broker.id=1
listeners=PLAINTEXT://localhost:9091
log.dirs=/tmp/kafka-logs1
zookeeper.connect=localhost:2181
auto.leader.rebalance.enable=false
unclean.leader.election.enable=false
delete.topic.enable=true
log.index.size.max.bytes=1024
zookeeper.session.timeout.ms=6
replica.lag.time.max.ms=10
{code}

I haven't looked into the cause of the OOM. I ran the scenario again just now 
and found that the controller spent a significant amount of time in G1 Old Gen 
GC.

> Excessive heap usage on controller node during reassignment
> ---
>
> Key: KAFKA-5857
> URL: https://issues.apache.org/jira/browse/KAFKA-5857
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.11.0.0
> Environment: CentOs 7, Java 1.8
>Reporter: Raoufeh Hashemian
>  Labels: reliability
> Fix For: 1.1.0
>
> Attachments: CPU.png, disk_write_x.png, memory.png, 
> reassignment_plan.txt
>
>
> I was trying to expand our kafka cluster from 6 broker nodes to 12 broker 
> nodes. 
> Before expansion, we had a single topic with 960 partitions and a replication 
> factor of 3. So each node had 480 partitions. The size of data in each node 
> was 3TB . 
> To do the expansion, I submitted a partition reassignment plan (see attached 
> file for the current/new assignments). The plan was optimized to minimize 
> data movement and be rack aware. 
> When I submitted the plan, it took approximately 3 hours for moving data from 
> old to new nodes to complete. After that, it started deleting source 
> partitions (I say this based on the number of file descriptors) and 
> rebalancing leaders which has not been successful. Meanwhile, the heap usage 
> in the controller node started to go up with a large slope (along with long 
> GC times) and it took 5 hours for the controller to go out of memory and 
> another controller started to have the same behaviour for another 4 hours. At 
> this time the zookeeper ran out of disk and the service stopped.
> To recover from this condition:
> 1) Removed zk logs to free up disk and restarted all 3 zk nodes
> 2) Deleted /kafka/admin/reassign_partitions node from zk
> 3) Had to do unclean restarts of kafka service on oom controller nodes which 
> took 3 hours to complete. After this stage there was still 676 under 
> replicated partitions.
> 4) Do a clean restart on all 12 broker nodes.
> After step 4, the number of under replicated nodes went to 0.
> So I was wondering if this memory footprint from controller is expected for 
> 1k partitions? Did we do something wrong or is it a bug?
> Attached are some resource usage graph during this 30 hours event and the 
> reassignment plan. I'll try to add log files as well



--
This message was sent by Atlassian JIRA

[jira] [Comment Edited] (KAFKA-5027) Kafka Controller Redesign

2017-09-06 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16156278#comment-16156278
 ] 

Onur Karaman edited comment on KAFKA-5027 at 9/7/17 1:15 AM:
-

I think we'd want all the broker components communicating with zookeeper 
migrated to the new client, so that would be:
KAFKA-5642
KAFKA-5645
KAFKA-5646
KAFKA-5647

At the bare minimum, we'd have KAFKA-5642 and KAFKA-5646 checked in.


was (Author: onurkaraman):
I think we'd want all the broker components communicating with zookeeper 
migrated to the new client, so that would be:
KAFKA-5642
KAFKA-5645
KAFKA-5646
KAFKA-5647

> Kafka Controller Redesign
> -
>
> Key: KAFKA-5027
> URL: https://issues.apache.org/jira/browse/KAFKA-5027
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> The goal of this redesign is to improve controller performance, controller 
> maintainability, and cluster reliability.
> Documentation regarding what's being considered can be found 
> [here|https://docs.google.com/document/d/1rLDmzDOGQQeSiMANP0rC2RYp_L7nUGHzFD9MQISgXYM].



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5027) Kafka Controller Redesign

2017-09-06 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16156278#comment-16156278
 ] 

Onur Karaman commented on KAFKA-5027:
-

I think we'd want all the broker components communicating with zookeeper 
migrated to the new client, so that would be:
KAFKA-5642
KAFKA-5645
KAFKA-5646
KAFKA-5647

> Kafka Controller Redesign
> -
>
> Key: KAFKA-5027
> URL: https://issues.apache.org/jira/browse/KAFKA-5027
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> The goal of this redesign is to improve controller performance, controller 
> maintainability, and cluster reliability.
> Documentation regarding what's being considered can be found 
> [here|https://docs.google.com/document/d/1rLDmzDOGQQeSiMANP0rC2RYp_L7nUGHzFD9MQISgXYM].



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-08-18 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133321#comment-16133321
 ] 

Onur Karaman commented on KAFKA-4893:
-

By the way, it seems that the 1.0.0 release is the perfect opportunity to make 
the change I proposed above.

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1 and the 
> controller recognizes the topic completion as incomplete (the topic znode is 
> still in /admin/delete_topics).
> I don't believe linkedin has any topic name this long but I'm making the 
> ticket in case anyone runs into this problem.
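The arithmetic behind the conflict, spelled out (lengths only, illustrative): a deleted log directory is named <topic>-<partition>.<32-hex-uuid>-delete, so a 249-character topic with a 5-digit partition id yields 249 + 1 + 5 + 1 + 32 + 7 = 295 characters, well past the ~255-character folder-name limit the 249 cap was derived from.
{code}
object DeleteDirNameLength {
  def main(args: Array[String]): Unit = {
    val topic = "t" * 249                                             // longest allowed topic name
    val uuid = java.util.UUID.randomUUID.toString.replaceAll("-", "") // 32 hex chars
    val dirName = s"$topic-99999.$uuid-delete"                        // rename target on deletion
    println(dirName.length)                                           // 295
  }
}
{code}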



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5559) Metrics should throw if two client registers with same ID

2017-08-18 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133274#comment-16133274
 ] 

Onur Karaman commented on KAFKA-5559:
-

Hey [~guozhang]. I read through the PR. I actually had a very similar 
discussion over a year ago:
http://markmail.org/message/54ccqas7ty7t4mjt

Your comments on the uniqueness of client ids from 
https://github.com/apache/kafka/pull/3328#issuecomment-316137237 conflicts with 
Jay's comments from the discussion above.

If we take Jay's definition of client id "a logical name for an application 
which (potentially) spans more than one process", then a few of your comments 
seem to be incorrect:
# there would definitely be scenarios where multiple clients would exist with 
the same client id in the same JVM. This also suggests that KafkaStreams 
assigning unique client ids per client within the JVM is actually the wrong 
thing to do, and if I recall correctly, KafkaStreams did this purely as a 
workaround for the metrics collision issue.
# client ids are not meant to uniquely identify in the request logs which 
specific client instance sent the broker the request. It merely tells us which 
application sent the request.
# your comment above and the PR discussion suggest clients in the same JVM can 
have different AppInfos, which raises the concern that one client's AppInfo 
would potentially replace the other. AppInfo is a fixed value within the JVM. I 
think the only way AppInfos can differ across clients of the same JVM is if you 
mess around with classloaders.

> Metrics should throw if two client registers with same ID
> -
>
> Key: KAFKA-5559
> URL: https://issues.apache.org/jira/browse/KAFKA-5559
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 0.11.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> Currently, {{AppInfoParser}} only logs a WARN message when a bean is 
> registered with an existing name. However, this should be treated as an error 
> and the exception should be rethrown.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5559) Metrics should throw if two client registers with same ID

2017-08-17 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131513#comment-16131513
 ] 

Onur Karaman commented on KAFKA-5559:
-

[~xiaotao183] I agree that as per the definition of {{client.id}}, it should be 
possible to have multiple clients on the same jvm with the same {{client.id}}.

I think the solution is the following:
{code}
-    public static void registerAppInfo(String prefix, String id) {
+    // method is synchronized to prevent race where two concurrent client instantiations would try to both register the mbean
+    public static synchronized void registerAppInfo(String prefix, String id) {
         try {
             ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + id);
-            AppInfo mBean = new AppInfo();
-            ManagementFactory.getPlatformMBeanServer().registerMBean(mBean, name);
+            if (!ManagementFactory.getPlatformMBeanServer().isRegistered(name)) {
+                AppInfo mBean = new AppInfo();
+                ManagementFactory.getPlatformMBeanServer().registerMBean(mBean, name);
+            }
         } catch (JMException e) {
             log.warn("Error registering AppInfo mbean", e);
         }
     }

-    public static void unregisterAppInfo(String prefix, String id) {
+    // method is synchronized to prevent race where two concurrent client closes would try to both unregister the mbean
+    public static synchronized void unregisterAppInfo(String prefix, String id) {
{code}
Basically, just don't attempt to reregister if it already exists. I can open up 
a PR with the above fix.

> Metrics should throw if two client registers with same ID
> -
>
> Key: KAFKA-5559
> URL: https://issues.apache.org/jira/browse/KAFKA-5559
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 0.11.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> Currently, {{AppInfoParser}} only logs a WARN message when a bean is 
> registered with an existing name. However, this should be treated as an error 
> and the exception should be rethrown.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-1120) Controller could miss a broker state change

2017-08-04 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114911#comment-16114911
 ] 

Onur Karaman commented on KAFKA-1120:
-

[~wushujames] I think Jun's comments and the redesign doc in KAFKA-5027 are 
sort of saying the same thing. The broker-generation concept has two use cases 
which was sort of implied:
1. the controller using broker generations to distinguish events from a broker 
across generations.
2. controller-to-broker requests should include broker generation so that 
brokers can ignore requests that applied to its former generation.

While I think czxid's will work for the 1st use case, I don't think we can 
naively reuse czxid for the 2nd use case. The reason is a bit silly: 
zookeeper's CreateResponse only provides the path. It doesn't provide the 
created znode's Stat, So you have to do a later lookup to find out the znode's 
czxid.

If we want to solve both use cases with the same approach, I think we have a 
couple of options:
1. maybe we can get away with using czxids by doing a multi-op when registering 
brokers to transactionally create a znode and read that same znode to read the 
czxid of the znode it just created.
2. we can instead use the session id as the broker generation. The controller 
can infer the broker's generation by observing the broker znode's 
ephemeralOwner property. Brokers can determine their generation id by looking 
up the underlying zookeeper client's session id which is just 
ZooKeeper.getSessionId(). The ephemeralOwner of an ephemeral znode is the 
client's session id, which is why this would work.
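A sketch of option 2 (the path shown is the usual broker registration znode; the wiring is illustrative): the broker reads its own generation off its ZooKeeper session, and the controller recovers the same value from the ephemeralOwner of the broker's registration znode.
{code}
import org.apache.zookeeper.ZooKeeper
import org.apache.zookeeper.data.Stat

object BrokerGenerationSketch {
  // The broker's own view: its current session id doubles as its generation.
  def ownGeneration(zk: ZooKeeper): Long = zk.getSessionId

  // The controller's view: the ephemeralOwner of the broker's ephemeral registration znode
  // is the session id of the session that created it, i.e. the same generation value.
  def generationSeenByController(zk: ZooKeeper, brokerId: Int): Long = {
    val stat = new Stat()
    zk.getData(s"/brokers/ids/$brokerId", false, stat)
    stat.getEphemeralOwner
  }
}
{code}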

> Controller could miss a broker state change 
> 
>
> Key: KAFKA-1120
> URL: https://issues.apache.org/jira/browse/KAFKA-1120
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>  Labels: reliability
> Fix For: 1.0.0
>
>
> When the controller is in the middle of processing a task (e.g., preferred 
> leader election, broker change), it holds a controller lock. During this 
> time, a broker could have de-registered and re-registered itself in ZK. After 
> the controller finishes processing the current task, it will start processing 
> the logic in the broker change listener. However, it will see no broker 
> change and therefore won't do anything to the restarted broker. This broker 
> will be in a weird state since the controller doesn't inform it to become the 
> leader of any partition. Yet, the cached metadata in other brokers could 
> still list that broker as the leader for some partitions. Client requests 
> routed to that broker will then get a TopicOrPartitionNotExistException. This 
> broker will continue to be in this bad state until it's restarted again.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5703) allow debug-level logging for RequestChannel's request logger

2017-08-04 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman resolved KAFKA-5703.
-
Resolution: Fixed

Woops. Looks like [~ijuma] already fixed this 3 days ago:
4086db472d08f6dee4d30dda82ab9ff7b67d1a20

> allow debug-level logging for RequestChannel's request logger
> -
>
> Key: KAFKA-5703
> URL: https://issues.apache.org/jira/browse/KAFKA-5703
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> Git hash d25671884bbbdf7843ada3e7797573a00ac7cd56 introduced a bug in 
> RequestChannel's request logger that causes debug-level logging to never 
> occur.
> {code}
> -  if (requestLogger.isTraceEnabled)
> -    requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
> -      .format(requestDesc(true), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
> -  else if (requestLogger.isDebugEnabled)
> -    requestLogger.debug("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
> -      .format(requestDesc(false), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
> +  if (requestLogger.isDebugEnabled) {
> +    val detailsEnabled = requestLogger.isTraceEnabled
> +    requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s,listener:%s"
> +      .format(requestDesc(detailsEnabled), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal, listenerName.value))
> +  }
> {code}
> So trace-level logging is used even if debug-level logging is specified, 
> causing users to not see the non-detailed request logs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5703) allow debug-level logging for RequestChannel's request logger

2017-08-04 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5703:
---

 Summary: allow debug-level logging for RequestChannel's request 
logger
 Key: KAFKA-5703
 URL: https://issues.apache.org/jira/browse/KAFKA-5703
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman
Assignee: Onur Karaman


Git hash d25671884bbbdf7843ada3e7797573a00ac7cd56 introduced a bug in 
RequestChannel's request logger that causes debug-level logging to never occur.

{code}
-  if (requestLogger.isTraceEnabled)
-    requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
-      .format(requestDesc(true), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
-  else if (requestLogger.isDebugEnabled)
-    requestLogger.debug("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
-      .format(requestDesc(false), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
+  if (requestLogger.isDebugEnabled) {
+    val detailsEnabled = requestLogger.isTraceEnabled
+    requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s,listener:%s"
+      .format(requestDesc(detailsEnabled), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal, listenerName.value))
+  }
{code}

So trace-level logging is used even if debug-level logging is specified, 
causing users to not see the non-detailed request logs.
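A sketch of the intended behavior (buildLogLine is a hypothetical stand-in for the long format call in the diff above): pick the level that is actually enabled, and only build the detailed request description when trace is on.
{code}
import org.slf4j.Logger

object RequestLogLevel {
  def logCompletedRequest(requestLogger: Logger, buildLogLine: Boolean => String): Unit = {
    if (requestLogger.isTraceEnabled)
      requestLogger.trace(buildLogLine(true))    // detailed request description
    else if (requestLogger.isDebugEnabled)
      requestLogger.debug(buildLogLine(false))   // non-detailed description, still logged at debug
  }
}
{code}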



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-1120) Controller could miss a broker state change

2017-08-03 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16113645#comment-16113645
 ] 

Onur Karaman commented on KAFKA-1120:
-

Alright I might know what's happening. Here's the red flag:
{code}
> grep -r "Newly added brokers" .
./kafka_2.11-0.11.0.0/logs/controller.log:[2017-08-03 13:40:09,121] INFO 
[Controller 1]: Newly added brokers: 1, deleted brokers: , all live brokers: 1 
(kafka.controller.KafkaController)
./kafka_2.11-0.11.0.0/logs/controller.log:[2017-08-03 13:40:27,172] INFO 
[Controller 1]: Newly added brokers: 2, deleted brokers: , all live brokers: 
1,2 (kafka.controller.KafkaController)
./kafka_2.11-0.11.0.0/logs/controller.log:[2017-08-03 13:47:15,215] INFO 
[Controller 1]: Newly added brokers: , deleted brokers: , all live brokers: 1,2 
(kafka.controller.KafkaController)
./kafka_2.11-0.11.0.0/logs/controller.log:[2017-08-03 13:47:17,927] INFO 
[Controller 1]: Newly added brokers: , deleted brokers: , all live brokers: 1,2 
(kafka.controller.KafkaController)
{code}

Here's the relevant code in BrokerChange.process:
{code}
val curBrokers = zkUtils.getAllBrokersInCluster().toSet
val curBrokerIds = curBrokers.map(_.id)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
{code}

Basically the ControlledShutdown event took so long to process that the 
BrokerChange events corresponding to the killed broker (3rd BrokerChange in the 
above snippet) and the restarted broker (4th BrokerChange in the above snippet) 
were queued up waiting for ControlledShutdown's completion. By the time these 
BrokerChange events got processed, the restarted broker was already 
re-registered in zookeeper, so the broker appears both in 
controllerContext.liveOrShuttingDownBrokerIds and in the brokers listed in 
zookeeper. This means the controller will execute neither onBrokerFailure for 
the 3rd BrokerChange nor onBrokerJoin for the 4th BrokerChange.
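
To make the set arithmetic concrete, here is a toy illustration (hypothetical 
broker ids, not taken from the logs above):
{code}
// Broker 2 was killed and restarted before the queued BrokerChange events ran,
// so by processing time it shows up both in the cached view and in zookeeper.
val liveOrShuttingDownBrokerIds = Set(1, 2) // controllerContext's cached view
val curBrokerIds = Set(1, 2)                // what zookeeper reports at processing time

val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds  // Set(): onBrokerJoin never runs
val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds // Set(): onBrokerFailure never runs
{code}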

I'm not sure of the fix. Broker generations as defined in the redesign doc in 
KAFKA-5027 would work but I'm not sure if it's strictly required.

> Controller could miss a broker state change 
> 
>
> Key: KAFKA-1120
> URL: https://issues.apache.org/jira/browse/KAFKA-1120
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>  Labels: reliability
> Fix For: 1.0.0
>
>
> When the controller is in the middle of processing a task (e.g., preferred 
> leader election, broker change), it holds a controller lock. During this 
> time, a broker could have de-registered and re-registered itself in ZK. After 
> the controller finishes processing the current task, it will start processing 
> the logic in the broker change listener. However, it will see no broker 
> change and therefore won't do anything to the restarted broker. This broker 
> will be in a weird state since the controller doesn't inform it to become the 
> leader of any partition. Yet, the cached metadata in other brokers could 
> still list that broker as the leader for some partitions. Client requests 
> routed to that broker will then get a TopicOrPartitionNotExistException. This 
> broker will continue to be in this bad state until it's restarted again.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-1120) Controller could miss a broker state change

2017-08-01 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110205#comment-16110205
 ] 

Onur Karaman commented on KAFKA-1120:
-

Thanks [~wushujames], that is perfect. I can reproduce the problem.

> Controller could miss a broker state change 
> 
>
> Key: KAFKA-1120
> URL: https://issues.apache.org/jira/browse/KAFKA-1120
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>  Labels: reliability
> Fix For: 1.0.0
>
>
> When the controller is in the middle of processing a task (e.g., preferred 
> leader election, broker change), it holds a controller lock. During this 
> time, a broker could have de-registered and re-registered itself in ZK. After 
> the controller finishes processing the current task, it will start processing 
> the logic in the broker change listener. However, it will see no broker 
> change and therefore won't do anything to the restarted broker. This broker 
> will be in a weird state since the controller doesn't inform it to become the 
> leader of any partition. Yet, the cached metadata in other brokers could 
> still list that broker as the leader for some partitions. Client requests 
> routed to that broker will then get a TopicOrPartitionNotExistException. This 
> broker will continue to be in this bad state until it's restarted again.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5501) introduce async ZookeeperClient

2017-07-26 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101268#comment-16101268
 ] 

Onur Karaman commented on KAFKA-5501:
-

[~ijuma] I went ahead and reworded this ticket to be about making the client 
and KAFKA-5642 to be about using the client. With that, I closed this ticket.

> introduce async ZookeeperClient
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper apis means that we wait an entire round trip before 
> doing the next operation. We should introduce a zookeeper client that 
> encourages pipelined requests to zookeeper.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5501) introduce async ZookeeperClient

2017-07-26 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman resolved KAFKA-5501.
-
Resolution: Fixed

> introduce async ZookeeperClient
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper apis means that we wait an entire round trip before 
> doing the next operation. We should introduce a zookeeper client that 
> encourages pipelined requests to zookeeper.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5501) introduce async ZookeeperClient

2017-07-26 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5501:

Description: Synchronous zookeeper apis means that we wait an entire round 
trip before doing the next operation. We should introduce a zookeeper client 
that encourages pipelined requests to zookeeper.  (was: Synchronous zookeeper 
writes means that we wait an entire round trip before doing the next write. 
These synchronous writes are happening at a per-partition granularity in 
several places, so partition-heavy clusters suffer from the controller doing 
many sequential round trips to zookeeper.
* PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
zookeeper on transition to OnlinePartition. This gets triggered per-partition 
sequentially with synchronous writes during controlled shutdown of the shutting 
down broker's replicas for which it is the leader.
* ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
triggered per-partition sequentially with synchronous writes for failed or 
controlled shutdown brokers.)

> introduce async ZookeeperClient
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper apis means that we wait an entire round trip before 
> doing the next operation. We should introduce a zookeeper client that 
> encourages pipelined requests to zookeeper.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5642) use async ZookeeperClient everywhere

2017-07-26 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5642:
---

 Summary: use async ZookeeperClient everywhere
 Key: KAFKA-5642
 URL: https://issues.apache.org/jira/browse/KAFKA-5642
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


Synchronous zookeeper writes means that we wait an entire round trip before 
doing the next write. These synchronous writes are happening at a per-partition 
granularity in several places, so partition-heavy clusters suffer from the 
controller doing many sequential round trips to zookeeper.
* PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
zookeeper on transition to OnlinePartition. This gets triggered per-partition 
sequentially with synchronous writes during controlled shutdown of the shutting 
down broker's replicas for which it is the leader.
* ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
triggered per-partition sequentially with synchronous writes for failed or 
controlled shutdown brokers.

KAFKA-5501 introduced an async ZookeeperClient that encourages pipelined 
requests to zookeeper. We should replace ZkClient's usage with this client.
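
As a rough illustration of the kind of pipelining such a client enables, here 
is a sketch against the raw ZooKeeper async API (hypothetical connect string 
and paths; this is not the ZookeeperClient interface itself):
{code}
import java.util.concurrent.CountDownLatch
import org.apache.zookeeper.AsyncCallback.StatCallback
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

// Hypothetical connect string and per-partition state paths.
val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
  override def process(event: WatchedEvent): Unit = ()
})
val paths = (0 until 100).map(i => s"/brokers/topics/foo/partitions/$i/state")
val latch = new CountDownLatch(paths.size)

// Fire all requests without waiting for each round trip, then block once for
// the whole batch instead of once per znode.
paths.foreach { path =>
  zk.exists(path, false, new StatCallback {
    override def processResult(rc: Int, cbPath: String, ctx: AnyRef, stat: Stat): Unit =
      latch.countDown()
  }, null)
}
latch.await()
zk.close()
{code}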



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5501) introduce async ZookeeperClient

2017-07-26 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5501:

Summary: introduce async ZookeeperClient  (was: use async zookeeper apis 
everywhere)

> introduce async ZookeeperClient
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper writes means that we wait an entire round trip before 
> doing the next write. These synchronous writes are happening at a 
> per-partition granularity in several places, so partition-heavy clusters 
> suffer from the controller doing many sequential round trips to zookeeper.
> * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
> zookeeper on transition to OnlinePartition. This gets triggered per-partition 
> sequentially with synchronous writes during controlled shutdown of the 
> shutting down broker's replicas for which it is the leader.
> * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
> OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
> triggered per-partition sequentially with synchronous writes for failed or 
> controlled shutdown brokers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5586) Handle client disconnects during JoinGroup

2017-07-12 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16084990#comment-16084990
 ] 

Onur Karaman commented on KAFKA-5586:
-

I thought we made a conscious decision in the past, in the discussion relating 
to KAFKA-2397, not to do this. I had listed the pros and cons of 
LeaveGroupRequest vs disconnect there.

> Handle client disconnects during JoinGroup
> --
>
> Key: KAFKA-5586
> URL: https://issues.apache.org/jira/browse/KAFKA-5586
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>
> If a consumer disconnects with a JoinGroup in-flight, we do not remove it 
> from the group until after the Join phase completes. If the client 
> immediately re-sends the JoinGroup request and it already had a memberId, 
> then the callback will be replaced and there is no harm done. For the other 
> cases:
> 1. If the client disconnected due to a failure and does not re-send the 
> JoinGroup, the consumer will still be included in the new group generation 
> after the rebalance completes, but will immediately timeout and trigger a new 
> rebalance.
> 2. If the consumer was not a member of the group and re-sends JoinGroup, then 
> a new memberId will be created for that consumer and the old one will not be 
> removed. When the rebalance completes, the old memberId will timeout and a 
> rebalance will be triggered.
> To address these issues, we should add some additional logic to handle client 
> disconnections during the join phase. For newly generated memberIds, we 
> should simply remove them. For existing members, we should probably leave 
> them in the group and reset the heartbeat expiration task.
> Note that we currently have no facility to expose disconnects from the 
> network layer to the other layers, so we need to find a good approach for 
> this.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5502) read current brokers from zookeeper upon processing broker change

2017-06-22 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5502:

Description: 
[~lindong]'s testing of the 0.11.0 release revealed a controller-side 
performance regression in clusters with many brokers and many partitions when 
bringing up many brokers simultaneously.

The regression is caused by KAFKA-5028: a Watcher receives WatchedEvent 
notifications from the raw ZooKeeper client EventThread. A WatchedEvent only 
contains the following information:
- KeeperState
- EventType
- path

Note that it does not actually contain the current data or current set of 
children associated with the data/child change notification. It is up to the 
user to do this lookup to see the current data or set of children.

ZkClient is itself a Watcher. When it receives a WatchedEvent, it puts a 
ZkEvent into its own queue which its own ZkEventThread processes. Users of 
ZkClient interact with these notifications through listeners (IZkDataListener, 
IZkChildListener). IZkDataListener actually expects as input the current data 
of the watched znode, and likewise IZkChildListener actually expects as input 
the current set of children of the watched znode. In order to provide this 
information to the listeners, the ZkEventThread, when processing the ZkEvent in 
its queue, looks up the information (either the current data or current set of 
children), simultaneously sets up the next watch, and passes the result to the 
listener.

The regression introduced in KAFKA-5028 is the time at which we lookup the 
information needed for the event processing.

In the past, the result looked up by the ZkEventThread during ZkEvent 
processing was passed directly into the listener, which processed it 
immediately afterwards. For instance in ZkClient.fireChildChangedEvents:
{code}
List<String> children = getChildren(path);
listener.handleChildChange(path, children);
{code}
Now, however, there are multiple listeners that pass information looked up by 
the ZkEventThread into a ControllerEvent which gets processed potentially much 
later. For instance in BrokerChangeListener:
{code}
class BrokerChangeListener(controller: KafkaController) extends IZkChildListener with Logging {
  override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
    import JavaConverters._

    controller.addToControllerEventQueue(controller.BrokerChange(currentChilds.asScala))
  }
}
{code}

In terms of impact, this:
- increases the odds of working with stale information by the time the 
ControllerEvent gets processed.
- can cause the cluster to take a long time to stabilize if you bring up many 
brokers simultaneously.

In terms of how to solve it:
- (short term) just ignore the ZkClient's information lookup and repeat the 
lookup at the start of the ControllerEvent. This is the approach taken in this 
ticket.
- (long term) try to remove a queue. This basically means getting rid of 
ZkClient. This is likely the approach that will be taken in KAFKA-5501.

  was:
[~lindong]'s testing of the 0.11.0 release revealed a controller-side 
performance regression in clusters with many brokers and many partitions when 
bringing up many brokers simultaneously.

The regression is caused by KAFKA-5028: a Watcher receives WatchedEvent 
notifications from the raw ZooKeeper client EventThread. A WatchedEvent only 
contains the following information:
- KeeperState
- EventType
- path

Note that it does not actually contain the current data or current set of 
children associated with the data/child change notification. It is up to the 
user to do this lookup to see the current data or set of children.

ZkClient is itself a Watcher. When it receives a WatchedEvent, it puts a 
ZkEvent into its own queue which its own ZkEventThread processes. Users of 
ZkClient interact with these notifications through listeners (IZkDataListener, 
IZkChildListener). IZkDataListener actually expects as input the current data 
of the watched znode, and likewise IZkChildListener actually expects as input 
the current set of children of the watched znode. In order to provide this 
information to the listeners, the ZkEventThread, when processing the ZkEvent in 
its queue, looks up the information (either the current data or current set of 
children) simultaneously sets up the next watch, and passes the result to the 
listener.

The regression introduced in KAFKA-5028 is the time at which we lookup the 
information needed for the event processing.

In the past, the lookup from the ZkEventThread during ZkEvent processing would 
be passed into the listener which is processed immediately after. For instance 
in ZkClient.fireChildChangedEvents:
{code}
List children = getChildren(path);
listener.handleChildChange(path, children);
{code}
Now, however, there are multiple listeners that pass information looked up by 
the ZkEventThread into a ControllerEvent which gets processed 

[jira] [Created] (KAFKA-5502) read current brokers from zookeeper upon processing broker change

2017-06-22 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5502:
---

 Summary: read current brokers from zookeeper upon processing 
broker change
 Key: KAFKA-5502
 URL: https://issues.apache.org/jira/browse/KAFKA-5502
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


[~lindong]'s testing of the 0.11.0 release revealed a controller-side 
performance regression in clusters with many brokers and many partitions when 
bringing up many brokers simultaneously.

The regression is caused by KAFKA-5028: a Watcher receives WatchedEvent 
notifications from the raw ZooKeeper client EventThread. A WatchedEvent only 
contains the following information:
- KeeperState
- EventType
- path

Note that it does not actually contain the current data or current set of 
children associated with the data/child change notification. It is up to the 
user to do this lookup to see the current data or set of children.

ZkClient is itself a Watcher. When it receives a WatchedEvent, it puts a 
ZkEvent into its own queue which its own ZkEventThread processes. Users of 
ZkClient interact with these notifications through listeners (IZkDataListener, 
IZkChildListener). IZkDataListener actually expects as input the current data 
of the watched znode, and likewise IZkChildListener actually expects as input 
the current set of children of the watched znode. In order to provide this 
information to the listeners, the ZkEventThread, when processing the ZkEvent in 
its queue, looks up the information (either the current data or current set of 
children), simultaneously sets up the next watch, and passes the result to the 
listener.

The regression introduced in KAFKA-5028 is the time at which we lookup the 
information needed for the event processing.

In the past, the result looked up by the ZkEventThread during ZkEvent 
processing was passed directly into the listener, which processed it 
immediately afterwards. For instance in ZkClient.fireChildChangedEvents:
{code}
List<String> children = getChildren(path);
listener.handleChildChange(path, children);
{code}
Now, however, there are multiple listeners that pass information looked up by 
the ZkEventThread into a ControllerEvent which gets processed potentially much 
later. For instance in BrokerChangeListener:
{code}
class BrokerChangeListener(controller: KafkaController) extends IZkChildListener with Logging {
  override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
    import JavaConverters._

    controller.addToControllerEventQueue(controller.BrokerChange(currentChilds.asScala))
  }
}
{code}

In terms of impact, this:
- increases the odds of working with stale information by the time the 
ControllerEvent gets processed.
- can cause the cluster to take a long time to stabilize if you bring up many 
brokers simultaneously.

In terms of how to solve it:
- (short term) just ignore the ZkClient's information lookup and repeat the 
lookup at the start of the ControllerEvent, as sketched after this list. This 
is the approach taken in this ticket.
- (long term) try to remove a queue. This basically means getting rid of 
ZkClient. This is likely the approach that will be taken in KAFKA-5501.
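
A minimal sketch of the short-term approach, reusing the names from the 
controller's BrokerChange processing (assuming zkUtils and controllerContext 
are in scope; not the exact patch):
{code}
// The enqueued BrokerChange event ignores the children snapshot that ZkClient
// captured and re-reads the live brokers from zookeeper when it actually runs.
def processBrokerChange(): Unit = {
  val curBrokers = zkUtils.getAllBrokersInCluster().toSet // fresh read at processing time
  val curBrokerIds = curBrokers.map(_.id)
  val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
  val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
  val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
  // ... handle newBrokerIds / deadBrokerIds as before ...
}
{code}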



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5501) use async zookeeper apis everywhere

2017-06-22 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5501:
---

 Summary: use async zookeeper apis everywhere
 Key: KAFKA-5501
 URL: https://issues.apache.org/jira/browse/KAFKA-5501
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


Synchronous zookeeper writes means that we wait an entire round trip before 
doing the next write. These synchronous writes are happening at a per-partition 
granularity in several places, so partition-heavy clusters suffer from the 
controller doing many sequential round trips to zookeeper.
* PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
zookeeper on transition to OnlinePartition. This gets triggered per-partition 
sequentially with synchronous writes during controlled shutdown of the shutting 
down broker's replicas for which it is the leader.
* ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
triggered per-partition sequentially with synchronous writes for failed or 
controlled shutdown brokers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)