Repository: kafka Updated Branches: refs/heads/trunk ea42d6535 -> 88d8508b8
KAFKA-4443; Minor comment clean-up Removed stale comment left behind, minor fixes (UpdateMetadataRequest instead of MetadataUpdateRequest) and remove redundant comments. Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Jiangjie (Becket) Qin <becket....@gmail.com>, Dong Lin <lindon...@gmail.com> Closes #2194 from ijuma/kafka-4443-minor-follow-up Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/88d8508b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/88d8508b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/88d8508b Branch: refs/heads/trunk Commit: 88d8508b85bb11d452658ff5c32afcb12c314f9a Parents: ea42d65 Author: Ismael Juma <ism...@juma.me.uk> Authored: Thu Dec 1 15:39:06 2016 +0000 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Thu Dec 1 15:39:06 2016 +0000 ---------------------------------------------------------------------- .../scala/kafka/controller/KafkaController.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/88d8508b/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 7ec38ee..1f6e19a 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -316,29 +316,32 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat def onControllerFailover() { if(isRunning) { info("Broker %d starting become controller state transition".format(config.brokerId)) - //read controller epoch from zk readControllerEpochFromZookeeper() - // increment the controller epoch incrementControllerEpoch(zkUtils.zkClient) + // before 
reading source of truth from zookeeper, register the listeners to get broker/topic callbacks registerReassignedPartitionsListener() registerIsrChangeNotificationListener() registerPreferredReplicaElectionListener() partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() + initializeControllerContext() - // We need to send MetadataUpdateRequest after controller context is initialized and before state machines are started. - // This is because broker needs to receive the list of live brokers from MetadataUpdateRequest first in order to process - // any LeaderAndIsrRequest that is generated by replicaStateMachine.startup() and partitionStateMachine.startup(). + + // We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines + // are started. This is because brokers need to receive the list of live brokers from UpdateMetadataRequest before + // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and + // partitionStateMachine.startup(). sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) + replicaStateMachine.startup() partitionStateMachine.startup() + // register the partition change listeners for all existing topics on failover controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) maybeTriggerPartitionReassignment() maybeTriggerPreferredReplicaElection() - /* send partition leadership info to all live brokers */ if (config.autoLeaderRebalanceEnable) { info("starting the partition rebalance scheduler") autoRebalanceScheduler.startup()