lbradstreet commented on a change in pull request #8807:
URL: https://github.com/apache/kafka/pull/8807#discussion_r436289337



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1242,131 +1243,138 @@ class ReplicaManager(val config: KafkaConfig,
             s"epoch ${leaderAndIsrRequest.controllerEpoch}")
         }
 
-      if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
-        stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller 
$controllerId with " +
-          s"correlation id $correlationId since its controller epoch 
${leaderAndIsrRequest.controllerEpoch} is old. " +
-          s"Latest known controller epoch is $controllerEpoch")
-        leaderAndIsrRequest.getErrorResponse(0, 
Errors.STALE_CONTROLLER_EPOCH.exception)
-      } else {
-        val responseMap = new mutable.HashMap[TopicPartition, Errors]
-        controllerEpoch = leaderAndIsrRequest.controllerEpoch
-
-        val partitionStates = new mutable.HashMap[Partition, 
LeaderAndIsrPartitionState]()
-
-        // First create the partition if it doesn't exist already
-        requestPartitionStates.foreach { partitionState =>
-          val topicPartition = new TopicPartition(partitionState.topicName, 
partitionState.partitionIndex)
-          val partitionOpt = getPartition(topicPartition) match {
-            case HostedPartition.Offline =>
-              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
-                s"controller $controllerId with correlation id $correlationId 
" +
-                s"epoch $controllerEpoch for partition $topicPartition as the 
local replica for the " +
-                "partition is in an offline log directory")
-              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
-              None
+      val response = {
+        if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
+          stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from 
controller $controllerId with " +
+            s"correlation id $correlationId since its controller epoch 
${leaderAndIsrRequest.controllerEpoch} is old. " +
+            s"Latest known controller epoch is $controllerEpoch")
+          leaderAndIsrRequest.getErrorResponse(0, 
Errors.STALE_CONTROLLER_EPOCH.exception)
+        } else {
+          val responseMap = new mutable.HashMap[TopicPartition, Errors]
+          controllerEpoch = leaderAndIsrRequest.controllerEpoch
 
-            case HostedPartition.Online(partition) =>
-              Some(partition)
+          val partitionStates = new mutable.HashMap[Partition, 
LeaderAndIsrPartitionState]()
 
-            case HostedPartition.None =>
-              val partition = Partition(topicPartition, time, this)
-              allPartitions.putIfNotExists(topicPartition, 
HostedPartition.Online(partition))
-              Some(partition)
-          }
+          // First create the partition if it doesn't exist already
+          requestPartitionStates.foreach { partitionState =>
+            val topicPartition = new TopicPartition(partitionState.topicName, 
partitionState.partitionIndex)
+            val partitionOpt = getPartition(topicPartition) match {
+              case HostedPartition.Offline =>
+                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
+                  s"controller $controllerId with correlation id 
$correlationId " +
+                  s"epoch $controllerEpoch for partition $topicPartition as 
the local replica for the " +
+                  "partition is in an offline log directory")
+                responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
+                None
+
+              case HostedPartition.Online(partition) =>
+                Some(partition)
+
+              case HostedPartition.None =>
+                val partition = Partition(topicPartition, time, this)
+                allPartitions.putIfNotExists(topicPartition, 
HostedPartition.Online(partition))
+                Some(partition)
+            }
 
-          // Next check partition's leader epoch
-          partitionOpt.foreach { partition =>
-            val currentLeaderEpoch = partition.getLeaderEpoch
-            val requestLeaderEpoch = partitionState.leaderEpoch
-            if (requestLeaderEpoch > currentLeaderEpoch) {
-              // If the leader epoch is valid record the epoch of the 
controller that made the leadership decision.
-              // This is useful while updating the isr to maintain the 
decision maker controller's epoch in the zookeeper path
-              if (partitionState.replicas.contains(localBrokerId))
-                partitionStates.put(partition, partitionState)
-              else {
-                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from 
controller $controllerId with " +
-                  s"correlation id $correlationId epoch $controllerEpoch for 
partition $topicPartition as itself is not " +
-                  s"in assigned replica list 
${partitionState.replicas.asScala.mkString(",")}")
-                responseMap.put(topicPartition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+            // Next check partition's leader epoch
+            partitionOpt.foreach { partition =>
+              val currentLeaderEpoch = partition.getLeaderEpoch
+              val requestLeaderEpoch = partitionState.leaderEpoch
+              if (requestLeaderEpoch > currentLeaderEpoch) {
+                // If the leader epoch is valid record the epoch of the 
controller that made the leadership decision.
+                // This is useful while updating the isr to maintain the 
decision maker controller's epoch in the zookeeper path
+                if (partitionState.replicas.contains(localBrokerId))
+                  partitionStates.put(partition, partitionState)
+                else {
+                  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from 
controller $controllerId with " +
+                    s"correlation id $correlationId epoch $controllerEpoch for 
partition $topicPartition as itself is not " +
+                    s"in assigned replica list 
${partitionState.replicas.asScala.mkString(",")}")
+                  responseMap.put(topicPartition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+                }
+              } else if (requestLeaderEpoch < currentLeaderEpoch) {
+                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
+                  s"controller $controllerId with correlation id 
$correlationId " +
+                  s"epoch $controllerEpoch for partition $topicPartition since 
its associated " +
+                  s"leader epoch $requestLeaderEpoch is smaller than the 
current " +
+                  s"leader epoch $currentLeaderEpoch")
+                responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
+              } else {
+                stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
+                  s"controller $controllerId with correlation id 
$correlationId " +
+                  s"epoch $controllerEpoch for partition $topicPartition since 
its associated " +
+                  s"leader epoch $requestLeaderEpoch matches the current 
leader epoch")
+                responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
               }
-            } else if (requestLeaderEpoch < currentLeaderEpoch) {
-              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
-                s"controller $controllerId with correlation id $correlationId 
" +
-                s"epoch $controllerEpoch for partition $topicPartition since 
its associated " +
-                s"leader epoch $requestLeaderEpoch is smaller than the current 
" +
-                s"leader epoch $currentLeaderEpoch")
-              responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
-            } else {
-              stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
-                s"controller $controllerId with correlation id $correlationId 
" +
-                s"epoch $controllerEpoch for partition $topicPartition since 
its associated " +
-                s"leader epoch $requestLeaderEpoch matches the current leader 
epoch")
-              responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
             }
           }
-        }
 
-        val partitionsToBeLeader = partitionStates.filter { case (_, 
partitionState) =>
-          partitionState.leader == localBrokerId
-        }
-        val partitionsToBeFollower = partitionStates.filter { case (k, _) => 
!partitionsToBeLeader.contains(k) }
+          val partitionsToBeLeader = partitionStates.filter { case (_, 
partitionState) =>
+            partitionState.leader == localBrokerId
+          }
+          val partitionsToBeFollower = partitionStates.filter { case (k, _) => 
!partitionsToBeLeader.contains(k) }
 
-        val highWatermarkCheckpoints = new 
LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
-        val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
-          makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, 
correlationId, responseMap,
-            highWatermarkCheckpoints)
-        else
-          Set.empty[Partition]
-        val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
-          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, 
correlationId, responseMap,
-            highWatermarkCheckpoints)
-        else
-          Set.empty[Partition]
+          val highWatermarkCheckpoints = new 
LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+          val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
+            makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, 
correlationId, responseMap,
+              highWatermarkCheckpoints)
+          else
+            Set.empty[Partition]
+          val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
+            makeFollowers(controllerId, controllerEpoch, 
partitionsToBeFollower, correlationId, responseMap,
+              highWatermarkCheckpoints)
+          else
+            Set.empty[Partition]
 
-        /*
+          /*
          * KAFKA-8392
          * For topic partitions of which the broker is no longer a leader, 
delete metrics related to
          * those topics. Note that this means the broker stops being either a 
replica or a leader of
          * partitions of said topics
          */
-        val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
-        val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
-        
followerTopicSet.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)
+          val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
+          val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
+          
followerTopicSet.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)
 
-        // remove metrics for brokers which are not followers of a topic
-        
leaderTopicSet.diff(followerTopicSet).foreach(brokerTopicStats.removeOldFollowerMetrics)
+          // remove metrics for brokers which are not followers of a topic
+          
leaderTopicSet.diff(followerTopicSet).foreach(brokerTopicStats.removeOldFollowerMetrics)
 
-        leaderAndIsrRequest.partitionStates.forEach { partitionState =>
-          val topicPartition = new TopicPartition(partitionState.topicName, 
partitionState.partitionIndex)
-          /*
+          leaderAndIsrRequest.partitionStates.forEach { partitionState =>
+            val topicPartition = new TopicPartition(partitionState.topicName, 
partitionState.partitionIndex)
+            /*
            * If there is offline log directory, a Partition object may have 
been created by getOrCreatePartition()
            * before getOrCreateReplica() failed to create local replica due to 
KafkaStorageException.
            * In this case ReplicaManager.allPartitions will map this 
topic-partition to an empty Partition object.
            * we need to map this topic-partition to OfflinePartition instead.
            */
-          if (localLog(topicPartition).isEmpty)
-            markPartitionOffline(topicPartition)
-        }
-
-        // we initialize highwatermark thread after the first 
leaderisrrequest. This ensures that all the partitions
-        // have been completely populated before starting the checkpointing 
there by avoiding weird race conditions
-        startHighWatermarkCheckPointThread()
-
-        maybeAddLogDirFetchers(partitionStates.keySet, 
highWatermarkCheckpoints)
+            if (localLog(topicPartition).isEmpty)
+              markPartitionOffline(topicPartition)
+          }
 
-        replicaFetcherManager.shutdownIdleFetcherThreads()
-        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
-        onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
-        val responsePartitions = responseMap.iterator.map { case (tp, error) =>
-          new LeaderAndIsrPartitionError()
-            .setTopicName(tp.topic)
-            .setPartitionIndex(tp.partition)
-            .setErrorCode(error.code)
-        }.toBuffer
-        new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
-          .setErrorCode(Errors.NONE.code)
-          .setPartitionErrors(responsePartitions.asJava))
+          // we initialize highwatermark thread after the first 
leaderisrrequest. This ensures that all the partitions
+          // have been completely populated before starting the checkpointing 
there by avoiding weird race conditions
+          startHighWatermarkCheckPointThread()
+
+          maybeAddLogDirFetchers(partitionStates.keySet, 
highWatermarkCheckpoints)
+
+          replicaFetcherManager.shutdownIdleFetcherThreads()
+          replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+          onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
+          val responsePartitions = responseMap.iterator.map { case (tp, error) 
=>
+            new LeaderAndIsrPartitionError()
+              .setTopicName(tp.topic)
+              .setPartitionIndex(tp.partition)
+              .setErrorCode(error.code)
+          }.toBuffer
+          new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+            .setErrorCode(Errors.NONE.code)
+            .setPartitionErrors(responsePartitions.asJava))
+        }
       }
+      val endMs = time.milliseconds()

Review comment:
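       For ease of review, it's worth mentioning in the PR body that the only 
lines that actually changed here are 1373-1377 and 1233, with the response being 
captured on line 1246; the rest of this diff is just re-indentation from wrapping 
the existing body in `val response = { ... }`.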




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to