Repository: kafka
Updated Branches:
  refs/heads/1.0 dc907d9b7 -> 6c01d68c9


KAFKA-6012; Close request metrics only after closing request handlers

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #4024 from rajinisivaram/KAFKA-6012-error-metric

(cherry picked from commit e40b3a2e74133de6d60599beefb65407ca4cc7dd)
Signed-off-by: Rajini Sivaram <rajinisiva...@googlemail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6c01d68c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6c01d68c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6c01d68c

Branch: refs/heads/1.0
Commit: 6c01d68c994d966da1cf74e7127473bda2ea3a46
Parents: dc907d9
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Thu Oct 5 12:25:34 2017 -0400
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Thu Oct 5 12:26:01 2017 -0400

----------------------------------------------------------------------
 .../scala/kafka/network/RequestChannel.scala    |  3 +-
 .../kafka/server/KafkaRequestHandler.scala      |  1 +
 .../unit/kafka/network/SocketServerTest.scala   | 55 +++++++++++++-------
 3 files changed, 38 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6c01d68c/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index c97e3af..ec16ab0 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -60,7 +60,7 @@ object RequestChannel extends Logging {
 
     def apply(metricName: String) = metricsMap(metricName)
 
-    def shutdown(): Unit = {
+    def close(): Unit = {
        metricsMap.values.foreach(_.removeMetrics())
     }
   }
@@ -318,7 +318,6 @@ class RequestChannel(val numProcessors: Int, val queueSize: 
Int) extends KafkaMe
 
   def shutdown() {
     requestQueue.clear()
-    metrics.shutdown()
   }
 
   def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6c01d68c/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala 
b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index a498781..3d8dbd9 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -105,6 +105,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
       handler.initiateShutdown()
     for (handler <- runnables)
       handler.awaitShutdown()
+    requestChannel.metrics.close()
     info("shut down completely")
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6c01d68c/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 633138b..aebbf5c 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -24,7 +24,7 @@ import java.nio.channels.SocketChannel
 import java.util.{HashMap, Random}
 import javax.net.ssl._
 
-import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.core.{Gauge, Meter}
 import com.yammer.metrics.{Metrics => YammerMetrics}
 import kafka.network.RequestChannel.SendAction
 import kafka.security.CredentialProvider
@@ -34,7 +34,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ChannelBuilder, ChannelState, 
KafkaChannel, ListenerName, NetworkReceive, NetworkSend, Selector, Send}
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, 
RequestHeader}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
@@ -133,10 +133,15 @@ class SocketServerTest extends JUnitSuite {
     receiveRequest(server.requestChannel)
   }
 
+  def shutdownServerAndMetrics(server: SocketServer): Unit = {
+    server.shutdown()
+    server.metrics.close()
+    server.requestChannel.metrics.close()
+  }
+
   @After
   def tearDown() {
-    metrics.close()
-    server.shutdown()
+    shutdownServerAndMetrics(server)
     sockets.foreach(_.close())
     sockets.clear()
   }
@@ -260,8 +265,7 @@ class SocketServerTest extends JUnitSuite {
       assertNull("Received request after failed send", 
overrideServer.requestChannel.receiveRequest(200))
 
     } finally {
-      overrideServer.shutdown()
-      serverMetrics.close()
+      shutdownServerAndMetrics(overrideServer)
     }
   }
 
@@ -342,8 +346,7 @@ class SocketServerTest extends JUnitSuite {
       newChannel.disconnect()
 
     } finally {
-      overrideServer.shutdown()
-      serverMetrics.close()
+      shutdownServerAndMetrics(overrideServer)
     }
   }
 
@@ -380,7 +383,7 @@ class SocketServerTest extends JUnitSuite {
     // make sure the sockets are open
     server.acceptors.values.foreach(acceptor => 
assertFalse(acceptor.serverChannel.socket.isClosed))
     // then shutdown the server
-    server.shutdown()
+    shutdownServerAndMetrics(server)
 
     val largeChunkOfBytes = new Array[Byte](1000000)
     // doing a subsequent send should throw an exception as the connection 
should be closed.
@@ -438,8 +441,7 @@ class SocketServerTest extends JUnitSuite {
       conn.setSoTimeout(3000)
       assertEquals(-1, conn.getInputStream.read())
     } finally {
-      overrideServer.shutdown()
-      serverMetrics.close()
+      shutdownServerAndMetrics(overrideServer)
     }
   }
 
@@ -479,8 +481,7 @@ class SocketServerTest extends JUnitSuite {
       assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq)
       sslSocket.close()
     } finally {
-      overrideServer.shutdown()
-      serverMetrics.close()
+      shutdownServerAndMetrics(overrideServer)
     }
   }
 
@@ -534,8 +535,7 @@ class SocketServerTest extends JUnitSuite {
         s"request metrics not updated, expected: $expectedTotalTimeCount, 
actual: ${totalTimeHistCount()}")
 
     } finally {
-      overrideServer.shutdown()
-      serverMetrics.close()
+      shutdownServerAndMetrics(overrideServer)
     }
   }
 
@@ -571,10 +571,28 @@ class SocketServerTest extends JUnitSuite {
         s"request metrics not updated, expected: $expectedTotalTimeCount, 
actual: ${totalTimeHistCount()}")
 
     } finally {
-      overrideServer.shutdown()
-      serverMetrics.close()
+      shutdownServerAndMetrics(overrideServer)
     }
+  }
+
+  @Test
+  def testRequestMetricsAfterShutdown(): Unit = {
+    server.shutdown()
+
+    server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate.mark()
+    server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE 
-> 1))
+    val nonZeroMeters = 
Map("kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce" -> 
1,
+        
"kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE"
 -> 1)
+
+    def requestMetricMeters = YammerMetrics
+      .defaultRegistry
+      .allMetrics.asScala
+      .filterKeys(k => k.getType == "RequestMetrics")
+      .collect { case (k, metric: Meter) => (k.toString, metric.count) }
 
+    assertEquals(nonZeroMeters, requestMetricMeters.filter { case (_, value) 
=> value != 0 })
+    server.requestChannel.metrics.close()
+    assertEquals(Map.empty, requestMetricMeters)
   }
 
   @Test
@@ -844,8 +862,7 @@ class SocketServerTest extends JUnitSuite {
     try {
         testWithServer(testableServer)
     } finally {
-      testableServer.shutdown()
-      testableServer.metrics.close()
+      shutdownServerAndMetrics(testableServer)
     }
   }
 

Reply via email to