This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 129859d  KAFKA-6752: Enable unclean leader election metric (#4838)
129859d is described below

commit 129859d0f7bf6cfe394706d6b69a18be510b32d8
Author: Manikumar Reddy O <manikumar.re...@gmail.com>
AuthorDate: Wed Apr 11 23:00:30 2018 +0530

    KAFKA-6752: Enable unclean leader election metric (#4838)
    
    Reviewers: Jun Rao <jun...@gmail.com>
---
 .../kafka/controller/PartitionStateMachine.scala   |  9 +++++---
 .../PartitionLeaderElectionAlgorithmsTest.scala    | 26 ++++++++++++++++------
 .../integration/UncleanLeaderElectionTest.scala    | 14 ++++++++++--
 3 files changed, 37 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 2e27272..d760061 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -362,7 +362,7 @@ class PartitionStateMachine(config: KafkaConfig,
       if (leaderIsrAndControllerEpochOpt.nonEmpty) {
         val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get
         val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
-        val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled)
+        val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)
         val newLeaderAndIsrOpt = leaderOpt.map { leader =>
           val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
           else List(leader)
@@ -435,10 +435,13 @@ class PartitionStateMachine(config: KafkaConfig,
 }
 
 object PartitionLeaderElectionAlgorithms {
-  def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean): Option[Int] = {
+  def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = {
     assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
       if (uncleanLeaderElectionEnabled) {
-        assignment.find(liveReplicas.contains)
+        val leaderOpt = assignment.find(liveReplicas.contains)
+        if (!leaderOpt.isEmpty)
+          controllerContext.stats.uncleanLeaderElectionRate.mark()
+        leaderOpt
       } else {
         None
       }
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
index f149fc9..113a39d 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
@@ -17,10 +17,17 @@
 package kafka.controller
 
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Before, Test}
 import org.scalatest.junit.JUnitSuite
 
 class PartitionLeaderElectionAlgorithmsTest  extends JUnitSuite {
+  private var controllerContext: ControllerContext = null
+
+  @Before
+  def setUp(): Unit = {
+    controllerContext = new ControllerContext
+    controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec")
+  }
 
   @Test
   def testOfflinePartitionLeaderElection(): Unit = {
@@ -30,7 +37,8 @@ class PartitionLeaderElectionAlgorithmsTest  extends JUnitSuite {
     val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
       isr,
       liveReplicas,
-      uncleanLeaderElectionEnabled = false)
+      uncleanLeaderElectionEnabled = false,
+      controllerContext)
     assertEquals(Option(4), leaderOpt)
   }
 
@@ -42,9 +50,12 @@ class PartitionLeaderElectionAlgorithmsTest  extends JUnitSuite {
     val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
       isr,
       liveReplicas,
-      uncleanLeaderElectionEnabled = false)
+      uncleanLeaderElectionEnabled = false,
+      controllerContext)
     assertEquals(None, leaderOpt)
+    assertEquals(0, controllerContext.stats.uncleanLeaderElectionRate.count())
   }
+
   @Test
   def testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled(): Unit = {
     val assignment = Seq(2, 4)
@@ -53,8 +64,10 @@ class PartitionLeaderElectionAlgorithmsTest  extends JUnitSuite {
     val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
       isr,
       liveReplicas,
-      uncleanLeaderElectionEnabled = true)
+      uncleanLeaderElectionEnabled = true,
+      controllerContext)
     assertEquals(Option(4), leaderOpt)
+    assertEquals(1, controllerContext.stats.uncleanLeaderElectionRate.count())
   }
 
   @Test
@@ -62,10 +75,9 @@ class PartitionLeaderElectionAlgorithmsTest  extends JUnitSuite {
     val reassignment = Seq(2, 4)
     val isr = Seq(2, 4)
     val liveReplicas = Set(4)
-    val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(reassignment,
+    val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment,
       isr,
-      liveReplicas,
-      uncleanLeaderElectionEnabled = false)
+      liveReplicas)
     assertEquals(Option(4), leaderOpt)
   }
 
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 5269f92..608f3a6 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -191,12 +191,17 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     produceMessage(servers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic))
 
+    //remove any previous unclean election metric
+    servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+
     // shutdown leader and then restart follower
     servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
-    servers.filter(server => server.config.brokerId == followerId).map(server => server.startup())
+    val followerServer = servers.find(_.config.brokerId == followerId).get
+    followerServer.startup()
 
     // wait until new leader is (uncleanly) elected
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
+    assertEquals(1, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
 
     produceMessage(servers, topic, "third")
 
@@ -224,12 +229,17 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     produceMessage(servers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic))
 
+    //remove any previous unclean election metric
+    servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+
     // shutdown leader and then restart follower
     servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
-    servers.filter(server => server.config.brokerId == followerId).map(server => server.startup())
+    val followerServer = servers.find(_.config.brokerId == followerId).get
+    followerServer.startup()
 
     // verify that unclean election to non-ISR follower does not occur
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(-1))
+    assertEquals(0, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
 
     // message production and consumption should both fail while leader is down
     try {

-- 
To stop receiving notification emails like this one, please contact
jun...@apache.org.

Reply via email to