kafka git commit: MINOR: Handle error metrics removal during shutdown

2017-11-15 Thread rsivaram
Repository: kafka
Updated Branches:
  refs/heads/1.0 1a5a547bb -> 62ce9ea47


MINOR: Handle error metrics removal during shutdown

Author: Rajini Sivaram 

Reviewers: Ismael Juma 

Closes #4187 from rajinisivaram/MINOR-metrics-cleanup

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62ce9ea4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62ce9ea4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62ce9ea4

Branch: refs/heads/1.0
Commit: 62ce9ea47db6029fffa7b34dace16039b45d5a69
Parents: 1a5a547
Author: Rajini Sivaram 
Authored: Wed Nov 15 09:54:42 2017 +0000
Committer: Rajini Sivaram 
Committed: Wed Nov 15 10:04:45 2017 +0000

--
 .../scala/kafka/network/RequestChannel.scala|  7 +-
 .../main/scala/kafka/network/SocketServer.scala | 24 
 .../kafka/server/KafkaRequestHandler.scala  |  1 -
 .../main/scala/kafka/server/KafkaServer.scala   |  9 +++-
 .../unit/kafka/network/SocketServerTest.scala   |  7 +++---
 5 files changed, 37 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/62ce9ea4/core/src/main/scala/kafka/network/RequestChannel.scala
--
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index a4ec5e3..a50af45 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -315,10 +315,15 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
 }
   }
 
-  def shutdown() {
+  def clear() {
 requestQueue.clear()
   }
 
+  def shutdown() {
+clear()
+metrics.close()
+  }
+
   def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/62ce9ea4/core/src/main/scala/kafka/network/SocketServer.scala
--
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index bea8f79..4366fea 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -72,6 +72,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
   private var connectionQuotas: ConnectionQuotas = _
+  private var stoppedProcessingRequests = false
 
   /**
* Start the socket server
@@ -132,13 +133,28 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   requestChannel.addResponseListener(id => processors(id).wakeup())
 
   /**
-   * Shutdown the socket server
-   */
-  def shutdown() = {
-info("Shutting down")
+* Stop processing requests and new connections.
+*/
+  def stopProcessingRequests() = {
+info("Stopping socket server request processors")
 this.synchronized {
   acceptors.values.foreach(_.shutdown)
   processors.foreach(_.shutdown)
+  requestChannel.clear()
+  stoppedProcessingRequests = true
+}
+info("Stopped socket server request processors")
+  }
+
+  /**
+* Shutdown the socket server. If still processing requests, shutdown
+* acceptors and processors first.
+*/
+  def shutdown() = {
+info("Shutting down socket server")
+this.synchronized {
+  if (!stoppedProcessingRequests)
+stopProcessingRequests()
   requestChannel.shutdown()
 }
 info("Shutdown completed")

http://git-wip-us.apache.org/repos/asf/kafka/blob/62ce9ea4/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
--
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 3d8dbd9..a498781 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -105,7 +105,6 @@ class KafkaRequestHandlerPool(val brokerId: Int,
   handler.initiateShutdown()
 for (handler <- runnables)
   handler.awaitShutdown()
-requestChannel.metrics.close()
 info("shut down completely")
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/62ce9ea4/core/src/main/scala/kafka/server/KafkaServer.scala
--
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0d8e694..cfdce2a 100755
--- 

kafka git commit: MINOR: Handle error metrics removal during shutdown

2017-11-15 Thread rsivaram
Repository: kafka
Updated Branches:
  refs/heads/trunk d04daf570 -> 3cfbb25c6


MINOR: Handle error metrics removal during shutdown

Author: Rajini Sivaram 

Reviewers: Ismael Juma 

Closes #4187 from rajinisivaram/MINOR-metrics-cleanup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3cfbb25c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3cfbb25c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3cfbb25c

Branch: refs/heads/trunk
Commit: 3cfbb25c616bee44ac181e9320d4fd6d79ab9c58
Parents: d04daf5
Author: Rajini Sivaram 
Authored: Wed Nov 15 09:54:42 2017 +0000
Committer: Rajini Sivaram 
Committed: Wed Nov 15 09:54:42 2017 +0000

--
 .../scala/kafka/network/RequestChannel.scala|  7 +-
 .../main/scala/kafka/network/SocketServer.scala | 24 
 .../kafka/server/KafkaRequestHandler.scala  |  1 -
 .../main/scala/kafka/server/KafkaServer.scala   |  9 +++-
 .../unit/kafka/network/SocketServerTest.scala   |  7 +++---
 5 files changed, 37 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/network/RequestChannel.scala
--
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index a4ec5e3..a50af45 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -315,10 +315,15 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
 }
   }
 
-  def shutdown() {
+  def clear() {
 requestQueue.clear()
   }
 
+  def shutdown() {
+clear()
+metrics.close()
+  }
+
   def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/network/SocketServer.scala
--
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index bea8f79..4366fea 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -72,6 +72,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
   private var connectionQuotas: ConnectionQuotas = _
+  private var stoppedProcessingRequests = false
 
   /**
* Start the socket server
@@ -132,13 +133,28 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   requestChannel.addResponseListener(id => processors(id).wakeup())
 
   /**
-   * Shutdown the socket server
-   */
-  def shutdown() = {
-info("Shutting down")
+* Stop processing requests and new connections.
+*/
+  def stopProcessingRequests() = {
+info("Stopping socket server request processors")
 this.synchronized {
   acceptors.values.foreach(_.shutdown)
   processors.foreach(_.shutdown)
+  requestChannel.clear()
+  stoppedProcessingRequests = true
+}
+info("Stopped socket server request processors")
+  }
+
+  /**
+* Shutdown the socket server. If still processing requests, shutdown
+* acceptors and processors first.
+*/
+  def shutdown() = {
+info("Shutting down socket server")
+this.synchronized {
+  if (!stoppedProcessingRequests)
+stopProcessingRequests()
   requestChannel.shutdown()
 }
 info("Shutdown completed")

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
--
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 3d8dbd9..a498781 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -105,7 +105,6 @@ class KafkaRequestHandlerPool(val brokerId: Int,
   handler.initiateShutdown()
 for (handler <- runnables)
   handler.awaitShutdown()
-requestChannel.metrics.close()
 info("shut down completely")
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/server/KafkaServer.scala
--
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index a0732fd..a13f5af 100755
---