[jira] [Updated] (KAFKA-6146) minimize the number of triggers enqueuing PreferredReplicaLeaderElection events
[ https://issues.apache.org/jira/browse/KAFKA-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-6146: Summary: minimize the number of triggers enqueuing PreferredReplicaLeaderElection events (was: re-register the exist watch on PreferredReplicaElectionZNode only after the preferred leader election completes ) > minimize the number of triggers enqueuing PreferredReplicaLeaderElection > events > --- > > Key: KAFKA-6146 > URL: https://issues.apache.org/jira/browse/KAFKA-6146 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 1.1.0 >Reporter: Jun Rao > Fix For: 1.1.0 > > > We currently enqueue a PreferredReplicaLeaderElection controller event in > PreferredReplicaElectionHandler's handleCreation, handleDeletion, and > handleDataChange. We really only need to enqueue the event and re-register > the exist watch on PreferredReplicaElectionZNode after preferred replica > leader election completes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6146) re-register the exist watch on PreferredReplicaElectionZNode only after the preferred leader election completes
[ https://issues.apache.org/jira/browse/KAFKA-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-6146: Summary: re-register the exist watch on PreferredReplicaElectionZNode only after the preferred leader election completes (was: re-register the exist watch on PreferredReplicaElectionZNode after the preferred leader election completes ) > re-register the exist watch on PreferredReplicaElectionZNode only after the > preferred leader election completes > > > Key: KAFKA-6146 > URL: https://issues.apache.org/jira/browse/KAFKA-6146 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 1.1.0 >Reporter: Jun Rao > Fix For: 1.1.0 > > > We currently enqueue a PreferredReplicaLeaderElection controller event in > PreferredReplicaElectionHandler's handleCreation, handleDeletion, and > handleDataChange. We really only need to enqueue the event and re-register > the exist watch on PreferredReplicaElectionZNode after preferred replica > leader election completes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (KAFKA-6146) minimize the number of triggers enqueuing PreferredReplicaLeaderElection events
[ https://issues.apache.org/jira/browse/KAFKA-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman reassigned KAFKA-6146: --- Assignee: Onur Karaman > minimize the number of triggers enqueuing PreferredReplicaLeaderElection > events > --- > > Key: KAFKA-6146 > URL: https://issues.apache.org/jira/browse/KAFKA-6146 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 1.1.0 >Reporter: Jun Rao >Assignee: Onur Karaman > Fix For: 1.1.0 > > > We currently enqueue a PreferredReplicaLeaderElection controller event in > PreferredReplicaElectionHandler's handleCreation, handleDeletion, and > handleDataChange. We can just enqueue the event upon znode creation and after > preferred replica leader election completes. The processing of this latter > enqueue will register the exist watch on PreferredReplicaElectionZNode and > perform any pending preferred replica leader election that may have occurred > between completion and registration. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6146) minimize the number of triggers enqueuing PreferredReplicaLeaderElection events
[ https://issues.apache.org/jira/browse/KAFKA-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-6146: Description: We currently enqueue a PreferredReplicaLeaderElection controller event in PreferredReplicaElectionHandler's handleCreation, handleDeletion, and handleDataChange. We can just enqueue the event upon znode creation and after preferred replica leader election completes. The processing of this latter enqueue will register the exist watch on PreferredReplicaElectionZNode and perform any pending preferred replica leader election that may have occurred between completion and registration. (was: We currently enqueue a PreferredReplicaLeaderElection controller event in PreferredReplicaElectionHandler's handleCreation, handleDeletion, and handleDataChange. We really only need to enqueue the event and re-register the exist watch on PreferredReplicaElectionZNode after preferred replica leader election completes.) > minimize the number of triggers enqueuing PreferredReplicaLeaderElection > events > --- > > Key: KAFKA-6146 > URL: https://issues.apache.org/jira/browse/KAFKA-6146 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 1.1.0 >Reporter: Jun Rao > Fix For: 1.1.0 > > > We currently enqueue a PreferredReplicaLeaderElection controller event in > PreferredReplicaElectionHandler's handleCreation, handleDeletion, and > handleDataChange. We can just enqueue the event upon znode creation and after > preferred replica leader election completes. The processing of this latter > enqueue will register the exist watch on PreferredReplicaElectionZNode and > perform any pending preferred replica leader election that may have occurred > between completion and registration. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6146) re-register the exist watch on PreferredReplicaElectionZNode after the preferred leader election completes
[ https://issues.apache.org/jira/browse/KAFKA-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-6146: Description: We currently enqueue a PreferredReplicaLeaderElection controller event in PreferredReplicaElectionHandler's handleCreation, handleDeletion, and handleDataChange. We really only need to enqueue the event and re-register the exist watch on PreferredReplicaElectionZNode after preferred replica leader election completes. (was: We currently enqueue a PreferredReplicaLeaderElection controller event in PreferredReplicaElectionHandler's handleCreation, handleDeletion, and handleDataChange. We really only need to enqueue the event and re-register the exist watch on after preferred replica leader election completes.) > re-register the exist watch on PreferredReplicaElectionZNode after the > preferred leader election completes > --- > > Key: KAFKA-6146 > URL: https://issues.apache.org/jira/browse/KAFKA-6146 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 1.1.0 >Reporter: Jun Rao > Fix For: 1.1.0 > > > We currently enqueue a PreferredReplicaLeaderElection controller event in > PreferredReplicaElectionHandler's handleCreation, handleDeletion, and > handleDataChange. We really only need to enqueue the event and re-register > the exist watch on PreferredReplicaElectionZNode after preferred replica > leader election completes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5894) add the notion of max inflight requests to async ZookeeperClient
[ https://issues.apache.org/jira/browse/KAFKA-5894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16234885#comment-16234885 ] Onur Karaman commented on KAFKA-5894: - This went through [KIP-214|https://cwiki.apache.org/confluence/display/KAFKA/KIP-214%3A+Add+zookeeper.max.in.flight.requests+config+to+the+broker] and the KIP has been accepted. > add the notion of max inflight requests to async ZookeeperClient > > > Key: KAFKA-5894 > URL: https://issues.apache.org/jira/browse/KAFKA-5894 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman >Priority: Major > Fix For: 1.1.0 > > > ZookeeperClient is a zookeeper client that encourages pipelined requests to > zookeeper. We want to add the notion of max inflight requests to the client > for several reasons: > # to bound memory overhead associated with async requests on the client. > # to not overwhelm the zookeeper ensemble with a burst of requests. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
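One way to picture the "max inflight requests" idea in this ticket is a semaphore that hands out one permit per outstanding asynchronous request. The following is a minimal sketch under that assumption, not the actual ZookeeperClient code: the class name `BoundedInflight`, its `send` method, and the use of `CompletableFuture` are all hypothetical and chosen only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical helper, not Kafka's ZookeeperClient: it caps the number of
// asynchronous requests that are outstanding at any moment.
class BoundedInflight {
    private final Semaphore permits;

    BoundedInflight(int maxInflightRequests) {
        this.permits = new Semaphore(maxInflightRequests);
    }

    // Blocks the caller until a permit is free, then starts the request.
    // The permit is handed back when the request's future completes,
    // whether it completes normally or exceptionally.
    <T> CompletableFuture<T> send(Supplier<CompletableFuture<T>> request) {
        permits.acquireUninterruptibly();
        try {
            return request.get().whenComplete((result, error) -> permits.release());
        } catch (RuntimeException e) {
            // The request never started, so give the permit back.
            permits.release();
            throw e;
        }
    }

    // Number of additional requests that could be started right now.
    int availablePermits() {
        return permits.availablePermits();
    }
}
```

Blocking the sender bounds both the client-side memory held by pending requests and the burst size seen by the ensemble, which are the two goals listed in the ticket. Whether the real client blocks, queues, or rejects when the limit is hit is not stated in this message.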
[jira] [Comment Edited] (KAFKA-6134) High memory usage on controller during partition reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221443#comment-16221443 ] Onur Karaman edited comment on KAFKA-6134 at 10/26/17 11:28 PM: If you want to port a fix to 1.0 without pulling in all of KAFKA-5642, I think you can just lazily read the reassignment state upon actually processing the PartitionReassignment instead of providing one as part of the PartitionReassignment instance so that you'd only have one partition reassignment mapping allocated at any point in time. was (Author: onurkaraman): If you want to port a fix to 1.0 without pulling in all of KAFKA-5642, I think you can just lazily read the reassignment state upon actually processing the PartitionReassignment so that you'd only have one partition reassignment mapping allocated at any point in time. > High memory usage on controller during partition reassignment > - > > Key: KAFKA-6134 > URL: https://issues.apache.org/jira/browse/KAFKA-6134 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.11.0.0, 0.11.0.1 >Reporter: Jason Gustafson >Priority: Critical > Attachments: Screen Shot 2017-10-26 at 3.05.40 PM.png > > > We've had a couple users reporting spikes in memory usage when the controller > is performing partition reassignment in 0.11. After investigation, we found > that the controller event queue was using most of the retained memory. In > particular, we found several thousand {{PartitionReassignment}} objects, each > one containing one fewer partition than the previous one (see the attached > image). > From the code, it seems clear why this is happening. 
We have a watch on the > partition reassignment path which adds the {{PartitionReassignment}} object > to the event queue: > {code} > override def handleDataChange(dataPath: String, data: Any): Unit = { > val partitionReassignment = > ZkUtils.parsePartitionReassignmentData(data.toString) > eventManager.put(controller.PartitionReassignment(partitionReassignment)) > } > {code} > In the {{PartitionReassignment}} event handler, we iterate through all of the > partitions in the reassignment. After we complete reassignment for each > partition, we remove that partition and update the node in zookeeper. > {code} > // remove this partition from that list > val updatedPartitionsBeingReassigned = partitionsBeingReassigned - > topicAndPartition > // write the new list to zookeeper > > zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas)) > {code} > This triggers the handler above which adds a new event in the queue. So what > you get is an n^2 increase in memory where n is the number of partitions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
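To put rough numbers on the growth described in this ticket, the sketch below counts how many partition entries can be held by queued events under two designs: the eager one quoted above, where each znode update enqueues an event carrying the remaining mapping, and the lazy one suggested in the comment, where the event carries no payload and the mapping is read only when the event is processed. The class and method names are hypothetical, and the eager count is an approximation that assumes every intermediate event is still queued at the same time.

```java
// Hypothetical illustration of the memory growth discussed in KAFKA-6134.
class ReassignmentQueueCost {
    // Eager design: successive events carry n, n-1, ..., 1 partition entries,
    // so the queue can retain about n(n+1)/2 entries in total.
    static long eagerEntries(int partitions) {
        long total = 0;
        for (int remaining = partitions; remaining >= 1; remaining--) {
            total += remaining;
        }
        return total;
    }

    // Lazy design: queued events hold no mapping; only the single mapping
    // read while processing an event is live, which is at most n entries.
    static long lazyEntries(int partitions) {
        return partitions;
    }
}
```

For a reassignment of 5000 partitions, `eagerEntries(5000)` is 12,502,500 entries against 5000 for the lazy variant, which is consistent with the several thousand shrinking `PartitionReassignment` objects reported in the heap dump.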
[jira] [Commented] (KAFKA-6134) High memory usage on controller during partition reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221443#comment-16221443 ] Onur Karaman commented on KAFKA-6134: - If you want to port a fix to 1.0 without pulling in all of KAFKA-5642, I think you can just lazily read the reassignment state upon actually processing the PartitionReassignment so that you'd only have one partition reassignment mapping allocated at any point in time. > High memory usage on controller during partition reassignment > - > > Key: KAFKA-6134 > URL: https://issues.apache.org/jira/browse/KAFKA-6134 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.11.0.0, 0.11.0.1 >Reporter: Jason Gustafson >Priority: Critical > Attachments: Screen Shot 2017-10-26 at 3.05.40 PM.png > > > We've had a couple users reporting spikes in memory usage when the controller > is performing partition reassignment in 0.11. After investigation, we found > that the controller event queue was using most of the retained memory. In > particular, we found several thousand {{PartitionReassignment}} objects, each > one containing one fewer partition than the previous one (see the attached > image). > From the code, it seems clear why this is happening. We have a watch on the > partition reassignment path which adds the {{PartitionReassignment}} object > to the event queue: > {code} > override def handleDataChange(dataPath: String, data: Any): Unit = { > val partitionReassignment = > ZkUtils.parsePartitionReassignmentData(data.toString) > eventManager.put(controller.PartitionReassignment(partitionReassignment)) > } > {code} > In the {{PartitionReassignment}} event handler, we iterate through all of the > partitions in the reassignment. After we complete reassignment for each > partition, we remove that partition and update the node in zookeeper. 
> {code} > // remove this partition from that list > val updatedPartitionsBeingReassigned = partitionsBeingReassigned - > topicAndPartition > // write the new list to zookeeper > > zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas)) > {code} > This triggers the handler above which adds a new event in the queue. So what > you get is an n^2 increase in memory where n is the number of partitions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6134) High memory usage on controller during partition reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221392#comment-16221392 ] Onur Karaman commented on KAFKA-6134: - Yes I had noticed the O(N^2) behavior a while ago. I believe this should be mitigated after KAFKA-5642 since PartitionReassignment is now a case object instead of a case class containing the remaining reassignment mapping. > High memory usage on controller during partition reassignment > - > > Key: KAFKA-6134 > URL: https://issues.apache.org/jira/browse/KAFKA-6134 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.11.0.0, 0.11.0.1 >Reporter: Jason Gustafson >Priority: Critical > Attachments: Screen Shot 2017-10-26 at 3.05.40 PM.png > > > We've had a couple users reporting spikes in memory usage when the controller > is performing partition reassignment in 0.11. After investigation, we found > that the controller event queue was using most of the retained memory. In > particular, we found several thousand {{PartitionReassignment}} objects, each > one containing one fewer partition than the previous one (see the attached > image). > From the code, it seems clear why this is happening. We have a watch on the > partition reassignment path which adds the {{PartitionReassignment}} object > to the event queue: > {code} > override def handleDataChange(dataPath: String, data: Any): Unit = { > val partitionReassignment = > ZkUtils.parsePartitionReassignmentData(data.toString) > eventManager.put(controller.PartitionReassignment(partitionReassignment)) > } > {code} > In the {{PartitionReassignment}} event handler, we iterate through all of the > partitions in the reassignment. After we complete reassignment for each > partition, we remove that partition and update the node in zookeeper. 
> {code} > // remove this partition from that list > val updatedPartitionsBeingReassigned = partitionsBeingReassigned - > topicAndPartition > // write the new list to zookeeper > > zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas)) > {code} > This triggers the handler above which adds a new event in the queue. So what > you get is an n^2 increase in memory where n is the number of partitions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6081) response error code checking
[ https://issues.apache.org/jira/browse/KAFKA-6081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209903#comment-16209903 ] Onur Karaman commented on KAFKA-6081: - [~ijuma] There are some scenarios where we only want to begin certain actions after the previous action actually completes. The reassignment comment above is one example but I think there are others. Figuring out these scenarios is in the scope of this ticket. > response error code checking > > > Key: KAFKA-6081 > URL: https://issues.apache.org/jira/browse/KAFKA-6081 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman > > In most cases in the controller, we assume that requests succeed. We should > instead check for their responses. > Example: partition reassignment has the following todo: > {code} > // TODO: Eventually partition reassignment could use a callback that does > retries if deletion failed > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
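As a rough illustration of what "check the response instead of assuming success" could look like, here is a minimal sketch of a send loop that inspects an error code and retries only retriable failures. Everything in it is an assumption made for the example: the `ErrorCode` enum, the `sendWithRetries` helper, and the retry limit are hypothetical and do not correspond to controller code.

```java
import java.util.function.Supplier;

// Hypothetical sketch: act on the response's error code rather than
// assuming the request succeeded.
class ResponseCheck {
    // Simplified stand-in for a response error code.
    enum ErrorCode { NONE, RETRIABLE, FATAL }

    // Returns true only once a response with no error has been observed.
    // Retriable errors are retried up to maxAttempts; fatal errors stop at once.
    static boolean sendWithRetries(Supplier<ErrorCode> request, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            ErrorCode code = request.get();
            if (code == ErrorCode.NONE) {
                return true;
            }
            if (code == ErrorCode.FATAL) {
                return false;
            }
        }
        return false;
    }
}
```

A caller would start the follow-up action (for example, the next reassignment step) only when this returns true, which matches the point in the comment that some actions should begin only after the previous one has actually completed.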
[jira] [Created] (KAFKA-6082) consider fencing zookeeper updates with controller epoch zkVersion
Onur Karaman created KAFKA-6082: --- Summary: consider fencing zookeeper updates with controller epoch zkVersion Key: KAFKA-6082 URL: https://issues.apache.org/jira/browse/KAFKA-6082 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman If we want, we can use multi-op to fence zookeeper updates with the controller epoch's zkVersion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
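The fencing idea can be modelled without a ZooKeeper ensemble. The sketch below is an in-memory simulation, not ZooKeeper client code: `FencedStore` and its methods are hypothetical. It mimics a multi-op made of a version check on the controller epoch znode followed by a data write, applied atomically, so that a write from a controller holding a stale zkVersion is rejected. With the real client this would presumably be built from `Op.check` and `Op.setData` passed to `ZooKeeper.multi`, but that wiring is an assumption and is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of fencing writes with the controller
// epoch znode's zkVersion.
class FencedStore {
    private int controllerEpochZkVersion = 0;
    private final Map<String, String> data = new HashMap<>();

    // Models a new controller being elected: the epoch znode is rewritten,
    // which bumps its version.
    synchronized int bumpControllerEpoch() {
        return ++controllerEpochZkVersion;
    }

    // Models the atomic pair [check(epoch version), setData(path, value)]:
    // either both steps succeed or nothing is written.
    synchronized boolean fencedWrite(int expectedEpochZkVersion, String path, String value) {
        if (expectedEpochZkVersion != controllerEpochZkVersion) {
            return false; // stale controller, write is fenced off
        }
        data.put(path, value);
        return true;
    }

    synchronized String read(String path) {
        return data.get(path);
    }
}
```

In this model a controller remembers the version it observed when it was elected and passes it on every write; once another controller has been elected, the old controller's writes fail instead of silently overwriting state.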
[jira] [Updated] (KAFKA-5029) cleanup javadocs and logging
[ https://issues.apache.org/jira/browse/KAFKA-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5029: Description: Remove state change logger, splitting it up into the controller logs or broker logs. > cleanup javadocs and logging > > > Key: KAFKA-5029 > URL: https://issues.apache.org/jira/browse/KAFKA-5029 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > Remove state change logger, splitting it up into the controller logs or > broker logs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6081) response error code checking
Onur Karaman created KAFKA-6081: --- Summary: response error code checking Key: KAFKA-6081 URL: https://issues.apache.org/jira/browse/KAFKA-6081 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman In most cases in the controller, we assume that requests succeed. We should instead check for their responses. Example: partition reassignment has the following todo: {code} // TODO: Eventually partition reassignment could use a callback that does retries if deletion failed {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-1120) Controller could miss a broker state change
[ https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-1120: Issue Type: Sub-task (was: Bug) Parent: KAFKA-5027 > Controller could miss a broker state change > > > Key: KAFKA-1120 > URL: https://issues.apache.org/jira/browse/KAFKA-1120 > Project: Kafka > Issue Type: Sub-task > Components: core >Affects Versions: 0.8.1 >Reporter: Jun Rao > Labels: reliability > Fix For: 1.1.0 > > > When the controller is in the middle of processing a task (e.g., preferred > leader election, broker change), it holds a controller lock. During this > time, a broker could have de-registered and re-registered itself in ZK. After > the controller finishes processing the current task, it will start processing > the logic in the broker change listener. However, it will see no broker > change and therefore won't do anything to the restarted broker. This broker > will be in a weird state since the controller doesn't inform it to become the > leader of any partition. Yet, the cached metadata in other brokers could > still list that broker as the leader for some partitions. Client requests > routed to that broker will then get a TopicOrPartitionNotExistException. This > broker will continue to be in this bad state until it's restarted again. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6029) Controller should wait for the leader migration to finish before ack a ControlledShutdownRequest
[ https://issues.apache.org/jira/browse/KAFKA-6029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-6029: Issue Type: Sub-task (was: Improvement) Parent: KAFKA-5027 > Controller should wait for the leader migration to finish before ack a > ControlledShutdownRequest > > > Key: KAFKA-6029 > URL: https://issues.apache.org/jira/browse/KAFKA-6029 > Project: Kafka > Issue Type: Sub-task > Components: controller, core >Affects Versions: 1.0.0 >Reporter: Jiangjie Qin > Fix For: 1.1.0 > > > In the controlled shutdown process, the controller will return the > ControlledShutdownResponse immediately after the state machine is updated. > Because the LeaderAndIsrRequests and UpdateMetadataRequests may not have been > successfully processed by the brokers, the leader migration and active ISR > shrink may not have completed when the shutting down broker proceeds to shut down. > This will cause some of the leaders to take up to replica.lag.time.max.ms to > kick the broker out of ISR. Meanwhile the produce purgatory size will grow. > Ideally, the controller should wait until all the LeaderAndIsrRequests and > UpdateMetadataRequests have been acked before sending back the > ControlledShutdownResponse. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
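The "wait until all requests have been acked" behaviour asked for in this ticket can be sketched with futures. This is only an illustration under assumed names: `ControlledShutdownAck`, `respondAfterAcks`, and the use of one `CompletableFuture` per outstanding LeaderAndIsrRequest or UpdateMetadataRequest are hypothetical, and the response is represented by a plain string.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: hold back the shutdown response until every
// per-broker request has been acknowledged.
class ControlledShutdownAck {
    // The returned future completes only after all futures in brokerAcks
    // have completed; until then no response is available to send.
    static CompletableFuture<String> respondAfterAcks(List<CompletableFuture<Void>> brokerAcks) {
        return CompletableFuture
            .allOf(brokerAcks.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> "ControlledShutdownResponse");
    }
}
```

A real implementation would most likely also need a timeout so that one unresponsive broker cannot block the shutdown indefinitely; the ticket text does not say how that case should be handled.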
[jira] [Resolved] (KAFKA-5083) always leave the last surviving member of the ISR in ZK
[ https://issues.apache.org/jira/browse/KAFKA-5083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman resolved KAFKA-5083. - Resolution: Fixed This has been fixed in KAFKA-5642. > always leave the last surviving member of the ISR in ZK > --- > > Key: KAFKA-5083 > URL: https://issues.apache.org/jira/browse/KAFKA-5083 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > Currently we erase ISR membership if the replica to be removed from the ISR > is the last surviving member of the ISR and unclean leader election is > enabled for the corresponding topic. > We should investigate leaving the last replica in ISR in ZK, independent of > whether unclean leader election is enabled or not. That way, if people > re-disabled unclean leader election, we can still try to elect the leader > from the last in-sync replica. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6065) Add zookeeper metrics to ZookeeperClient as in KIP-188
Onur Karaman created KAFKA-6065: --- Summary: Add zookeeper metrics to ZookeeperClient as in KIP-188 Key: KAFKA-6065 URL: https://issues.apache.org/jira/browse/KAFKA-6065 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Among other things, KIP-188 added latency metrics to ZkUtils. We should add the same metrics to ZookeeperClient. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6014) new consumer mirror maker halts after committing offsets to a deleted topic
Onur Karaman created KAFKA-6014: --- Summary: new consumer mirror maker halts after committing offsets to a deleted topic Key: KAFKA-6014 URL: https://issues.apache.org/jira/browse/KAFKA-6014 Project: Kafka Issue Type: Bug Reporter: Onur Karaman

The new consumer throws an unexpected KafkaException when trying to commit to a topic that has been deleted. MirrorMaker.commitOffsets doesn't attempt to catch the KafkaException and just kills the process. We didn't see this in the old consumer because the old consumer just silently drops failed offset commits.

I ran a quick experiment locally to confirm the behavior. The experiment:
1. start up a single broker
2. create a single-partition topic t
3. create a new consumer that consumes topic t
4. make the consumer commit every few seconds
5. delete topic t
6. expect: KafkaException that kills the process.

Here's my script:
{code}
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class OffsetCommitTopicDeletionTest {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
        TopicPartition partition = new TopicPartition("t", 0);
        List<TopicPartition> partitions = Collections.singletonList(partition);
        kafkaConsumer.assign(partitions);
        while (true) {
            kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(0, "")));
            Thread.sleep(1000);
        }
    }
}
{code}
Here are the other commands:
{code}
> rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> ./gradlew clean jar
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --partitions 1 --replication-factor 1
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t
{code}
Here is the output:
{code}
[2017-10-04 20:00:14,451] ERROR [Consumer clientId=consumer-1, groupId=g] Offset commit failed on partition t-0 at offset 0: This server does not host this topic-partition. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Exception in thread "main" org.apache.kafka.common.KafkaException: Partition t-0 may not exist or user may not have Describe access to topic
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:789)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest.main(OffsetCommitTopicDeletionTest.java:22)
[jira] [Created] (KAFKA-5894) add the notion of max inflight requests to async ZookeeperClient
Onur Karaman created KAFKA-5894: --- Summary: add the notion of max inflight requests to async ZookeeperClient Key: KAFKA-5894 URL: https://issues.apache.org/jira/browse/KAFKA-5894 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Assignee: Onur Karaman ZookeeperClient is a zookeeper client that encourages pipelined requests to zookeeper. We want to add the notion of max inflight requests to the client for several reasons: # to bound memory overhead associated with async requests on the client. # to not overwhelm the zookeeper ensemble with a burst of requests. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-4747) add metrics for KafkaConsumer.poll
[ https://issues.apache.org/jira/browse/KAFKA-4747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman resolved KAFKA-4747. - Resolution: Won't Fix

[~junrao] pointed out that the distinction between time-in-poll and time-in-application can be effectively computed as 1 - (io-ratio) - (io-wait-ratio). If this value is close to 1, then time is mostly being spent on the application-side. Otherwise, if this value is close to 0, then time is mostly being spent on the client-side.

Here's a simple experiment I ran to verify:
{code}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class SlowKafkaConsumer {
    public static void main(String[] args) throws InterruptedException {
        // args[0]: poll timeout in ms, args[1]: sleep between polls in ms
        long pollTimeout = Long.parseLong(args[0]);
        long sleepDuration = Long.parseLong(args[1]);
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "onur");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.assign(Collections.singletonList(new TopicPartition("t", 0)));
        kafkaConsumer.seekToBeginning(Collections.singletonList(new TopicPartition("t", 0)));
        while (true) {
            kafkaConsumer.poll(pollTimeout);
            Thread.sleep(sleepDuration);
        }
    }
}
{code}
{code}
no data
===
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 2000 0
io-ratio ~ 0
io-wait-ratio ~ 0.99
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 2000 1
io-ratio ~ 0
io-wait-ratio ~ [0.1, 0.2]
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 2000 2
io-ratio ~ 0
io-wait-ratio ~ [0.05, 0.12]

with data
=
> ./bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=localhost:9090 --topic t --throughput -1 --num-records 1 --record-size 1000
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 2000 0
io-ratio ~ 0.06
io-wait-ratio ~ 0.8
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 2000 1
io-ratio ~ 0
io-wait-ratio ~ [0.05, 0.1]
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 2000 2
io-ratio ~ 0
io-wait-ratio ~ [0, 0.03] {code} > add metrics for KafkaConsumer.poll > -- > > Key: KAFKA-4747 > URL: https://issues.apache.org/jira/browse/KAFKA-4747 > Project: Kafka > Issue Type: Improvement >Reporter: Onur Karaman >Assignee: Onur Karaman > > KafkaConsumer heavily depends on KafkaConsumer.poll yet we don't have metrics > directly associated with it. > We probably want to add two metrics: > 1. time spent in KafkaConsumer.poll > 2. time since last KafkaConsumer.poll (measured as now - endTimeOfLastPoll) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
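The estimate from the resolution comment, 1 - (io-ratio) - (io-wait-ratio), is simple enough to write down directly. The helper below is a hypothetical sketch (the class and method names are not part of Kafka); it takes the two metric values as plain numbers and returns the fraction of time attributed to the application side.

```java
// Hypothetical helper for the estimate described in KAFKA-4747.
class PollTimeEstimate {
    // Fraction of time spent outside the client's I/O thread work,
    // computed as 1 - io-ratio - io-wait-ratio.
    static double applicationFraction(double ioRatio, double ioWaitRatio) {
        return 1.0 - ioRatio - ioWaitRatio;
    }
}
```

Plugging in two of the measurements above: with no data and no sleep (io-ratio ~ 0, io-wait-ratio ~ 0.99) the estimate is about 0.01, so nearly all time is on the client side; with data and no sleep (io-ratio ~ 0.06, io-wait-ratio ~ 0.8) it is about 0.14.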
[jira] [Commented] (KAFKA-5857) Excessive heap usage on controller node during reassignment
[ https://issues.apache.org/jira/browse/KAFKA-5857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158933#comment-16158933 ]

Onur Karaman commented on KAFKA-5857:
-

I wouldn't be surprised if there were no attempts so far at making the controller memory-efficient.

There's a slight chance I may have coincidentally run into the same issue yesterday while preparing for an upcoming talk. I tried timing how long it takes to complete a reassignment with many empty partitions and noticed that progress eventually halted and the controller hit OOM.

Here's my setup on my laptop:
{code}
> rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> ./gradlew clean jar
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --partitions 5000 --replication-factor 1
> export LOG_DIR=logs1 && ./bin/kafka-server-start.sh config/server1.properties
> python
import json
with open("reassignment.txt", "w") as f:
  reassignment = {"version":1, "partitions": [{"topic": "t", "partition": partition, "replicas": [0, 1]} for partition in range(5000)]}
  json.dump(reassignment, f, separators=(',',':'))
> ./zkCli.sh -server localhost:2181
> create /admin/reassign_partitions
{code}
Note that I had to use the zkCli.sh that comes with zookeeper just to write the reassignment into zk. Kafka's kafka-reassign-partitions.sh gets stuck before writing to zookeeper and zookeeper-shell.sh seems to hang while copying the reassignment into the command.

Below are my broker configs: {code} > cat config/server0.properties broker.id=0 listeners=PLAINTEXT://localhost:9090 log.dirs=/tmp/kafka-logs0 zookeeper.connect=127.0.0.1:2181 auto.leader.rebalance.enable=false unclean.leader.election.enable=false delete.topic.enable=true log.index.size.max.bytes=1024 zookeeper.session.timeout.ms=6 replica.lag.time.max.ms=10 [09:57:16] okaraman@okaraman-mn3:~/code/kafka > cat config/server1.properties broker.id=1 listeners=PLAINTEXT://localhost:9091 log.dirs=/tmp/kafka-logs1 zookeeper.connect=localhost:2181 auto.leader.rebalance.enable=false unclean.leader.election.enable=false delete.topic.enable=true log.index.size.max.bytes=1024 zookeeper.session.timeout.ms=6 replica.lag.time.max.ms=10 {code} I haven't looked into the cause of the OOM. I ran the scenario again just now and found that the controller spent a significant amount of time in G1 Old Gen GC. > Excessive heap usage on controller node during reassignment > --- > > Key: KAFKA-5857 > URL: https://issues.apache.org/jira/browse/KAFKA-5857 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.11.0.0 > Environment: CentOs 7, Java 1.8 >Reporter: Raoufeh Hashemian > Labels: reliability > Fix For: 1.1.0 > > Attachments: CPU.png, disk_write_x.png, memory.png, > reassignment_plan.txt > > > I was trying to expand our kafka cluster of 6 broker nodes to 12 broker > nodes. > Before expansion, we had a single topic with 960 partitions and a replication > factor of 3. So each node had 480 partitions. The size of data in each node > was 3TB . > To do the expansion, I submitted a partition reassignment plan (see attached > file for the current/new assignments). The plan was optimized to minimize > data movement and be rack aware. > When I submitted the plan, it took approximately 3 hours for moving data from > old to new nodes to complete. 
After that, it started deleting source > partitions (I say this based on the number of file descriptors) and > rebalancing leaders which has not been successful. Meanwhile, the heap usage > in the controller node started to go up with a large slope (along with long > GC times) and it took 5 hours for the controller to go out of memory and > another controller started to have the same behaviour for another 4 hours. At > this time the zookeeper ran out of disk and the service stopped. > To recover from this condition: > 1) Removed zk logs to free up disk and restarted all 3 zk nodes > 2) Deleted /kafka/admin/reassign_partitions node from zk > 3) Had to do unclean restarts of kafka service on oom controller nodes which > took 3 hours to complete . After this stage there was still 676 under > replicated partitions. > 4) Do a clean restart on all 12 broker nodes. > After step 4 , number of under replicated nodes went to 0. > So I was wondering if this memory footprint from controller is expected for > 1k partitions ? Did we do sth wrong or it is a bug? > Attached are some resource usage graph during this 30 hours event and the > reassignment plan. I'll try to add log files as well -- This message was sent by Atlassian JIRA
[jira] [Comment Edited] (KAFKA-5027) Kafka Controller Redesign
[ https://issues.apache.org/jira/browse/KAFKA-5027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16156278#comment-16156278 ] Onur Karaman edited comment on KAFKA-5027 at 9/7/17 1:15 AM: - I think we'd want all the broker components communicating with zookeeper migrated to the new client, so that would be: KAFKA-5642 KAFKA-5645 KAFKA-5646 KAFKA-5647 At the bare minimum, we'd have KAFKA-5642 and KAFKA-5646 checked in. was (Author: onurkaraman): I think we'd want all the broker components communicating with zookeeper migrated to the new client, so that would be: KAFKA-5642 KAFKA-5645 KAFKA-5646 KAFKA-5647 > Kafka Controller Redesign > - > > Key: KAFKA-5027 > URL: https://issues.apache.org/jira/browse/KAFKA-5027 > Project: Kafka > Issue Type: Improvement >Reporter: Onur Karaman >Assignee: Onur Karaman > > The goal of this redesign is to improve controller performance, controller > maintainability, and cluster reliability. > Documentation regarding what's being considered can be found > [here|https://docs.google.com/document/d/1rLDmzDOGQQeSiMANP0rC2RYp_L7nUGHzFD9MQISgXYM]. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5027) Kafka Controller Redesign
[ https://issues.apache.org/jira/browse/KAFKA-5027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16156278#comment-16156278 ] Onur Karaman commented on KAFKA-5027: - I think we'd want all the broker components communicating with zookeeper migrated to the new client, so that would be: KAFKA-5642 KAFKA-5645 KAFKA-5646 KAFKA-5647 > Kafka Controller Redesign > - > > Key: KAFKA-5027 > URL: https://issues.apache.org/jira/browse/KAFKA-5027 > Project: Kafka > Issue Type: Improvement >Reporter: Onur Karaman >Assignee: Onur Karaman > > The goal of this redesign is to improve controller performance, controller > maintainability, and cluster reliability. > Documentation regarding what's being considered can be found > [here|https://docs.google.com/document/d/1rLDmzDOGQQeSiMANP0rC2RYp_L7nUGHzFD9MQISgXYM]. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length
[ https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133321#comment-16133321 ] Onur Karaman commented on KAFKA-4893: - By the way, it seems that the 1.0.0 release is the perfect opportunity to make the change I proposed above. > async topic deletion conflicts with max topic length > > > Key: KAFKA-4893 > URL: https://issues.apache.org/jira/browse/KAFKA-4893 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Vahid Hashemian >Priority: Minor > > As per the > [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], > topics can be only 249 characters long to line up with typical filesystem > limitations: > {quote} > Each sharded partition log is placed into its own folder under the Kafka log > directory. The name of such folders consists of the topic name, appended by a > dash (\-) and the partition id. Since a typical folder name can not be over > 255 characters long, there will be a limitation on the length of topic names. > We assume the number of partitions will not ever be above 100,000. Therefore, > topic names cannot be longer than 249 characters. This leaves just enough > room in the folder name for a dash and a potentially 5 digit long partition > id. > {quote} > {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during > validation. > This limit ends up not being quite right since topic deletion ends up > renaming the directory to the form {{topic-partition.uniqueId-delete}} as can > be seen in {{LogManager.asyncDelete}}: > {code} > val dirName = new StringBuilder(removedLog.name) > .append(".") > > .append(java.util.UUID.randomUUID.toString.replaceAll("-","")) > .append(Log.DeleteDirSuffix) > .toString() > {code} > So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from
> /tmp/kafka-logs0/0-0
> to
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
> at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
> at kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
> at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
> at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
> at kafka.cluster.Partition.delete(Partition.scala:137)
> at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
> at kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
> at kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
> at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1 and the
> controller recognizes the topic deletion as incomplete (the topic znode is
> still in /admin/delete_topics).
> I don't believe linkedin has any topic name this long but I'm making the
> ticket in case anyone runs into this problem.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
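The length arithmetic behind KAFKA-4893 can be checked with a few lines of Python. The sketch below assumes the 255-character folder limit and 5-digit partition ids quoted in the ticket, and a 32-character unique id (a UUID with dashes removed) followed by the "-delete" suffix. The function names are made up for illustration, and the smaller limit it computes is derived from those assumptions rather than taken from Kafka.

```python
import uuid

MAX_DIR_NAME_LENGTH = 255      # typical filesystem limit quoted in the ticket
MAX_PARTITION_ID_DIGITS = 5    # partitions assumed to stay below 100,000
DELETE_DIR_SUFFIX = "-delete"  # suffix seen in the renamed directory


def live_dir_name(topic, partition):
    """Directory name of a live partition log: topic-partition."""
    return "%s-%d" % (topic, partition)


def delete_dir_name(topic, partition):
    """Directory name after the async-delete rename: topic-partition.uniqueId-delete."""
    unique_id = uuid.uuid4().hex  # 32 hex characters, dashes removed
    return "%s.%s%s" % (live_dir_name(topic, partition), unique_id, DELETE_DIR_SUFFIX)


def max_topic_length(allow_for_delete_rename):
    """Longest topic name whose directory still fits in MAX_DIR_NAME_LENGTH."""
    budget = MAX_DIR_NAME_LENGTH - 1 - MAX_PARTITION_ID_DIGITS  # dash + partition id
    if allow_for_delete_rename:
        budget -= 1 + 32 + len(DELETE_DIR_SUFFIX)  # dot + unique id + suffix
    return budget
```

Under these assumptions the rename adds 40 characters, so a 249-character topic name that is valid while live produces a 295-character directory name on deletion.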
[jira] [Commented] (KAFKA-5559) Metrics should throw if two client registers with same ID
[ https://issues.apache.org/jira/browse/KAFKA-5559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133274#comment-16133274 ]

Onur Karaman commented on KAFKA-5559:
-

Hey [~guozhang]. I read through the PR. I actually had a very similar discussion over a year ago:
http://markmail.org/message/54ccqas7ty7t4mjt

Your comments on the uniqueness of client ids from https://github.com/apache/kafka/pull/3328#issuecomment-316137237 conflict with Jay's comments from the discussion above.

If we take Jay's definition of client id "a logical name for an application which (potentially) spans more than one process", then a few of your comments seem to be incorrect:
# there would definitely be scenarios where multiple clients would exist with the same client id in the same JVM. This also suggests that KafkaStreams assigning unique client ids per client within the JVM is actually the wrong thing to do, and if I recall correctly, KafkaStreams did this purely as a workaround for the metrics collision issue.
# client ids are not meant to uniquely identify in the request logs which specific client instance sent the broker the request. It merely tells us which application sent the request.
# your comment above and the PR discussion suggest clients in the same jvm can have different AppInfos, which triggers the concern that one client's AppInfos would potentially replace the other. AppInfo is a fixed value within the JVM. I think the only way AppInfos can differ across clients of the same JVM is if you mess around with classloaders.

> Metrics should throw if two client registers with same ID
> -
>
> Key: KAFKA-5559
> URL: https://issues.apache.org/jira/browse/KAFKA-5559
> Project: Kafka
> Issue Type: Bug
> Components: metrics
> Affects Versions: 0.11.0.0
> Reporter: Matthias J. Sax
> Assignee: Matthias J. Sax
>
> Currently, {{AppInfoParser}} only logs a WARN message when a bean is
> registered with an existing name. However, this should be treated as an error
> and the exception should be rethrown.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5559) Metrics should throw if two client registers with same ID
[ https://issues.apache.org/jira/browse/KAFKA-5559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131513#comment-16131513 ]

Onur Karaman commented on KAFKA-5559:
-

[~xiaotao183] I agree that as per the definition of {{client.id}}, it should be possible to have multiple clients on the same jvm with the same {{client.id}}.

I think the solution is the following:
{code}
-public static void registerAppInfo(String prefix, String id) {
+// method is synchronized to prevent race where two concurrent client instantiations would try to both register the mbean
+public static synchronized void registerAppInfo(String prefix, String id) {
     try {
         ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + id);
-        AppInfo mBean = new AppInfo();
-        ManagementFactory.getPlatformMBeanServer().registerMBean(mBean, name);
+        if (!ManagementFactory.getPlatformMBeanServer().isRegistered(name)) {
+            AppInfo mBean = new AppInfo();
+            ManagementFactory.getPlatformMBeanServer().registerMBean(mBean, name);
+        }
     } catch (JMException e) {
         log.warn("Error registering AppInfo mbean", e);
     }
 }

-public static void unregisterAppInfo(String prefix, String id) {
+// method is synchronized to prevent race where two concurrent client closes would try to both unregister the mbean
+public static synchronized void unregisterAppInfo(String prefix, String id) {
{code}
Basically, just don't attempt to reregister if it already exists. I can open up a PR with the above fix.

> Metrics should throw if two client registers with same ID
> -
>
> Key: KAFKA-5559
> URL: https://issues.apache.org/jira/browse/KAFKA-5559
> Project: Kafka
> Issue Type: Bug
> Components: metrics
> Affects Versions: 0.11.0.0
> Reporter: Matthias J. Sax
> Assignee: Matthias J. Sax
>
> Currently, {{AppInfoParser}} only logs a WARN message when a bean is
> registered with an existing name. However, this should be treated as an error
> and the exception should be rethrown.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-1120) Controller could miss a broker state change
[ https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114911#comment-16114911 ]

Onur Karaman commented on KAFKA-1120:
-

[~wushujames] I think Jun's comments and the redesign doc in KAFKA-5027 are sort of saying the same thing.

The broker-generation concept has two use cases which were sort of implied:
1. the controller using broker generations to distinguish events from a broker across generations.
2. controller-to-broker requests should include broker generation so that brokers can ignore requests that applied to its former generation.

While I think czxids will work for the 1st use case, I don't think we can naively reuse czxid for the 2nd use case. The reason is a bit silly: zookeeper's CreateResponse only provides the path. It doesn't provide the created znode's Stat, so you have to do a later lookup to find out the znode's czxid.

If we want to solve both use cases with the same approach, I think we have a couple of options:
1. maybe we can get away with using czxids by doing a multi-op when registering brokers to transactionally create a znode and read that same znode to read the czxid of the znode it just created.
2. we can instead use the session id as the broker generation. The controller can infer the broker's generation by observing the broker znode's ephemeralOwner property. Brokers can determine their generation id by looking up the underlying zookeeper client's session id which is just ZooKeeper.getSessionId(). The ephemeralOwner of an ephemeral znode is the client's session id, which is why this would work.

> Controller could miss a broker state change > > > Key: KAFKA-1120 > URL: https://issues.apache.org/jira/browse/KAFKA-1120 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8.1 >Reporter: Jun Rao > Labels: reliability > Fix For: 1.0.0 > > > When the controller is in the middle of processing a task (e.g., preferred > leader election, broker change), it holds a controller lock. During this > time, a broker could have de-registered and re-registered itself in ZK. After > the controller finishes processing the current task, it will start processing > the logic in the broker change listener. However, it will see no broker > change and therefore won't do anything to the restarted broker. This broker > will be in a weird state since the controller doesn't inform it to become the > leader of any partition. Yet, the cached metadata in other brokers could > still list that broker as the leader for some partitions. Client requests > routed to that broker will then get a TopicOrPartitionNotExistException. This > broker will continue to be in this bad state until it's restarted again. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
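The second use case in the KAFKA-1120 comment above (brokers ignoring requests meant for a former generation) can be sketched with the session id as the generation. This is a toy model only: the Broker class, the request dictionary, and the session id values are invented for illustration and do not reflect Kafka's request format.

```python
class Broker:
    """Toy broker whose generation is its ZooKeeper session id (option 2)."""

    def __init__(self, broker_id, session_id):
        self.broker_id = broker_id
        self.session_id = session_id  # would come from ZooKeeper.getSessionId()
        self.applied = []

    def handle(self, request):
        """Apply the request only if it targets this broker's current generation."""
        if request["broker_generation"] != self.session_id:
            return False  # request was addressed to a former generation
        self.applied.append(request["action"])
        return True


def controller_request(ephemeral_owner, action):
    """Build a request tagged with the generation the controller observed.

    ephemeral_owner is the value read from the broker znode's ephemeralOwner.
    """
    return {"broker_generation": ephemeral_owner, "action": action}
```

A request built while the controller still saw the old session is dropped by the restarted broker, because its new session id no longer matches.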
[jira] [Resolved] (KAFKA-5703) allow debug-level logging for RequestChannel's request logger
[ https://issues.apache.org/jira/browse/KAFKA-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman resolved KAFKA-5703. - Resolution: Fixed Woops. Looks like [~ijuma] already fixed this 3 days ago: 4086db472d08f6dee4d30dda82ab9ff7b67d1a20 > allow debug-level logging for RequestChannel's request logger > - > > Key: KAFKA-5703 > URL: https://issues.apache.org/jira/browse/KAFKA-5703 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Onur Karaman > > Git hash d25671884bbbdf7843ada3e7797573a00ac7cd56 introduced a bug in > RequestChannel's request logger that causes debug-level logging to never > occur. > {code} > - if (requestLogger.isTraceEnabled) > -requestLogger.trace("Completed request:%s from connection > %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s" > - .format(requestDesc(true), connectionId, totalTime, > requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, > responseSendTime, securityProtocol, session.principal)) > - else if (requestLogger.isDebugEnabled) > -requestLogger.debug("Completed request:%s from connection > %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s" > - .format(requestDesc(false), connectionId, totalTime, > requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, > responseSendTime, securityProtocol, session.principal)) > + if (requestLogger.isDebugEnabled) { > +val detailsEnabled = requestLogger.isTraceEnabled > +requestLogger.trace("Completed request:%s from connection > %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s,listener:%s" > + .format(requestDesc(detailsEnabled), connectionId, totalTime, > requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, > responseSendTime, securityProtocol, session.principal, 
listenerName.value)) > + } > {code} > So trace-level logging is used even if debug-level logging is specified, > causing users to not see the non-detailed request logs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5703) allow debug-level logging for RequestChannel's request logger
Onur Karaman created KAFKA-5703:
---

Summary: allow debug-level logging for RequestChannel's request logger
Key: KAFKA-5703
URL: https://issues.apache.org/jira/browse/KAFKA-5703
Project: Kafka
Issue Type: Bug
Reporter: Onur Karaman
Assignee: Onur Karaman

Git hash d25671884bbbdf7843ada3e7797573a00ac7cd56 introduced a bug in RequestChannel's request logger that causes debug-level logging to never occur.
{code}
-  if (requestLogger.isTraceEnabled)
-    requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
-      .format(requestDesc(true), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
-  else if (requestLogger.isDebugEnabled)
-    requestLogger.debug("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
-      .format(requestDesc(false), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
+  if (requestLogger.isDebugEnabled) {
+    val detailsEnabled = requestLogger.isTraceEnabled
+    requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s,listener:%s"
+      .format(requestDesc(detailsEnabled), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal, listenerName.value))
+  }
{code}
So trace-level logging is used even if debug-level logging is specified, causing users to not see the non-detailed request logs.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
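The behavior KAFKA-5703 asks for (detailed output at trace, brief output at debug) can be reproduced with Python's logging module. The sketch follows the branch structure of the removed lines in the diff and is not the fix that was actually committed. Python has no TRACE level, so the numeric value 5 and the helper name log_completed_request are assumptions.

```python
import logging

TRACE = 5  # assumption: Python's logging has no TRACE level, so one is defined here
logging.addLevelName(TRACE, "TRACE")


def log_completed_request(logger, describe):
    """Log with details at TRACE if enabled, otherwise without details at DEBUG.

    describe(details) is a hypothetical callback that returns the request text.
    """
    if logger.isEnabledFor(TRACE):
        logger.log(TRACE, "Completed request:%s", describe(True))
    elif logger.isEnabledFor(logging.DEBUG):
        logger.debug("Completed request:%s", describe(False))
```

The regression in the ticket corresponds to always calling the trace-level method inside a debug-level guard, so a logger configured at debug passes the guard and then discards the message.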
[jira] [Commented] (KAFKA-1120) Controller could miss a broker state change
[ https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16113645#comment-16113645 ]

Onur Karaman commented on KAFKA-1120:
-

Alright I might know what's happening.

Here's the red flag:
{code}
> grep -r "Newly added brokers" .
./kafka_2.11-0.11.0.0/logs/controller.log:[2017-08-03 13:40:09,121] INFO [Controller 1]: Newly added brokers: 1, deleted brokers: , all live brokers: 1 (kafka.controller.KafkaController)
./kafka_2.11-0.11.0.0/logs/controller.log:[2017-08-03 13:40:27,172] INFO [Controller 1]: Newly added brokers: 2, deleted brokers: , all live brokers: 1,2 (kafka.controller.KafkaController)
./kafka_2.11-0.11.0.0/logs/controller.log:[2017-08-03 13:47:15,215] INFO [Controller 1]: Newly added brokers: , deleted brokers: , all live brokers: 1,2 (kafka.controller.KafkaController)
./kafka_2.11-0.11.0.0/logs/controller.log:[2017-08-03 13:47:17,927] INFO [Controller 1]: Newly added brokers: , deleted brokers: , all live brokers: 1,2 (kafka.controller.KafkaController)
{code}

Here's the relevant code in BrokerChange.process:
{code}
val curBrokers = zkUtils.getAllBrokersInCluster().toSet
val curBrokerIds = curBrokers.map(_.id)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
{code}

Basically the ControlledShutdown event took so long to process that the BrokerChange corresponding to the killed broker (3rd BrokerChange in the above snippet) and BrokerChange corresponding to the restarted broker (4th BrokerChange in the above snippet) are queued up waiting for ControlledShutdown's completion. By the time these BrokerChange events get processed, the restarted broker is already registered in zookeeper, causing the broker to appear in both controllerContext.liveOrShuttingDownBrokerIds and the brokers listed in zookeeper.
This means the controller will not execute the onBrokerFailure in the 3rd BrokerChange and will also not execute onBrokerJoin in the 4th BrokerChange. I'm not sure of the fix. Broker generations as defined in the redesign doc in KAFKA-5027 would work but I'm not sure if it's strictly required. > Controller could miss a broker state change > > > Key: KAFKA-1120 > URL: https://issues.apache.org/jira/browse/KAFKA-1120 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8.1 >Reporter: Jun Rao > Labels: reliability > Fix For: 1.0.0 > > > When the controller is in the middle of processing a task (e.g., preferred > leader election, broker change), it holds a controller lock. During this > time, a broker could have de-registered and re-registered itself in ZK. After > the controller finishes processing the current task, it will start processing > the logic in the broker change listener. However, it will see no broker > change and therefore won't do anything to the restarted broker. This broker > will be in a weird state since the controller doesn't inform it to become the > leader of any partition. Yet, the cached metadata in other brokers could > still list that broker as the leader for some partitions. Client requests > routed to that broker will then get a TopicOrPartitionNotExistException. This > broker will continue to be in this bad state until it's restarted again. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
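The missed bounce described in the KAFKA-1120 analysis above falls directly out of the set arithmetic. The first function below mirrors the two set differences in the BrokerChange.process snippet. The second is a hypothetical generation comparison of the kind the redesign doc's broker generations would allow. Its name and the generation values used with it are invented for illustration.

```python
def broker_change(zk_broker_ids, live_or_shutting_down_ids):
    """Same set differences as in BrokerChange.process: (new ids, dead ids)."""
    new_ids = zk_broker_ids - live_or_shutting_down_ids
    dead_ids = live_or_shutting_down_ids - zk_broker_ids
    return new_ids, dead_ids


def bounced_brokers(zk_generations, cached_generations):
    """Hypothetical check: ids present on both sides whose generation changed."""
    return {
        broker_id
        for broker_id, generation in zk_generations.items()
        if broker_id in cached_generations
        and cached_generations[broker_id] != generation
    }
```

With broker 2 killed and restarted before either queued event is processed, broker_change({1, 2}, {1, 2}) yields two empty sets, matching the 3rd and 4th log lines, while a generation comparison would still flag broker 2.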
[jira] [Commented] (KAFKA-1120) Controller could miss a broker state change
[ https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110205#comment-16110205 ] Onur Karaman commented on KAFKA-1120: - Thanks [~wushujames] that is perfect. I can reproduce the problem. > Controller could miss a broker state change > > > Key: KAFKA-1120 > URL: https://issues.apache.org/jira/browse/KAFKA-1120 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8.1 >Reporter: Jun Rao > Labels: reliability > Fix For: 1.0.0 > > > When the controller is in the middle of processing a task (e.g., preferred > leader election, broker change), it holds a controller lock. During this > time, a broker could have de-registered and re-registered itself in ZK. After > the controller finishes processing the current task, it will start processing > the logic in the broker change listener. However, it will see no broker > change and therefore won't do anything to the restarted broker. This broker > will be in a weird state since the controller doesn't inform it to become the > leader of any partition. Yet, the cached metadata in other brokers could > still list that broker as the leader for some partitions. Client requests > routed to that broker will then get a TopicOrPartitionNotExistException. This > broker will continue to be in this bad state until it's restarted again. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5501) introduce async ZookeeperClient
[ https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101268#comment-16101268 ] Onur Karaman commented on KAFKA-5501: - [~ijuma] I went ahead and reworded this ticket to be about making the client and KAFKA-5642 to be about using the client. With that, I went ahead and closed this ticket. > introduce async ZookeeperClient > --- > > Key: KAFKA-5501 > URL: https://issues.apache.org/jira/browse/KAFKA-5501 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > Fix For: 1.0.0 > > > Synchronous zookeeper apis means that we wait an entire round trip before > doing the next operation. We should introduce a zookeeper client that > encourages pipelined requests to zookeeper. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5501) introduce async ZookeeperClient
[ https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman resolved KAFKA-5501. - Resolution: Fixed > introduce async ZookeeperClient > --- > > Key: KAFKA-5501 > URL: https://issues.apache.org/jira/browse/KAFKA-5501 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > Fix For: 1.0.0 > > > Synchronous zookeeper apis means that we wait an entire round trip before > doing the next operation. We should introduce a zookeeper client that > encourages pipelined requests to zookeeper. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5501) introduce async ZookeeperClient
[ https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5501: Description: Synchronous zookeeper apis means that we wait an entire round trip before doing the next operation. We should introduce a zookeeper client that encourages pipelined requests to zookeeper. (was: Synchronous zookeeper writes means that we wait an entire round trip before doing the next write. These synchronous writes are happening at a per-partition granularity in several places, so partition-heavy clusters suffer from the controller doing many sequential round trips to zookeeper. * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in zookeeper on transition to OnlinePartition. This gets triggered per-partition sequentially with synchronous writes during controlled shutdown of the shutting down broker's replicas for which it is the leader. * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets triggered per-partition sequentially with synchronous writes for failed or controlled shutdown brokers.) > introduce async ZookeeperClient > --- > > Key: KAFKA-5501 > URL: https://issues.apache.org/jira/browse/KAFKA-5501 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > Fix For: 1.0.0 > > > Synchronous zookeeper apis means that we wait an entire round trip before > doing the next operation. We should introduce a zookeeper client that > encourages pipelined requests to zookeeper. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5642) use async ZookeeperClient everywhere
Onur Karaman created KAFKA-5642:
---

Summary: use async ZookeeperClient everywhere
Key: KAFKA-5642
URL: https://issues.apache.org/jira/browse/KAFKA-5642
Project: Kafka
Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman

Synchronous zookeeper writes mean that we wait an entire round trip before doing the next write. These synchronous writes are happening at a per-partition granularity in several places, so partition-heavy clusters suffer from the controller doing many sequential round trips to zookeeper.
* PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in zookeeper on transition to OnlinePartition. This gets triggered per-partition sequentially with synchronous writes during controlled shutdown of the shutting down broker's replicas for which it is the leader.
* ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets triggered per-partition sequentially with synchronous writes for failed or controlled shutdown brokers.

KAFKA-5501 introduced an async ZookeeperClient that encourages pipelined requests to zookeeper. We should replace ZkClient's usage with this client.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
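The cost difference KAFKA-5642 describes can be estimated with a back-of-the-envelope model. This is a deliberate simplification and not a measurement of ZooKeeper or of the new client: it assumes a fixed round-trip time, a fixed per-request send cost, and no server-side queueing, and both function names are made up.

```python
def sequential_latency(num_writes, round_trip_ms):
    """Each synchronous write waits for its response before the next is sent."""
    return num_writes * round_trip_ms


def pipelined_latency(num_writes, round_trip_ms, send_cost_ms):
    """Requests are sent back to back; only the last response is waited on."""
    if num_writes == 0:
        return 0
    return (num_writes - 1) * send_cost_ms + round_trip_ms
```

For example, under this model 5000 per-partition writes at a 20 ms round trip take 100000 ms sequentially, versus 5019 ms when pipelined with a 1 ms send cost. Real numbers will depend on the cluster.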
[jira] [Updated] (KAFKA-5501) introduce async ZookeeperClient
[ https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5501: Summary: introduce async ZookeeperClient (was: use async zookeeper apis everywhere) > introduce async ZookeeperClient > --- > > Key: KAFKA-5501 > URL: https://issues.apache.org/jira/browse/KAFKA-5501 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > Fix For: 1.0.0 > > > Synchronous zookeeper writes means that we wait an entire round trip before > doing the next write. These synchronous writes are happening at a > per-partition granularity in several places, so partition-heavy clusters > suffer from the controller doing many sequential round trips to zookeeper. > * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in > zookeeper on transition to OnlinePartition. This gets triggered per-partition > sequentially with synchronous writes during controlled shutdown of the > shutting down broker's replicas for which it is the leader. > * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to > OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets > triggered per-partition sequentially with synchronous writes for failed or > controlled shutdown brokers. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5586) Handle client disconnects during JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-5586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16084990#comment-16084990 ] Onur Karaman commented on KAFKA-5586: - I thought we made a conscious decision in the past to not do this in the discussion relating to KAFKA-2397. I had listed pros/cons of LeaveGroupRequest vs disconnect. > Handle client disconnects during JoinGroup > -- > > Key: KAFKA-5586 > URL: https://issues.apache.org/jira/browse/KAFKA-5586 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson > > If a consumer disconnects with a JoinGroup in-flight, we do not remove it > from the group until after the Join phase completes. If the client > immediately re-sends the JoinGroup request and it already had a memberId, > then the callback will be replaced and there is no harm done. For the other > cases: > 1. If the client disconnected due to a failure and does not re-send the > JoinGroup, the consumer will still be included in the new group generation > after the rebalance completes, but will immediately timeout and trigger a new > rebalance. > 2. If the consumer was not a member of the group and re-sends JoinGroup, then > a new memberId will be created for that consumer and the old one will not be > removed. When the rebalance completes, the old memberId will timeout and a > rebalance will be triggered. > To address these issues, we should add some additional logic to handle client > disconnections during the join phase. For newly generated memberIds, we > should simply remove them. For existing members, we should probably leave > them in the group and reset the heartbeat expiration task. > Note that we currently have no facility to expose disconnects from the > network layer to the other layers, so we need to find a good approach for > this. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
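The handling proposed in the quoted issue description (remove newly generated memberIds, keep existing members and reset their heartbeat expiration) could be sketched roughly as below. All class, field, and method names are hypothetical and chosen for illustration; this is not the group coordinator's actual code, and it assumes the network layer can somehow report the disconnect, which the issue notes is not currently possible.

```java
import java.util.HashMap;
import java.util.Map;

public class JoinPhaseDisconnect {
    /** Hypothetical view of a group member during the join phase. */
    static final class Member {
        final String memberId;
        final boolean newlyGenerated; // memberId was created by this JoinGroup request
        long heartbeatDeadlineMs;

        Member(String memberId, boolean newlyGenerated, long heartbeatDeadlineMs) {
            this.memberId = memberId;
            this.newlyGenerated = newlyGenerated;
            this.heartbeatDeadlineMs = heartbeatDeadlineMs;
        }
    }

    /** Hypothetical group state keyed by memberId. */
    static final class Group {
        private final Map<String, Member> members = new HashMap<>();

        void add(Member member) {
            members.put(member.memberId, member);
        }

        boolean contains(String memberId) {
            return members.containsKey(memberId);
        }

        long heartbeatDeadlineMs(String memberId) {
            return members.get(memberId).heartbeatDeadlineMs;
        }

        /** Applies the proposal: drop brand-new members, keep existing ones with a fresh deadline. */
        void onDisconnectDuringJoin(String memberId, long nowMs, long sessionTimeoutMs) {
            Member member = members.get(memberId);
            if (member == null) {
                return; // unknown member, nothing to clean up
            }
            if (member.newlyGenerated) {
                members.remove(memberId);
            } else {
                member.heartbeatDeadlineMs = nowMs + sessionTimeoutMs;
            }
        }
    }
}
```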
[jira] [Updated] (KAFKA-5502) read current brokers from zookeeper upon processing broker change
[ https://issues.apache.org/jira/browse/KAFKA-5502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Onur Karaman updated KAFKA-5502:
Description:
[~lindong]'s testing of the 0.11.0 release revealed a controller-side performance regression in clusters with many brokers and many partitions when bringing up many brokers simultaneously.

The regression is caused by KAFKA-5028: a Watcher receives WatchedEvent notifications from the raw ZooKeeper client EventThread. A WatchedEvent only contains the following information:
- KeeperState
- EventType
- path
Note that it does not actually contain the current data or current set of children associated with the data/child change notification. It is up to the user to do this lookup to see the current data or set of children.

ZkClient is itself a Watcher. When it receives a WatchedEvent, it puts a ZkEvent into its own queue which its own ZkEventThread processes. Users of ZkClient interact with these notifications through listeners (IZkDataListener, IZkChildListener). IZkDataListener actually expects as input the current data of the watched znode, and likewise IZkChildListener actually expects as input the current set of children of the watched znode. In order to provide this information to the listeners, the ZkEventThread, when processing the ZkEvent in its queue, looks up the information (either the current data or current set of children), simultaneously sets up the next watch, and passes the result to the listener.

The regression introduced in KAFKA-5028 is in the time at which we look up the information needed for the event processing. In the past, the result of the lookup done by the ZkEventThread during ZkEvent processing was passed into the listener, which processed it immediately.
For instance in ZkClient.fireChildChangedEvents:
{code}
List children = getChildren(path);
listener.handleChildChange(path, children);
{code}
Now, however, there are multiple listeners that pass information looked up by the ZkEventThread into a ControllerEvent which gets processed potentially much later. For instance in BrokerChangeListener:
{code}
class BrokerChangeListener(controller: KafkaController) extends IZkChildListener with Logging {
  override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
    import JavaConverters._
    controller.addToControllerEventQueue(controller.BrokerChange(currentChilds.asScala))
  }
}
{code}
In terms of impact, this:
- increases the odds of working with stale information by the time the ControllerEvent gets processed.
- can cause the cluster to take a long time to stabilize if you bring up many brokers simultaneously.

In terms of how to solve it:
- (short term) just ignore the ZkClient's information lookup and repeat the lookup at the start of the ControllerEvent. This is the approach taken in this ticket.
- (long term) try to remove a queue. This basically means getting rid of ZkClient. This is likely the approach that will be taken in KAFKA-5501.

was: [~lindong]'s testing of the 0.11.0 release revealed a controller-side performance regression in clusters with many brokers and many partitions when bringing up many brokers simultaneously. The regerssion is caused by KAFKA-5028: a Watcher receives WatchedEvent notifications from the raw ZooKeeper client EventThread. A WatchedEvent only contains the following information: - KeeperState - EventType - path Note that it does not actually contain the current data or current set of children associated with the data/child change notification. It is up to the user to do this lookup to see the current data or set of children. ZkClient is itself a Watcher.
When it receives a WatchedEvent, it puts a ZkEvent into its own queue which its own ZkEventThread processes. Users of ZkClient interact with these notifications through listeners (IZkDataListener, IZkChildListener). IZkDataListener actually expects as input the current data of the watched znode, and likewise IZkChildListener actually expects as input the current set of children of the watched znode. In order to provide this information to the listeners, the ZkEventThread, when processing the ZkEvent in its queue, looks up the information (either the current data or current set of children) simultaneously sets up the next watch, and passes the result to the listener. The regression introduced in KAFKA-5028 is the time at which we lookup the information needed for the event processing. In the past, the lookup from the ZkEventThread during ZkEvent processing would be passed into the listener which is processed immediately after. For instance in ZkClient.fireChildChangedEvents: {code} List children = getChildren(path); listener.handleChildChange(path, children); {code} Now, however, there are multiple listeners that pass information looked up by the ZkEventThread into a ControllerEvent which gets processed
[jira] [Created] (KAFKA-5502) read current brokers from zookeeper upon processing broker change
Onur Karaman created KAFKA-5502:
---
Summary: read current brokers from zookeeper upon processing broker change
Key: KAFKA-5502
URL: https://issues.apache.org/jira/browse/KAFKA-5502
Project: Kafka
Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman

[~lindong]'s testing of the 0.11.0 release revealed a controller-side performance regression in clusters with many brokers and many partitions when bringing up many brokers simultaneously.

The regression is caused by KAFKA-5028: a Watcher receives WatchedEvent notifications from the raw ZooKeeper client EventThread. A WatchedEvent only contains the following information:
- KeeperState
- EventType
- path
Note that it does not actually contain the current data or current set of children associated with the data/child change notification. It is up to the user to do this lookup to see the current data or set of children.

ZkClient is itself a Watcher. When it receives a WatchedEvent, it puts a ZkEvent into its own queue which its own ZkEventThread processes. Users of ZkClient interact with these notifications through listeners (IZkDataListener, IZkChildListener). IZkDataListener actually expects as input the current data of the watched znode, and likewise IZkChildListener actually expects as input the current set of children of the watched znode. In order to provide this information to the listeners, the ZkEventThread, when processing the ZkEvent in its queue, looks up the information (either the current data or current set of children), simultaneously sets up the next watch, and passes the result to the listener.

The regression introduced in KAFKA-5028 is in the time at which we look up the information needed for the event processing. In the past, the result of the lookup done by the ZkEventThread during ZkEvent processing was passed into the listener, which processed it immediately.
For instance in ZkClient.fireChildChangedEvents:
{code}
List children = getChildren(path);
listener.handleChildChange(path, children);
{code}
Now, however, there are multiple listeners that pass information looked up by the ZkEventThread into a ControllerEvent which gets processed potentially much later. For instance in BrokerChangeListener:
{code}
class BrokerChangeListener(controller: KafkaController) extends IZkChildListener with Logging {
  override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
    import JavaConverters._
    controller.addToControllerEventQueue(controller.BrokerChange(currentChilds.asScala))
  }
}
{code}
In terms of impact, this:
- increases the odds of working with stale information by the time the ControllerEvent gets processed.
- can cause the cluster to take a long time to stabilize if you bring up many brokers simultaneously.

In terms of how to solve it:
- (short term) just ignore the ZkClient's information lookup and repeat the lookup at the start of the ControllerEvent. This is the approach taken in this ticket.
- (long term) try to remove a queue. This basically means getting rid of ZkClient. This is likely the approach that will be taken in KAFKA-5501.
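The short-term fix described above (ignore the payload captured at notification time and repeat the lookup when the event is processed) can be illustrated with a small Java sketch. The `ControllerEvent` interface, the two factory methods, and the `Supplier` standing in for a ZooKeeper children lookup are all hypothetical; this is not the controller's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;

public class FreshLookupOnProcess {
    /** Hypothetical queued event; process() runs later and returns the broker ids it acted on. */
    interface ControllerEvent {
        Set<String> process();
    }

    /** Regressed behavior: the event carries the broker ids seen when the notification fired. */
    static ControllerEvent snapshotEvent(Set<String> brokersAtEnqueue) {
        Set<String> copy = new TreeSet<>(brokersAtEnqueue);
        return () -> copy;
    }

    /** Short-term fix: the event carries no payload and re-reads the broker ids when processed. */
    static ControllerEvent rereadEvent(Supplier<Set<String>> currentBrokers) {
        return () -> new TreeSet<>(currentBrokers.get());
    }

    /** Drains the queue in FIFO order and records what each event observed. */
    static List<Set<String>> drain(Queue<ControllerEvent> queue) {
        List<Set<String>> observed = new ArrayList<>();
        ControllerEvent event;
        while ((event = queue.poll()) != null) {
            observed.add(event.process());
        }
        return observed;
    }
}
```

If the broker set changes between enqueue and processing, the snapshot event still reports the old set, while the re-reading event reports the set as it is at processing time.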
[jira] [Created] (KAFKA-5501) use async zookeeper apis everywhere
Onur Karaman created KAFKA-5501:
---
Summary: use async zookeeper apis everywhere
Key: KAFKA-5501
URL: https://issues.apache.org/jira/browse/KAFKA-5501
Project: Kafka
Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman

Synchronous zookeeper writes mean that we wait an entire round trip before doing the next write. These synchronous writes happen at a per-partition granularity in several places, so partition-heavy clusters suffer from the controller doing many sequential round trips to zookeeper.
* PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in zookeeper on transition to OnlinePartition. During controlled shutdown, this is triggered sequentially, with one synchronous write per partition, for every partition the shutting-down broker leads.
* ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to OfflineReplica when calling KafkaController.removeReplicaFromIsr. This is triggered sequentially, with one synchronous write per partition, for failed or controlled shutdown brokers.