kafka git commit: MINOR: Handle error metrics removal during shutdown
Repository: kafka Updated Branches: refs/heads/1.0 1a5a547bb -> 62ce9ea47 MINOR: Handle error metrics removal during shutdown Author: Rajini SivaramReviewers: Ismael Juma Closes #4187 from rajinisivaram/MINOR-metrics-cleanup Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62ce9ea4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62ce9ea4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62ce9ea4 Branch: refs/heads/1.0 Commit: 62ce9ea47db6029fffa7b34dace16039b45d5a69 Parents: 1a5a547 Author: Rajini Sivaram Authored: Wed Nov 15 09:54:42 2017 + Committer: Rajini Sivaram Committed: Wed Nov 15 10:04:45 2017 + -- .../scala/kafka/network/RequestChannel.scala| 7 +- .../main/scala/kafka/network/SocketServer.scala | 24 .../kafka/server/KafkaRequestHandler.scala | 1 - .../main/scala/kafka/server/KafkaServer.scala | 9 +++- .../unit/kafka/network/SocketServerTest.scala | 7 +++--- 5 files changed, 37 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/62ce9ea4/core/src/main/scala/kafka/network/RequestChannel.scala -- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index a4ec5e3..a50af45 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -315,10 +315,15 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe } } - def shutdown() { + def clear() { requestQueue.clear() } + def shutdown() { +clear() +metrics.close() + } + def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest) } http://git-wip-us.apache.org/repos/asf/kafka/blob/62ce9ea4/core/src/main/scala/kafka/network/SocketServer.scala -- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index bea8f79..4366fea 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -72,6 +72,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time private[network] val acceptors = mutable.Map[EndPoint, Acceptor]() private var connectionQuotas: ConnectionQuotas = _ + private var stoppedProcessingRequests = false /** * Start the socket server @@ -132,13 +133,28 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time requestChannel.addResponseListener(id => processors(id).wakeup()) /** - * Shutdown the socket server - */ - def shutdown() = { -info("Shutting down") +* Stop processing requests and new connections. +*/ + def stopProcessingRequests() = { +info("Stopping socket server request processors") this.synchronized { acceptors.values.foreach(_.shutdown) processors.foreach(_.shutdown) + requestChannel.clear() + stoppedProcessingRequests = true +} +info("Stopped socket server request processors") + } + + /** +* Shutdown the socket server. If still processing requests, shutdown +* acceptors and processors first. +*/ + def shutdown() = { +info("Shutting down socket server") +this.synchronized { + if (!stoppedProcessingRequests) +stopProcessingRequests() requestChannel.shutdown() } info("Shutdown completed") http://git-wip-us.apache.org/repos/asf/kafka/blob/62ce9ea4/core/src/main/scala/kafka/server/KafkaRequestHandler.scala -- diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 3d8dbd9..a498781 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -105,7 +105,6 @@ class KafkaRequestHandlerPool(val brokerId: Int, handler.initiateShutdown() for (handler <- runnables) handler.awaitShutdown() -requestChannel.metrics.close() info("shut down completely") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/62ce9ea4/core/src/main/scala/kafka/server/KafkaServer.scala -- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 0d8e694..cfdce2a 100755 ---
kafka git commit: MINOR: Handle error metrics removal during shutdown
Repository: kafka Updated Branches: refs/heads/trunk d04daf570 -> 3cfbb25c6 MINOR: Handle error metrics removal during shutdown Author: Rajini SivaramReviewers: Ismael Juma Closes #4187 from rajinisivaram/MINOR-metrics-cleanup Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3cfbb25c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3cfbb25c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3cfbb25c Branch: refs/heads/trunk Commit: 3cfbb25c616bee44ac181e9320d4fd6d79ab9c58 Parents: d04daf5 Author: Rajini Sivaram Authored: Wed Nov 15 09:54:42 2017 + Committer: Rajini Sivaram Committed: Wed Nov 15 09:54:42 2017 + -- .../scala/kafka/network/RequestChannel.scala| 7 +- .../main/scala/kafka/network/SocketServer.scala | 24 .../kafka/server/KafkaRequestHandler.scala | 1 - .../main/scala/kafka/server/KafkaServer.scala | 9 +++- .../unit/kafka/network/SocketServerTest.scala | 7 +++--- 5 files changed, 37 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/network/RequestChannel.scala -- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index a4ec5e3..a50af45 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -315,10 +315,15 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe } } - def shutdown() { + def clear() { requestQueue.clear() } + def shutdown() { +clear() +metrics.close() + } + def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest) } http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/network/SocketServer.scala -- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index bea8f79..4366fea 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -72,6 +72,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time private[network] val acceptors = mutable.Map[EndPoint, Acceptor]() private var connectionQuotas: ConnectionQuotas = _ + private var stoppedProcessingRequests = false /** * Start the socket server @@ -132,13 +133,28 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time requestChannel.addResponseListener(id => processors(id).wakeup()) /** - * Shutdown the socket server - */ - def shutdown() = { -info("Shutting down") +* Stop processing requests and new connections. +*/ + def stopProcessingRequests() = { +info("Stopping socket server request processors") this.synchronized { acceptors.values.foreach(_.shutdown) processors.foreach(_.shutdown) + requestChannel.clear() + stoppedProcessingRequests = true +} +info("Stopped socket server request processors") + } + + /** +* Shutdown the socket server. If still processing requests, shutdown +* acceptors and processors first. +*/ + def shutdown() = { +info("Shutting down socket server") +this.synchronized { + if (!stoppedProcessingRequests) +stopProcessingRequests() requestChannel.shutdown() } info("Shutdown completed") http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/server/KafkaRequestHandler.scala -- diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 3d8dbd9..a498781 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -105,7 +105,6 @@ class KafkaRequestHandlerPool(val brokerId: Int, handler.initiateShutdown() for (handler <- runnables) handler.awaitShutdown() -requestChannel.metrics.close() info("shut down completely") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/server/KafkaServer.scala -- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index a0732fd..a13f5af 100755 ---