Repository: kafka Updated Branches: refs/heads/1.0 dc907d9b7 -> 6c01d68c9
KAFKA-6012; Close request metrics only after closing request handlers Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #4024 from rajinisivaram/KAFKA-6012-error-metric (cherry picked from commit e40b3a2e74133de6d60599beefb65407ca4cc7dd) Signed-off-by: Rajini Sivaram <rajinisiva...@googlemail.com> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6c01d68c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6c01d68c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6c01d68c Branch: refs/heads/1.0 Commit: 6c01d68c994d966da1cf74e7127473bda2ea3a46 Parents: dc907d9 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Thu Oct 5 12:25:34 2017 -0400 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Thu Oct 5 12:26:01 2017 -0400 ---------------------------------------------------------------------- .../scala/kafka/network/RequestChannel.scala | 3 +- .../kafka/server/KafkaRequestHandler.scala | 1 + .../unit/kafka/network/SocketServerTest.scala | 55 +++++++++++++------- 3 files changed, 38 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6c01d68c/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index c97e3af..ec16ab0 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -60,7 +60,7 @@ object RequestChannel extends Logging { def apply(metricName: String) = metricsMap(metricName) - def shutdown(): Unit = { + def close(): Unit = { metricsMap.values.foreach(_.removeMetrics()) } } @@ -318,7 +318,6 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe def shutdown() { requestQueue.clear() - metrics.shutdown() } def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest) http://git-wip-us.apache.org/repos/asf/kafka/blob/6c01d68c/core/src/main/scala/kafka/server/KafkaRequestHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index a498781..3d8dbd9 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -105,6 +105,7 @@ class KafkaRequestHandlerPool(val brokerId: Int, handler.initiateShutdown() for (handler <- runnables) handler.awaitShutdown() + requestChannel.metrics.close() info("shut down completely") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6c01d68c/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 633138b..aebbf5c 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -24,7 +24,7 @@ import java.nio.channels.SocketChannel import java.util.{HashMap, Random} import javax.net.ssl._ -import com.yammer.metrics.core.Gauge +import com.yammer.metrics.core.{Gauge, Meter} import com.yammer.metrics.{Metrics => YammerMetrics} import kafka.network.RequestChannel.SendAction import kafka.security.CredentialProvider @@ -34,7 +34,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ChannelBuilder, ChannelState, KafkaChannel, ListenerName, NetworkReceive, NetworkSend, Selector, Send} -import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} @@ -133,10 +133,15 @@ class SocketServerTest extends JUnitSuite { receiveRequest(server.requestChannel) } + def shutdownServerAndMetrics(server: SocketServer): Unit = { + server.shutdown() + server.metrics.close() + server.requestChannel.metrics.close() + } + @After def tearDown() { - metrics.close() - server.shutdown() + shutdownServerAndMetrics(server) sockets.foreach(_.close()) sockets.clear() } @@ -260,8 +265,7 @@ class SocketServerTest extends JUnitSuite { assertNull("Received request after failed send", overrideServer.requestChannel.receiveRequest(200)) } finally { - overrideServer.shutdown() - serverMetrics.close() + shutdownServerAndMetrics(overrideServer) } } @@ -342,8 +346,7 @@ class SocketServerTest extends JUnitSuite { newChannel.disconnect() } finally { - overrideServer.shutdown() - serverMetrics.close() + shutdownServerAndMetrics(overrideServer) } } @@ -380,7 +383,7 @@ class SocketServerTest extends JUnitSuite { // make sure the sockets are open server.acceptors.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed)) // then shutdown the server - server.shutdown() + shutdownServerAndMetrics(server) val largeChunkOfBytes = new Array[Byte](1000000) // doing a subsequent send should throw an exception as the connection should be closed. @@ -438,8 +441,7 @@ class SocketServerTest extends JUnitSuite { conn.setSoTimeout(3000) assertEquals(-1, conn.getInputStream.read()) } finally { - overrideServer.shutdown() - serverMetrics.close() + shutdownServerAndMetrics(overrideServer) } } @@ -479,8 +481,7 @@ class SocketServerTest extends JUnitSuite { assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq) sslSocket.close() } finally { - overrideServer.shutdown() - serverMetrics.close() + shutdownServerAndMetrics(overrideServer) } } @@ -534,8 +535,7 @@ class SocketServerTest extends JUnitSuite { s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}") } finally { - overrideServer.shutdown() - serverMetrics.close() + shutdownServerAndMetrics(overrideServer) } } @@ -571,10 +571,28 @@ class SocketServerTest extends JUnitSuite { s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}") } finally { - overrideServer.shutdown() - serverMetrics.close() + shutdownServerAndMetrics(overrideServer) } + } + + @Test + def testRequestMetricsAfterShutdown(): Unit = { + server.shutdown() + + server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate.mark() + server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1)) + val nonZeroMeters = Map("kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce" -> 1, + "kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE" -> 1) + + def requestMetricMeters = YammerMetrics + .defaultRegistry + .allMetrics.asScala + .filterKeys(k => k.getType == "RequestMetrics") + .collect { case (k, metric: Meter) => (k.toString, metric.count) } + assertEquals(nonZeroMeters, requestMetricMeters.filter { case (_, value) => value != 0 }) + server.requestChannel.metrics.close() + assertEquals(Map.empty, requestMetricMeters) } @Test @@ -844,8 +862,7 @@ class SocketServerTest extends JUnitSuite { try { testWithServer(testableServer) } finally { - testableServer.shutdown() - testableServer.metrics.close() + shutdownServerAndMetrics(testableServer) } }