[ https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689957#comment-16689957 ]

ASF GitHub Bot commented on KAFKA-4453:
---------------------------------------

gitlw closed pull request #5783: KAFKA-4453: Separating controller connections and requests from the data plane (KIP-291)
URL: https://github.com/apache/kafka/pull/5783

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
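
For context before the diff: this patch routes controller-to-broker requests over a dedicated "control plane" listener so they are not queued behind data-plane traffic. A minimal broker configuration exercising the new control.plane.listener.name property, adapted from the documentation the patch adds to KafkaConfig.scala (addresses and ports are illustrative):

listeners=CONTROLLER://192.1.1.8:9091,INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093
listener.security.protocol.map=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:SSL
control.plane.listener.name=CONTROLLER

With this set, the broker binds a dedicated acceptor with one network processor and one request handler thread for the CONTROLLER endpoint, and registers the ControlPlaneRequestQueueSize, ControlPlaneNetworkProcessorIdlePercent and ControlPlaneRequestHandlerIdlePercent metrics; the remaining listeners keep the configured num.network.threads and num.io.threads. If the property is unset, the controller falls back to the inter-broker listener and none of the control-plane threads or metrics are created.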

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ecf6fbf33f1..29e10383afe 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -108,14 +108,16 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
   private def addNewBroker(broker: Broker) {
     val messageQueue = new LinkedBlockingQueue[QueueItem]
     debug(s"Controller ${config.brokerId} trying to connect to broker 
${broker.id}")
-    val brokerNode = broker.node(config.interBrokerListenerName)
+    val controlPlaneListenerName = 
config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+    val controlPlaneSecurityProtocol = 
config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+    val brokerNode = broker.node(controlPlaneListenerName)
     val logContext = new LogContext(s"[Controller id=${config.brokerId}, 
targetBrokerId=${brokerNode.idString}] ")
     val networkClient = {
       val channelBuilder = ChannelBuilders.clientChannelBuilder(
-        config.interBrokerSecurityProtocol,
+        controlPlaneSecurityProtocol,
         JaasContext.Type.SERVER,
         config,
-        config.interBrokerListenerName,
+        controlPlaneListenerName,
         config.saslMechanismInterBrokerProtocol,
         config.saslInterBrokerHandshakeRequestEnable
       )
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 00b09688c5b..0bc8a2a6ff9 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -40,6 +40,7 @@ object RequestChannel extends Logging {
   private val requestLogger = Logger("kafka.request.logger")
 
   val RequestQueueSizeMetric = "RequestQueueSize"
+  val ControlPlaneRequestQueueSizeMetric = "ControlPlaneRequestQueueSize"
   val ResponseQueueSizeMetric = "ResponseQueueSize"
   val ProcessorMetricTag = "processor"
 
@@ -272,13 +273,13 @@ object RequestChannel extends Logging {
   }
 }
 
-class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
+class RequestChannel(val queueSize: Int, val requestQueueSizeMetric: String) extends KafkaMetricsGroup {
   import RequestChannel._
   val metrics = new RequestChannel.Metrics
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
 
-  newGauge(RequestQueueSizeMetric, new Gauge[Int] {
+  newGauge(requestQueueSizeMetric, new Gauge[Int] {
       def value = requestQueue.size
   })
 
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 1365f90f763..c3dac79f5f9 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -65,8 +65,14 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", "socket-server-metrics")
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
-  val requestChannel = new RequestChannel(maxQueuedRequests)
-  private val processors = new ConcurrentHashMap[Int, Processor]()
+  val dataRequestChannel = new RequestChannel(maxQueuedRequests, RequestChannel.RequestQueueSizeMetric)
+  var controlPlaneRequestChannel: RequestChannel = null
+  if (config.controlPlaneListenerName.isDefined) {
+    controlPlaneRequestChannel = new RequestChannel(20, RequestChannel.ControlPlaneRequestQueueSizeMetric)
+  }
+  private val dataProcessors = new ConcurrentHashMap[Int, Processor]()
+  // there should be only one controller processor, however we use a map to store it so that we can reuse the logic for data processors
+  private[network] val controlPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
   private var nextProcessorId = 0
 
   private[network] val acceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
@@ -88,25 +94,35 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   def startup(startupProcessors: Boolean = true) {
     this.synchronized {
       connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)
-      createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
+      createAcceptorAndProcessors(config.numNetworkThreads, config.dataListeners, dataRequestChannel, dataProcessors, false)
+      if (config.controlPlaneListener.isDefined)
+        createAcceptorAndProcessors(1, Seq(config.controlPlaneListener.get), controlPlaneRequestChannel, controlPlaneProcessors, true)
+
       if (startupProcessors) {
         startProcessors()
       }
     }
 
-    newGauge("NetworkProcessorAvgIdlePercent",
-      new Gauge[Double] {
+    def createNetworkProcessorAvgIdlePercentMetric(metric: String, processors: java.util.Map[Int, Processor]): Unit = {
+      newGauge(metric,
+        new Gauge[Double] {
 
-        def value = SocketServer.this.synchronized {
-          val ioWaitRatioMetricNames = processors.values.asScala.map { p =>
-            metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
+          def value = SocketServer.this.synchronized {
+            val ioWaitRatioMetricNames = processors.values.asScala.map { p =>
+              metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
+            }
+            ioWaitRatioMetricNames.map { metricName =>
+              Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
+            }.sum / processors.size
           }
-          ioWaitRatioMetricNames.map { metricName =>
-            Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
-          }.sum / processors.size
         }
-      }
-    )
+      )
+    }
+
+    createNetworkProcessorAvgIdlePercentMetric("NetworkProcessorAvgIdlePercent", dataProcessors)
+    if (config.controlPlaneListener.isDefined)
+      createNetworkProcessorAvgIdlePercentMetric("ControlPlaneNetworkProcessorIdlePercent", controlPlaneProcessors)
+
     newGauge("MemoryPoolAvailable",
       new Gauge[Long] {
         def value = memoryPool.availableMemory()
@@ -133,7 +149,10 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
 
   private def createAcceptorAndProcessors(processorsPerListener: Int,
-                                          endpoints: Seq[EndPoint]): Unit = synchronized {
+    endpoints: Seq[EndPoint],
+    requestChannel: RequestChannel,
+    processorCollector: java.util.Map[Int, Processor],
+    isControlPlane: Boolean): Unit = synchronized {
 
     val sendBufferSize = config.socketSendBufferBytes
     val recvBufferSize = config.socketReceiveBufferBytes
@@ -144,25 +163,27 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
       val securityProtocol = endpoint.securityProtocol
 
       val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
-      addProcessors(acceptor, endpoint, processorsPerListener)
-      KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
+      addProcessors(acceptor, endpoint, processorsPerListener, requestChannel, processorCollector, isControlPlane)
+      KafkaThread.nonDaemon((if (isControlPlane) "control" else "data") + s"-plane-kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
       acceptor.awaitStartup()
       acceptors.put(endpoint, acceptor)
     }
   }
 
-  private def addProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
+  private def addProcessors(acceptor: Acceptor, endpoint: EndPoint,
+    newProcessorsPerListener: Int, requestChannel: RequestChannel, processorCollector: java.util.Map[Int, Processor],
+    isControlPlane: Boolean): Unit = synchronized {
     val listenerName = endpoint.listenerName
     val securityProtocol = endpoint.securityProtocol
     val listenerProcessors = new ArrayBuffer[Processor]()
 
     for (_ <- 0 until newProcessorsPerListener) {
-      val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
+      val processor = newProcessor(nextProcessorId, requestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool, isControlPlane)
       listenerProcessors += processor
       requestChannel.addProcessor(processor)
       nextProcessorId += 1
     }
-    listenerProcessors.foreach(p => processors.put(p.id, p))
+    listenerProcessors.foreach(p => processorCollector.put(p.id, p))
     acceptor.addProcessors(listenerProcessors)
   }
 
@@ -173,8 +194,11 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     info("Stopping socket server request processors")
     this.synchronized {
       acceptors.asScala.values.foreach(_.shutdown())
-      processors.asScala.values.foreach(_.shutdown())
-      requestChannel.clear()
+      dataProcessors.asScala.values.foreach(_.shutdown())
+      controlPlaneProcessors.asScala.values.foreach(_.shutdown())
+      dataRequestChannel.clear()
+      if (controlPlaneRequestChannel != null)
+        controlPlaneRequestChannel.clear()
       stoppedProcessingRequests = true
     }
     info("Stopped socket server request processors")
@@ -182,12 +206,18 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = synchronized {
     info(s"Resizing network thread pool size for each listener from $oldNumNetworkThreads to $newNumNetworkThreads")
+    val dataAcceptors = config.controlPlaneListenerName match {
+      case Some(controlPlaneListenerName) =>
+        acceptors.asScala.filter{ case (endpoint, _) => !endpoint.listenerName.value().equals(controlPlaneListenerName.value())}
+      case None => acceptors.asScala
+    }
+
     if (newNumNetworkThreads > oldNumNetworkThreads) {
-      acceptors.asScala.foreach { case (endpoint, acceptor) =>
-        addProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads)
+      dataAcceptors.foreach { case (endpoint, acceptor) =>
+        addProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads, dataRequestChannel, dataProcessors, false)
       }
     } else if (newNumNetworkThreads < oldNumNetworkThreads)
-      acceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel))
+      dataAcceptors.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, dataRequestChannel))
   }
 
   /**
@@ -199,7 +229,9 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     this.synchronized {
       if (!stoppedProcessingRequests)
         stopProcessingRequests()
-      requestChannel.shutdown()
+      dataRequestChannel.shutdown()
+      if (controlPlaneRequestChannel != null )
+        controlPlaneRequestChannel.shutdown()
     }
     info("Shutdown completed")
   }
@@ -215,7 +247,12 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
     info(s"Adding listeners for endpoints $listenersAdded")
-    createAcceptorAndProcessors(config.numNetworkThreads, listenersAdded)
+    val (controlPlaneListenersAdded, dataPlaneListenersAdded) = listenersAdded.partition { endpoint =>
+        config.controlPlaneListenerName.isDefined && endpoint.listenerName.value() == config.controlPlaneListenerName.get }
+    if (dataPlaneListenersAdded.nonEmpty)
+      createAcceptorAndProcessors(config.numNetworkThreads, dataPlaneListenersAdded, dataRequestChannel, dataProcessors, false)
+    if (controlPlaneListenersAdded.nonEmpty)
+      createAcceptorAndProcessors(1, controlPlaneListenersAdded, controlPlaneRequestChannel, controlPlaneProcessors, true)
     startProcessors()
   }
 
@@ -237,9 +274,10 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   }
 
   /* `protected` for test usage */
-  protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                                      securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
+  protected[network] def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+                                      securityProtocol: SecurityProtocol, memoryPool: MemoryPool, isControlPlane: Boolean): Processor = {
     new Processor(id,
+      isControlPlane,
       time,
       config.socketRequestMaxBytes,
       requestChannel,
@@ -261,7 +299,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     Option(connectionQuotas).fold(0)(_.get(address))
 
   /* For test usage */
-  private[network] def processor(index: Int): Processor = processors.get(index)
+  private[network] def processor(index: Int): Processor = dataProcessors.get(index)
 
 }
 
@@ -355,7 +393,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 
   private def startProcessors(processors: Seq[Processor]): Unit = synchronized {
     processors.foreach { processor =>
-      KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
+      KafkaThread.nonDaemon((if (processor.isControlPlane) "control" else "data") + s"-plane-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
         processor).start()
     }
   }
@@ -498,6 +536,7 @@ private[kafka] object Processor {
  * each of which has its own selector
  */
 private[kafka] class Processor(val id: Int,
+                               val isControlPlane: Boolean,
                                time: Time,
                                maxRequestSize: Int,
                                requestChannel: RequestChannel,
@@ -528,6 +567,8 @@ private[kafka] class Processor(val id: Int,
     override def toString: String = s"$localHost:$localPort-$remoteHost:$remotePort-$index"
   }
 
+  // the receivesProcessed counter is defined only for testing
+  private[network] var receivesProcessed: Long = 0L
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
   private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
   private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
@@ -707,6 +748,7 @@ private[kafka] class Processor(val id: Int,
             val req = new RequestChannel.Request(processor = id, context = context,
               startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)
             requestChannel.sendRequest(req)
+            receivesProcessed += 1
             selector.mute(connectionId)
             handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
           case None =>
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 2c0f6c1b52c..94325c6c7b0 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -640,7 +640,7 @@ class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable {
 
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     if (newConfig.numIoThreads != oldConfig.numIoThreads)
-      server.requestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
+      server.dataRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
     if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
       server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
     if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9edda4ea7f1..8ce42369472 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -282,6 +282,7 @@ object KafkaConfig {
   val AdvertisedPortProp = "advertised.port"
   val AdvertisedListenersProp = "advertised.listeners"
   val ListenerSecurityProtocolMapProp = "listener.security.protocol.map"
+  val ControlPlaneListenerNameProp = "control.plane.listener.name"
   val SocketSendBufferBytesProp = "socket.send.buffer.bytes"
   val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes"
   val SocketRequestMaxBytesProp = "socket.request.max.bytes"
@@ -540,6 +541,27 @@ object KafkaConfig {
     "prefix (the listener name is lowercased) to the config name. For example, 
to set a different keystore for the " +
     "INTERNAL listener, a config with name 
<code>listener.name.internal.ssl.keystore.location</code> would be set. " +
     "If the config for the listener name is not set, the config will fallback 
to the generic config (i.e. <code>ssl.keystore.location</code>). "
+  val ControlPlaneListenerNameDoc = s"On the broker side, the $ControlPlaneListenerNameProp is used to locate an endpoint in the $ListenersProp list, which we will use to listen for connections " +
+    s"from the controller. For example, if a broker's config is:" +
+    """
+listeners=CONTROLLER://192.1.1.8:9091,INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093
+listener.security.protocol.map=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:SSL
+control.plane.listener.name=CONTROLLER"""+
+    "\nUpon startup, a broker will bind to the local endpoint 192.1.1.8:9091 with security protocol PLAINTEXT to listen for controller connections.\n" +
+    s"On the controller side, when the controller discovers a broker's published endpoints through Zookeeper, " +
+    s"the $ControlPlaneListenerNameProp is used to locate an endpoint within the published endpoints, which it will use to establish connections with "+
+    "the broker. For example, if a broker's published endpoints list on Zookeeper is" +
+    """
+"endpoints": [
+  "CONTROLLER://broker1.example.com:9091",
+  "INTERNAL://broker1.example.com:9092",
+  "EXTERNAL://host1.example.com:9093"
+]""" +
+    s"\n and the controller's config is\n" +
+    "listener.security.protocol.map=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:SSL\n" +
+    "control.plane.listener.name=CONTROLLER\n" +
+    "then the controller will use the endpoint broker1.example.com:9091 with security protocol PLAINTEXT to connect to the discovered broker.\n" +
+    "If not explicitly configured, the default value will be null and there will be no dedicated endpoints for controller connections."
 
   val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used."
   val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used."
@@ -837,6 +859,7 @@ object KafkaConfig {
       .define(AdvertisedHostNameProp, STRING, null, HIGH, AdvertisedHostNameDoc)
       .define(AdvertisedPortProp, INT, null, HIGH, AdvertisedPortDoc)
       .define(AdvertisedListenersProp, STRING, null, HIGH, AdvertisedListenersDoc)
+      .define(ControlPlaneListenerNameProp, STRING, null, HIGH, ControlPlaneListenerNameDoc)
       .define(ListenerSecurityProtocolMapProp, STRING, Defaults.ListenerSecurityProtocolMap, LOW, ListenerSecurityProtocolMapDoc)
       .define(SocketSendBufferBytesProp, INT, Defaults.SocketSendBufferBytes, HIGH, SocketSendBufferBytesDoc)
       .define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc)
@@ -1253,6 +1276,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
 
   def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1
   def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2
+  val controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map(_._1)
+  val controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map(_._2)
   def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
   val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1
 
@@ -1325,6 +1350,21 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap))
   }
 
+  // Return the endpoint corresponding to the controller listener name
+  def controlPlaneListener: Option[EndPoint] = {
+    controlPlaneListenerName.flatMap { name =>
+      listeners.filter(endpoint => endpoint.listenerName.value() == name.value()).headOption
+    }
+  }
+
+  // Return the endpoints that are not related to the controller listener name
+  def dataListeners: Seq[EndPoint] = {
+    Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match {
+      case Some(controllerListenerName) => listeners.filterNot(_.listenerName.value() == controllerListenerName)
+      case None => listeners
+    }
+  }
+
   // If the user defined advertised listeners, we use those
   // If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults
   // If none of these are defined, we'll use the listeners
@@ -1356,6 +1396,18 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }
   }
 
+  // If the control.plane.listener.name config is set, we use its value
+  // Otherwise, the control plane listener name will be the same as the inter broker listener name
+  private def getControlPlaneListenerNameAndSecurityProtocol: Option[(ListenerName, SecurityProtocol)] = {
+    Option(getString(KafkaConfig.ControlPlaneListenerNameProp)).map { name =>
+      val controlPlaneListenerName = ListenerName.normalised(name)
+      val controlPlaneSecurityProtocol = listenerSecurityProtocolMap.getOrElse(controlPlaneListenerName,
+        throw new ConfigException(s"Listener with name ${controlPlaneListenerName.value} defined in " +
+          s"${KafkaConfig.ControlPlaneListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
+      (controlPlaneListenerName, controlPlaneSecurityProtocol)
+    }
+  }
+
   private def getSecurityProtocol(protocolName: String, configName: String): SecurityProtocol = {
     try SecurityProtocol.forName(protocolName)
     catch {
@@ -1412,6 +1464,24 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " +
       s"is set to version ${ApiVersion.minSupportedFor(recordVersion).shortVersion} or higher")
 
+    // validate the control.plane.listener.name config
+    if (controlPlaneListenerName.isDefined) {
+      require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()),
+        s"${KafkaConfig.ControlPlaneListenerNameProp}, when defined, should have a different value from the inter broker listener name. " +
+          s"Currently they both have the value ${controlPlaneListenerName.get}")
+
+      require(listenerNames.contains(controlPlaneListenerName.get),
+        s"${KafkaConfig.ControlPlaneListenerNameProp} has a value of ${controlPlaneListenerName.get}, " +
+          s"which must be a listener name defined in ${KafkaConfig.ListenersProp}. " +
+          s"The current listener names defined in ${KafkaConfig.ListenersProp} are ${listenerNames.map(_.value).mkString(",")}"
+      )
+      require(advertisedListenerNames.contains(controlPlaneListenerName.get),
+        s"${KafkaConfig.ControlPlaneListenerNameProp} has a value of ${controlPlaneListenerName.get}, " +
+          s"which must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
+          s"The current listener names defined in ${KafkaConfig.AdvertisedListenersProp} are ${advertisedListenerNames.map(_.value).mkString(",")}"
+      )
+    }
+
     val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL
     require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM,
       s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index d0d41219663..68dff3ff516 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -40,6 +40,8 @@ class KafkaRequestHandler(id: Int,
                           apis: KafkaApis,
                           time: Time) extends Runnable with Logging {
   this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
+  // the requestsHandled counter is defined only for testing
+  var requestsHandled: Long = 0L
   private val shutdownComplete = new CountDownLatch(1)
   @volatile private var stopped = false
 
@@ -67,6 +69,7 @@ class KafkaRequestHandler(id: Int,
             request.requestDequeueTimeNanos = endTime
             trace(s"Kafka request handler $id on broker $brokerId handling 
request $request")
             apis.handle(request)
+            requestsHandled += 1
           } catch {
             case e: FatalExitError =>
               shutdownComplete.countDown()
@@ -96,13 +99,15 @@ class KafkaRequestHandlerPool(val brokerId: Int,
                               val requestChannel: RequestChannel,
                               val apis: KafkaApis,
                               time: Time,
-                              numThreads: Int) extends Logging with KafkaMetricsGroup {
+                              numThreads: Int,
+                              requestHandlerAvgIdleMetric: String,
+                              isControlPlane: Boolean) extends Logging with KafkaMetricsGroup {
 
   private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
   /* a meter to track the average free capacity of the request handlers */
-  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
+  private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetric, "percent", TimeUnit.NANOSECONDS)
 
-  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
+  this.logIdent = "[" + (if (isControlPlane) "Control" else "Data") + " Plane Kafka Request Handler on Broker " + brokerId + "], "
   val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
   for (i <- 0 until numThreads) {
     createHandler(i)
@@ -110,7 +115,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
 
   def createHandler(id: Int): Unit = synchronized {
     runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
-    KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
+    KafkaThread.daemon((if (isControlPlane) "control" else "data") + "-plane-kafka-request-handler-" + id, runnables(id)).start()
   }
 
   def resizeThreadPool(newSize: Int): Unit = synchronized {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0d13d9e0730..364f1dffa20 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -113,10 +113,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   val brokerState: BrokerState = new BrokerState
 
-  var apis: KafkaApis = null
+  var dataPlaneApis: KafkaApis = null
+  var controlPlaneApis: KafkaApis = null
   var authorizer: Option[Authorizer] = None
   var socketServer: SocketServer = null
-  var requestHandlerPool: KafkaRequestHandlerPool = null
+  var dataRequestHandlerPool: KafkaRequestHandlerPool = null
+  var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
 
   var logDirFailureChannel: LogDirFailureChannel = null
   var logManager: LogManager = null
@@ -291,12 +293,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
             KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
         /* start processing requests */
-        apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
+        dataPlaneApis = new KafkaApis(socketServer.dataRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
           kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
           fetchManager, brokerTopicStats, clusterId, time, tokenManager)
 
-        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
-          config.numIoThreads)
+        dataRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataRequestChannel, dataPlaneApis, time,
+          config.numIoThreads, "RequestHandlerAvgIdlePercent", false)
+
+        if (config.controlPlaneListenerName.isDefined) {
+          controlPlaneApis = new KafkaApis(socketServer.controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
+            kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
+            fetchManager, brokerTopicStats, clusterId, time, tokenManager)
+
+          controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannel,
+            controlPlaneApis, time, 1, "ControlPlaneRequestHandlerIdlePercent", true)
+        }
 
         Mx4jLoader.maybeLoad()
 
@@ -573,14 +584,18 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         // Socket server will be shutdown towards the end of the sequence.
         if (socketServer != null)
           CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
-        if (requestHandlerPool != null)
-          CoreUtils.swallow(requestHandlerPool.shutdown(), this)
+        if (dataRequestHandlerPool != null)
+          CoreUtils.swallow(dataRequestHandlerPool.shutdown(), this)
+        if (controlPlaneRequestHandlerPool != null)
+          CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this)
 
         if (kafkaScheduler != null)
           CoreUtils.swallow(kafkaScheduler.shutdown(), this)
 
-        if (apis != null)
-          CoreUtils.swallow(apis.close(), this)
+        if (dataPlaneApis != null)
+          CoreUtils.swallow(dataPlaneApis.close(), this)
+        if (controlPlaneApis != null)
+          CoreUtils.swallow(controlPlaneApis.close(), this)
         CoreUtils.swallow(authorizer.foreach(_.close()), this)
         if (adminManager != null)
           CoreUtils.swallow(adminManager.shutdown(), this)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index ca23e1fd4c4..4bdfa70dafa 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -244,9 +244,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     client = AdminClient.create(createConfig())
     val nodes = client.describeCluster.nodes.get()
     val clusterId = client.describeCluster().clusterId().get()
-    assertEquals(servers.head.apis.clusterId, clusterId)
+    assertEquals(servers.head.dataPlaneApis.clusterId, clusterId)
     val controller = client.describeCluster().controller().get()
-    assertEquals(servers.head.apis.metadataCache.getControllerId.
+    assertEquals(servers.head.dataPlaneApis.metadataCache.getControllerId.
       getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id())
     val brokers = brokerList.split(",")
     assertEquals(brokers.size, nodes.size)
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ae6273623ae..9aada113b1d 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -611,7 +611,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val producer = createProducer()
     sendRecords(producer, 1, tp)
     removeAllAcls()
-    
+
     val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
     consumeRecords(consumer)
@@ -1452,9 +1452,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   def removeAllAcls() = {
-    servers.head.apis.authorizer.get.getAcls().keys.foreach { resource =>
-      servers.head.apis.authorizer.get.removeAcls(resource)
-      TestUtils.waitAndVerifyAcls(Set.empty[Acl], servers.head.apis.authorizer.get, resource)
+    servers.head.dataPlaneApis.authorizer.get.getAcls().keys.foreach { resource =>
+      servers.head.dataPlaneApis.authorizer.get.removeAcls(resource)
+      TestUtils.waitAndVerifyAcls(Set.empty[Acl], servers.head.dataPlaneApis.authorizer.get, resource)
     }
   }
 
@@ -1507,8 +1507,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def addAndVerifyAcls(acls: Set[Acl], resource: Resource) = {
-    servers.head.apis.authorizer.get.addAcls(acls, resource)
-    TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) ++ acls, servers.head.apis.authorizer.get, resource)
+    servers.head.dataPlaneApis.authorizer.get.addAcls(acls, resource)
+    TestUtils.waitAndVerifyAcls(servers.head.dataPlaneApis.authorizer.get.getAcls(resource) ++ acls, servers.head.dataPlaneApis.authorizer.get, resource)
   }
 
   private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 7ac3c66ebf3..a1bc27b3a68 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -285,7 +285,7 @@ abstract class QuotaTestClients(topic: String,
 
   def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double, server: KafkaServer = leaderNode) {
     TestUtils.retry(10000) {
-      val quotaManagers = server.apis.quotas
+      val quotaManagers = server.dataPlaneApis.quotas
       val overrideProducerQuota = quota(quotaManagers.produce, userPrincipal, producerClientId)
       val overrideConsumerQuota = quota(quotaManagers.fetch, userPrincipal, consumerClientId)
       val overrideProducerRequestQuota = quota(quotaManagers.request, userPrincipal, producerClientId)
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index c9da8347bb3..c1db456d9a2 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -179,8 +179,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   override def setUp() {
     super.setUp()
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, Resource.ClusterResource)
-      TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, Resource(Topic, "*", LITERAL))
+      TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.dataPlaneApis.authorizer.get, Resource.ClusterResource)
+      TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.dataPlaneApis.authorizer.get, Resource(Topic, "*", LITERAL))
     }
     // create the test topic with all the brokers as replicas
     createTopic(topic, 1, 3)
@@ -247,16 +247,16 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   private def setWildcardResourceAcls() {
     AclCommand.main(produceConsumeWildcardAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource)
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource)
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.dataPlaneApis.authorizer.get, wildcardTopicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneApis.authorizer.get, wildcardGroupResource)
     }
   }
 
   private def setPrefixedResourceAcls() {
     AclCommand.main(produceConsumePrefixedAclsArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, prefixedTopicResource)
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, prefixedGroupResource)
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneApis.authorizer.get, prefixedTopicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneApis.authorizer.get, prefixedGroupResource)
     }
   }
 
@@ -264,9 +264,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     AclCommand.main(produceAclArgs(tp.topic))
     AclCommand.main(consumeAclArgs(tp.topic))
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get,
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneApis.authorizer.get,
         new Resource(Topic, tp.topic, PatternType.LITERAL))
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneApis.authorizer.get, groupResource)
     }
     val producer = createProducer()
     sendRecords(producer, numRecords, tp)
@@ -286,7 +286,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   def testNoProduceWithDescribeAcl(): Unit = {
     AclCommand.main(describeAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.dataPlaneApis.authorizer.get, topicResource)
     }
     try{
       val producer = createProducer()
@@ -297,7 +297,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
         assertEquals(Set(topic).asJava, e.unauthorizedTopics())
     }
   }
-  
+
    /**
     * Tests that a consumer fails to consume messages without the appropriate
     * ACL set.
@@ -310,7 +310,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     // the exception is expected when the consumer attempts to lookup offsets
     consumeRecords(consumer)
   }
-  
+
   @Test(expected = classOf[TopicAuthorizationException])
   def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = {
     noConsumeWithoutDescribeAclSetup()
@@ -319,13 +319,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     // this should timeout since the consumer will not be able to fetch any 
metadata for the topic
     consumeRecords(consumer, timeout = 3000)
   }
-  
+
   private def noConsumeWithoutDescribeAclSetup(): Unit = {
     AclCommand.main(produceAclArgs(tp.topic))
     AclCommand.main(groupAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneApis.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneApis.authorizer.get, groupResource)
     }
 
     val producer = createProducer()
@@ -334,10 +334,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     AclCommand.main(deleteDescribeAclArgs)
     AclCommand.main(deleteWriteAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneApis.authorizer.get, groupResource)
     }
   }
-  
+
   @Test
   def testNoConsumeWithDescribeAclViaAssign(): Unit = {
     noConsumeWithDescribeAclSetup()
@@ -352,7 +352,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
         assertEquals(Set(topic).asJava, e.unauthorizedTopics())
     }
   }
-  
+
   @Test
   def testNoConsumeWithDescribeAclViaSubscribe(): Unit = {
     noConsumeWithDescribeAclSetup()
@@ -367,13 +367,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
         assertEquals(Set(topic).asJava, e.unauthorizedTopics())
     }
   }
-  
+
   private def noConsumeWithDescribeAclSetup(): Unit = {
     AclCommand.main(produceAclArgs(tp.topic))
     AclCommand.main(groupAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneApis.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneApis.authorizer.get, groupResource)
     }
     val producer = createProducer()
     sendRecords(producer, numRecords, tp)
@@ -387,7 +387,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   def testNoGroupAcl(): Unit = {
     AclCommand.main(produceAclArgs(tp.topic))
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneApis.authorizer.get, topicResource)
     }
     val producer = createProducer()
     sendRecords(producer, numRecords, tp)
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 55e1529c963..4cbbb4798c8 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -71,7 +71,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
 
   private def addClusterAcl(permissionType: PermissionType, operation: Operation): Unit = {
     val acls = Set(clusterAcl(permissionType, operation))
-    val authorizer = servers.head.apis.authorizer.get
+    val authorizer = servers.head.dataPlaneApis.authorizer.get
     val prevAcls = authorizer.getAcls(AuthResource.ClusterResource)
     authorizer.addAcls(acls, AuthResource.ClusterResource)
     TestUtils.waitAndVerifyAcls(prevAcls ++ acls, authorizer, AuthResource.ClusterResource)
@@ -79,7 +79,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
 
   private def removeClusterAcl(permissionType: PermissionType, operation: Operation): Unit = {
     val acls = Set(clusterAcl(permissionType, operation))
-    val authorizer = servers.head.apis.authorizer.get
+    val authorizer = servers.head.dataPlaneApis.authorizer.get
     val prevAcls = authorizer.getAcls(AuthResource.ClusterResource)
     Assert.assertTrue(authorizer.removeAcls(acls, AuthResource.ClusterResource))
     TestUtils.waitAndVerifyAcls(prevAcls -- acls, authorizer, AuthResource.ClusterResource)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 787cc8d3a01..e786039c826 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -500,8 +500,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
   @Test
   def testThreadPoolResize(): Unit = {
-    val requestHandlerPrefix = "kafka-request-handler-"
-    val networkThreadPrefix = "kafka-network-thread-"
+    val requestHandlerPrefix = "data-plane-kafka-request-handler-"
+    val networkThreadPrefix = "data-plane-kafka-network-thread-"
     val fetcherThreadPrefix = "ReplicaFetcherThread-"
     // Executor threads and recovery threads are not verified since threads may not be running
    // For others, thread count should be configuredCount * threadMultiplier * numBrokers
@@ -576,7 +576,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       "", mayReceiveDuplicates = false)
     verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads,
       networkThreadPrefix, mayReceiveDuplicates = true)
-    verifyThreads("kafka-socket-acceptor-", config.listeners.size)
+    verifyThreads("data-plane-kafka-socket-acceptor-", config.listeners.size)
 
     verifyProcessorMetrics()
     verifyMarkPartitionsForTruncation()
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index a1c317e4e2c..73df7883516 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -175,8 +175,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     // Test that the existing clientId overrides are read
     val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
     servers = Seq(server)
-    assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId))
-    assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId))
+    assertEquals(new Quota(1000, true), server.dataPlaneApis.quotas.produce.quota("ANONYMOUS", clientId))
+    assertEquals(new Quota(2000, true), server.dataPlaneApis.quotas.fetch.quota("ANONYMOUS", clientId))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index dc4076a27c7..152228e0bb7 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -323,10 +323,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     var activeServers = servers.filter(s => s.config.brokerId != 2)
     // wait for the update metadata request to trickle to the brokers
     TestUtils.waitUntilTrue(() =>
-      activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.isr.size != 3),
+      activeServers.forall(_.dataPlaneApis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.isr.size != 3),
       "Topic test not created after timeout")
     assertEquals(0, partitionsRemaining.size)
-    var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
+    var partitionStateInfo = activeServers.head.dataPlaneApis.metadataCache.getPartitionInfo(topic,partition).get
     var leaderAfterShutdown = partitionStateInfo.basePartitionState.leader
     assertEquals(0, leaderAfterShutdown)
     assertEquals(2, partitionStateInfo.basePartitionState.isr.size)
@@ -336,16 +336,16 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     partitionsRemaining = resultQueue.take().get
     assertEquals(0, partitionsRemaining.size)
     activeServers = servers.filter(s => s.config.brokerId == 0)
-    partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
+    partitionStateInfo = activeServers.head.dataPlaneApis.metadataCache.getPartitionInfo(topic,partition).get
     leaderAfterShutdown = partitionStateInfo.basePartitionState.leader
     assertEquals(0, leaderAfterShutdown)
 
-    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
+    assertTrue(servers.forall(_.dataPlaneApis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
     controller.controlledShutdown(0, controlledShutdownCallback)
     partitionsRemaining = resultQueue.take().get
     assertEquals(1, partitionsRemaining.size)
     // leader doesn't change since all the replicas are shut down
-    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
+    assertTrue(servers.forall(_.dataPlaneApis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/network/ControlPlaneTest.scala b/core/src/test/scala/unit/kafka/network/ControlPlaneTest.scala
new file mode 100644
index 00000000000..9f54701260d
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/network/ControlPlaneTest.scala
@@ -0,0 +1,52 @@
+package kafka.network
+
+import java.util.Properties
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class ControlPlaneTest extends KafkaServerTestHarness {
+  val numNodes = 2
+  val overridingProps = new Properties()
+  val testedMetrics = List("ControlPlaneRequestQueueSize", "ControlPlaneNetworkProcessorIdlePercent", "ControlPlaneRequestHandlerIdlePercent")
+
+  overridingProps.put(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLLER")
+  overridingProps.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:" + TestUtils.RandomPort + ",CONTROLLER://localhost:" + TestUtils.RandomPort)
+  overridingProps.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
+
+  /**
+    * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
+    * test and should not reuse previous configurations unless they select their ports randomly when servers are started.
+    */
+  override def generateConfigs: Seq[KafkaConfig] =
+    TestUtils.createBrokerConfigs(numNodes, zkConnect, enableDeleteTopic=true).map(KafkaConfig.fromProps(_, overridingProps))
+
+  @Test
+  def testCreatingTopicThroughControlPlane() {
+    val topic = "test"
+    createTopic(topic, 3, numNodes)
+
+    for (s <- servers) {
+      val controlPlaneProcessors = s.socketServer.controlPlaneProcessors.asScala
+      assertTrue(s"There should be exactly one control plane network processor thread when ${KafkaConfig.ControlPlaneListenerNameProp} is set",
+        controlPlaneProcessors.size == 1)
+      controlPlaneProcessors.foreach {
+        case (id, processor) =>
+          assertTrue("The control plane network processor should have processed some request", processor.receivesProcessed > 0)
+      }
+
+      assertTrue(s"There should be exactly one control plane request handler thread when ${KafkaConfig.ControlPlaneListenerNameProp} is set",
+        s.controlPlaneRequestHandlerPool != null && s.controlPlaneRequestHandlerPool.runnables.size == 1)
+      s.controlPlaneRequestHandlerPool.runnables.foreach { requestHandler =>
+        assertTrue("The control plane request handler thread should have processed some request", requestHandler.requestsHandled > 0)
+      }
+    }
+
+    testedMetrics.foreach { metric => TestUtils.verifyMetricExistence(metric, true)}
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/network/InactiveControlPlaneTest.scala b/core/src/test/scala/unit/kafka/network/InactiveControlPlaneTest.scala
new file mode 100644
index 00000000000..da0f0cebc58
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/network/InactiveControlPlaneTest.scala
@@ -0,0 +1,46 @@
+package kafka.network
+
+import com.yammer.metrics.{Metrics => YammerMetrics}
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.junit.Assert._
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConverters._
+
+class InactiveControlPlaneTest extends KafkaServerTestHarness {
+  val numNodes = 2
+  val testedMetrics = List("ControlPlaneRequestQueueSize", "ControlPlaneNetworkProcessorIdlePercent", "ControlPlaneRequestHandlerIdlePercent")
+
+  /**
+    * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
+    * test and should not reuse previous configurations unless they select their ports randomly when servers are started.
+    */
+  override def generateConfigs: Seq[KafkaConfig] =
+    TestUtils.createBrokerConfigs(numNodes, zkConnect, enableDeleteTopic=true).map(KafkaConfig.fromProps(_))
+
+  @Before
+  override def setUp(): Unit = {
+    TestUtils.cleanMetricsRegistry()
+    super.setUp()
+  }
+
+  /**
+    * the control plane metrics should not exist when the control.plane.listener.name is not set
+    */
+  @Test
+  def testInactiveControlPlane(): Unit = {
+    for (s <- servers) {
+      val controlPlaneProcessors = s.socketServer.controlPlaneProcessors.asScala
+      assertTrue(s"There should be no control plane network processor thread when ${KafkaConfig.ControlPlaneListenerNameProp} is not set",
+        controlPlaneProcessors.isEmpty)
+
+      assertTrue(s"There should be no control plane request handler thread when ${KafkaConfig.ControlPlaneListenerNameProp} is not set",
+        s.controlPlaneRequestHandlerPool == null)
+    }
+
+    testedMetrics.foreach { metric => TestUtils.verifyMetricExistence(metric, false)}
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index b5983377b77..b1458f2ddb2 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -144,13 +144,13 @@ class SocketServerTest extends JUnitSuite {
   def connectAndProcessRequest(s: SocketServer): (Socket, String) = {
     val socket = connect(s)
     val request = sendAndReceiveRequest(socket, s)
-    processRequest(s.requestChannel, request)
+    processRequest(s.dataRequestChannel, request)
     (socket, request.context.connectionId)
   }
 
   def sendAndReceiveRequest(socket: Socket, server: SocketServer): 
RequestChannel.Request = {
     sendRequest(socket, producerRequestBytes())
-    receiveRequest(server.requestChannel)
+    receiveRequest(server.dataRequestChannel)
   }
 
   def shutdownServerAndMetrics(server: SocketServer): Unit = {
@@ -181,7 +181,7 @@ class SocketServerTest extends JUnitSuite {
 
     // Test PLAINTEXT socket
     sendRequest(plainSocket, serializedBytes)
-    processRequest(server.requestChannel)
+    processRequest(server.dataRequestChannel)
     assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq)
   }
 
@@ -212,9 +212,9 @@ class SocketServerTest extends JUnitSuite {
       sendRequest(plainSocket, serializedBytes)
     plainSocket.close()
     for (_ <- 0 until 10) {
-      val request = receiveRequest(server.requestChannel)
+      val request = receiveRequest(server.dataRequestChannel)
       assertNotNull("receiveRequest timed out", request)
-      server.requestChannel.sendResponse(new 
RequestChannel.NoOpResponse(request))
+      server.dataRequestChannel.sendResponse(new 
RequestChannel.NoOpResponse(request))
     }
   }
 
@@ -226,9 +226,9 @@ class SocketServerTest extends JUnitSuite {
     for (_ <- 0 until 3)
       sendRequest(plainSocket, serializedBytes)
     for (_ <- 0 until 3) {
-      val request = receiveRequest(server.requestChannel)
+      val request = receiveRequest(server.dataRequestChannel)
       assertNotNull("receiveRequest timed out", request)
-      server.requestChannel.sendResponse(new 
RequestChannel.NoOpResponse(request))
+      server.dataRequestChannel.sendResponse(new 
RequestChannel.NoOpResponse(request))
     }
   }
 
@@ -239,7 +239,7 @@ class SocketServerTest extends JUnitSuite {
 
     val requests = sockets.map{socket =>
       sendRequest(socket, serializedBytes)
-      receiveRequest(server.requestChannel)
+      receiveRequest(server.dataRequestChannel)
     }
     requests.zipWithIndex.foreach { case (request, i) =>
       val index = request.context.connectionId.split("-").last
@@ -269,14 +269,14 @@ class SocketServerTest extends JUnitSuite {
       // Connection with no staged receives
       val socket1 = connect(overrideServer, protocol = 
SecurityProtocol.PLAINTEXT)
       sendRequest(socket1, serializedBytes)
-      val request1 = receiveRequest(overrideServer.requestChannel)
+      val request1 = receiveRequest(overrideServer.dataRequestChannel)
       assertTrue("Channel not open", openChannel(request1).nonEmpty)
       assertEquals(openChannel(request1), openOrClosingChannel(request1))
 
       time.sleep(idleTimeMs + 1)
       TestUtils.waitUntilTrue(() => openOrClosingChannel(request1).isEmpty, 
"Failed to close idle channel")
       assertTrue("Channel not removed", openChannel(request1).isEmpty)
-      processRequest(overrideServer.requestChannel, request1)
+      processRequest(overrideServer.dataRequestChannel, request1)
 
       // Connection with staged receives
       val socket2 = connect(overrideServer, protocol = 
SecurityProtocol.PLAINTEXT)
@@ -285,9 +285,9 @@ class SocketServerTest extends JUnitSuite {
       time.sleep(idleTimeMs + 1)
       TestUtils.waitUntilTrue(() => openChannel(request2).isEmpty, "Failed to 
close idle channel")
       TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).nonEmpty, 
"Channel removed without processing staged receives")
-      processRequest(overrideServer.requestChannel, request2) // this triggers 
a failed send since channel has been closed
+      processRequest(overrideServer.dataRequestChannel, request2) // this 
triggers a failed send since channel has been closed
       TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).isEmpty, 
"Failed to remove channel with failed sends")
-      assertNull("Received request after failed send", 
overrideServer.requestChannel.receiveRequest(200))
+      assertNull("Received request after failed send", 
overrideServer.dataRequestChannel.receiveRequest(200))
 
     } finally {
       shutdownServerAndMetrics(overrideServer)
@@ -304,9 +304,9 @@ class SocketServerTest extends JUnitSuite {
     @volatile var selector: TestableSelector = null
     val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0"
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), 
serverMetrics, time, credentialProvider) {
-      override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                                protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
-        new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
+      override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+                                protocol: SecurityProtocol, memoryPool: MemoryPool, isControlPlane: Boolean): Processor = {
+        new Processor(id, isControlPlane, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, 
listenerName, protocol, config, metrics,
           credentialProvider, memoryPool, new LogContext()) {
             override protected[network] def connectionId(socket: Socket): 
String = overrideConnectionId
@@ -361,7 +361,7 @@ class SocketServerTest extends JUnitSuite {
       assertSame(channel1, openOrClosingChannel.getOrElse(throw new 
RuntimeException("Channel not found")))
 
       // Complete request with failed send so that `channel1` is removed from 
Selector.closingChannels
-      processRequest(overrideServer.requestChannel, request)
+      processRequest(overrideServer.dataRequestChannel, request)
       TestUtils.waitUntilTrue(() => connectionCount == 0 && 
openOrClosingChannel.isEmpty, "Failed to remove channel with failed send")
 
       // Check that new connections can be created with the same id since 
`channel1` is no longer in Selector
@@ -380,14 +380,14 @@ class SocketServerTest extends JUnitSuite {
     def sendTwoRequestsReceiveOne(): RequestChannel.Request = {
       sendRequest(socket, requestBytes, flush = false)
       sendRequest(socket, requestBytes, flush = true)
-      receiveRequest(server.requestChannel)
+      receiveRequest(server.dataRequestChannel)
     }
     val (request, hasStagedReceives) = 
TestUtils.computeUntilTrue(sendTwoRequestsReceiveOne()) { req =>
       val connectionId = req.context.connectionId
       val hasStagedReceives = 
server.processor(0).numStagedReceives(connectionId) > 0
       if (!hasStagedReceives) {
-        processRequest(server.requestChannel, req)
-        processRequest(server.requestChannel)
+        processRequest(server.dataRequestChannel, req)
+        processRequest(server.dataRequestChannel)
       }
       hasStagedReceives
     }
@@ -403,11 +403,11 @@ class SocketServerTest extends JUnitSuite {
 
     // Mimic a primitive request handler that fetches the request from 
RequestChannel and place a response with a
     // throttled channel.
-    val request = receiveRequest(server.requestChannel)
+    val request = receiveRequest(server.dataRequestChannel)
     val byteBuffer = request.body[AbstractRequest].serialize(request.header)
     val send = new NetworkSend(request.context.connectionId, byteBuffer)
     def channelThrottlingCallback(response: RequestChannel.Response): Unit = {
-      server.requestChannel.sendResponse(response)
+      server.dataRequestChannel.sendResponse(response)
     }
     val throttledChannel = new ThrottledChannel(request, new MockTime(), 100, 
channelThrottlingCallback)
     val response =
@@ -415,7 +415,7 @@ class SocketServerTest extends JUnitSuite {
         new RequestChannel.SendResponse(request, send, 
Some(request.header.toString), None)
       else
         new RequestChannel.NoOpResponse(request)
-    server.requestChannel.sendResponse(response)
+    server.dataRequestChannel.sendResponse(response)
 
     // Quota manager would call notifyThrottlingDone() on throttling 
completion. Simulate it if throttleingInProgress is
     // false.
@@ -490,7 +490,7 @@ class SocketServerTest extends JUnitSuite {
     val bytes = new Array[Byte](40)
     // send a request first to make sure the connection has been picked up by 
the socket server
     sendRequest(plainSocket, bytes, Some(0))
-    processRequest(server.requestChannel)
+    processRequest(server.dataRequestChannel)
     // the following sleep is necessary to reliably detect the connection 
close when we send data below
     Thread.sleep(200L)
     // make sure the sockets are open
@@ -527,7 +527,7 @@ class SocketServerTest extends JUnitSuite {
     val conn2 = connect()
     val serializedBytes = producerRequestBytes()
     sendRequest(conn2, serializedBytes)
-    val request = server.requestChannel.receiveRequest(2000)
+    val request = server.dataRequestChannel.receiveRequest(2000)
     assertNotNull(request)
   }
 
@@ -555,7 +555,7 @@ class SocketServerTest extends JUnitSuite {
       val conn2 = connect(server)
       val serializedBytes = producerRequestBytes()
       sendRequest(conn2, serializedBytes)
-      val request = server.requestChannel.receiveRequest(2000)
+      val request = server.dataRequestChannel.receiveRequest(2000)
       assertNotNull(request)
 
       // now try to connect from the external facing interface, which should 
fail
@@ -583,7 +583,7 @@ class SocketServerTest extends JUnitSuite {
       // it should succeed
       val serializedBytes = producerRequestBytes()
       sendRequest(conns.last, serializedBytes)
-      val request = overrideServer.requestChannel.receiveRequest(2000)
+      val request = overrideServer.dataRequestChannel.receiveRequest(2000)
       assertNotNull(request)
 
       // now try one more (should fail)
@@ -627,7 +627,7 @@ class SocketServerTest extends JUnitSuite {
       byteBuffer.get(serializedBytes)
 
       sendRequest(sslSocket, serializedBytes)
-      processRequest(overrideServer.requestChannel)
+      processRequest(overrideServer.dataRequestChannel)
       assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq)
       sslSocket.close()
     } finally {
@@ -640,7 +640,7 @@ class SocketServerTest extends JUnitSuite {
     val socket = connect()
     val bytes = new Array[Byte](40)
     sendRequest(socket, bytes, Some(0))
-    assertEquals(KafkaPrincipal.ANONYMOUS, 
receiveRequest(server.requestChannel).session.principal)
+    assertEquals(KafkaPrincipal.ANONYMOUS, 
receiveRequest(server.dataRequestChannel).session.principal)
   }
 
   /* Test that we update request metrics if the client closes the connection 
while the broker response is in flight. */
@@ -650,9 +650,9 @@ class SocketServerTest extends JUnitSuite {
     val serverMetrics = new Metrics
     var conn: Socket = null
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), 
serverMetrics, Time.SYSTEM, credentialProvider) {
-      override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                                protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
-        new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
+      override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+                                protocol: SecurityProtocol, memoryPool: MemoryPool, isControlPlane: Boolean): Processor = {
+        new Processor(id, isControlPlane, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, 
listenerName, protocol, config, metrics,
           credentialProvider, MemoryPool.NONE, new LogContext()) {
           override protected[network] def sendResponse(response: 
RequestChannel.Response, responseSend: Send) {
@@ -668,7 +668,7 @@ class SocketServerTest extends JUnitSuite {
       val serializedBytes = producerRequestBytes()
       sendRequest(conn, serializedBytes)
 
-      val channel = overrideServer.requestChannel
+      val channel = overrideServer.dataRequestChannel
       val request = receiveRequest(channel)
 
       val requestMetrics = channel.metrics(request.header.apiKey.name)
@@ -696,9 +696,9 @@ class SocketServerTest extends JUnitSuite {
     @volatile var selector: TestableSelector = null
     val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0"
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), 
serverMetrics, Time.SYSTEM, credentialProvider) {
-      override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                                protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
-        new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
+      override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+                                protocol: SecurityProtocol, memoryPool: MemoryPool, isControlPlane: Boolean): Processor = {
+        new Processor(id, isControlPlane, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, 
listenerName, protocol, config, metrics,
           credentialProvider, memoryPool, new LogContext()) {
           override protected[network] def connectionId(socket: Socket): String 
= overrideConnectionId
@@ -730,7 +730,7 @@ class SocketServerTest extends JUnitSuite {
       socket.close()
 
       // Complete request with socket exception so that the channel is removed 
from Selector.closingChannels
-      processRequest(overrideServer.requestChannel, request)
+      processRequest(overrideServer.dataRequestChannel, request)
       TestUtils.waitUntilTrue(() => openOrClosingChannel.isEmpty, "Channel not 
closed after failed send")
       assertTrue("Unexpected completed send", selector.completedSends.isEmpty)
     } finally {
@@ -755,7 +755,7 @@ class SocketServerTest extends JUnitSuite {
       conn = connect(overrideServer)
       val serializedBytes = producerRequestBytes()
       sendRequest(conn, serializedBytes)
-      val channel = overrideServer.requestChannel
+      val channel = overrideServer.dataRequestChannel
       val request = receiveRequest(channel)
 
       TestUtils.waitUntilTrue(() => 
overrideServer.processor(request.processor).channel(request.context.connectionId).isEmpty,
@@ -780,10 +780,10 @@ class SocketServerTest extends JUnitSuite {
     server.stopProcessingRequests()
     val version = ApiKeys.PRODUCE.latestVersion
     val version2 = (version - 1).toShort
-    for (_ <- 0 to 1) 
server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark()
-    
server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark()
-    assertEquals(2, 
server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count())
-    server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE 
-> 1))
+    for (_ <- 0 to 1) 
server.dataRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark()
+    
server.dataRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark()
+    assertEquals(2, 
server.dataRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count())
+    server.dataRequestChannel.updateErrorMetrics(ApiKeys.PRODUCE, 
Map(Errors.NONE -> 1))
     val nonZeroMeters = 
Map(s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version"
 -> 2,
         
s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version2"
 -> 1,
         
"kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE"
 -> 1)
@@ -879,7 +879,7 @@ class SocketServerTest extends JUnitSuite {
       sockets.foreach(sendRequest(_, producerRequestBytes()))
 
       testableServer.testableSelector.addFailure(SelectorOperation.Send)
-      sockets.foreach(_ => processRequest(testableServer.requestChannel))
+      sockets.foreach(_ => processRequest(testableServer.dataRequestChannel))
       testableSelector.waitForOperations(SelectorOperation.Send, 2)
       
testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, 
locallyClosed = true)
 
@@ -900,7 +900,7 @@ class SocketServerTest extends JUnitSuite {
 
       val sockets = (1 to 2).map(_ => connect(testableServer))
       sockets.foreach(sendRequest(_, producerRequestBytes()))
-      val requestChannel = testableServer.requestChannel
+      val requestChannel = testableServer.dataRequestChannel
 
       val requests = sockets.map(_ => receiveRequest(requestChannel))
       val failedConnectionId = requests(0).context.connectionId
@@ -933,8 +933,8 @@ class SocketServerTest extends JUnitSuite {
 
       testableSelector.addFailure(SelectorOperation.Send)
       sockets(0).close()
-      processRequest(testableServer.requestChannel, request)
-      processRequest(testableServer.requestChannel) // Also process request 
from other channel
+      processRequest(testableServer.dataRequestChannel, request)
+      processRequest(testableServer.dataRequestChannel) // Also process 
request from other channel
       testableSelector.waitForOperations(SelectorOperation.Send, 2)
       testableServer.waitForChannelClose(request.context.connectionId, 
locallyClosed = true)
 
@@ -957,7 +957,7 @@ class SocketServerTest extends JUnitSuite {
     withTestableServer { testableServer =>
       val sockets = (1 to 2).map(_ => connect(testableServer))
       val testableSelector = testableServer.testableSelector
-      val requestChannel = testableServer.requestChannel
+      val requestChannel = testableServer.dataRequestChannel
 
       testableSelector.cachedCompletedReceives.minPerPoll = 2
       testableSelector.addFailure(SelectorOperation.Mute)
@@ -989,7 +989,7 @@ class SocketServerTest extends JUnitSuite {
       val requests = sockets.map(sendAndReceiveRequest(_, testableServer))
 
       testableSelector.addFailure(SelectorOperation.Unmute)
-      requests.foreach(processRequest(testableServer.requestChannel, _))
+      requests.foreach(processRequest(testableServer.dataRequestChannel, _))
       testableSelector.waitForOperations(SelectorOperation.Unmute, 2)
       
testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, 
locallyClosed = true)
 
@@ -1073,7 +1073,7 @@ class SocketServerTest extends JUnitSuite {
   private def assertProcessorHealthy(testableServer: TestableSocketServer, 
healthySockets: Seq[Socket] = Seq.empty): Unit = {
     val selector = testableServer.testableSelector
     selector.reset()
-    val requestChannel = testableServer.requestChannel
+    val requestChannel = testableServer.dataRequestChannel
 
     // Check that existing channels behave as expected
     healthySockets.foreach { socket =>
@@ -1101,9 +1101,9 @@ class SocketServerTest extends JUnitSuite {
 
     @volatile var selector: Option[TestableSelector] = None
 
-    override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                                protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
-      new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, config.connectionsMaxIdleMs,
+    override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+                                protocol: SecurityProtocol, memoryPool: MemoryPool, isControlPlane: Boolean): Processor = {
+      new Processor(id, isControlPlane, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, config.connectionsMaxIdleMs,
         config.failedAuthenticationDelayMs, listenerName, protocol, config, 
metrics, credentialProvider, memoryPool, new LogContext()) {
         override protected[network] def createSelector(channelBuilder: 
ChannelBuilder): Selector = {
            val testableSelector = new TestableSelector(config, channelBuilder, 
time, metrics)
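
The pervasive requestChannel to dataRequestChannel rename in this test file reflects that SocketServer now distinguishes two request channels. A rough sketch of that shape, assuming illustrative names and capacities beyond the dataRequestChannel field and optional control plane visible in the diff:

    import java.util.concurrent.ArrayBlockingQueue

    // Illustrative sketch, not SocketServer's code: the existing queue serves the
    // data plane, and a second, smaller queue exists only when
    // control.plane.listener.name is configured. Capacities here are made up.
    class RequestChannels(controlPlaneConfigured: Boolean) {
      val dataRequestChannel = new ArrayBlockingQueue[AnyRef](500)
      val controlPlaneRequestChannel: Option[ArrayBlockingQueue[AnyRef]] =
        if (controlPlaneConfigured) Some(new ArrayBlockingQueue[AnyRef](20)) else None
    }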
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index cabe0a984e2..9182cfc6a4e 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -93,7 +93,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "1000")
     props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "2000")
 
-    val quotaManagers = servers.head.apis.quotas
+    val quotaManagers = servers.head.dataPlaneApis.quotas
     rootEntityType match {
       case ConfigType.Client => 
adminZkClient.changeClientIdConfig(configEntityName, props)
       case _ => adminZkClient.changeUserOrUserClientIdConfig(configEntityName, 
props)
@@ -179,7 +179,7 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     // Remove config change znodes to force quota initialization only through 
loading of user/client quotas
     zkClient.getChildren(ConfigEntityChangeNotificationZNode.path).foreach { p 
=> zkClient.deletePath(ConfigEntityChangeNotificationZNode.path + "/" + p) }
     server.startup()
-    val quotaManagers = server.apis.quotas
+    val quotaManagers = server.dataPlaneApis.quotas
 
     assertEquals(Quota.upperBound(1000),  
quotaManagers.produce.quota("someuser", "overriddenClientId"))
     assertEquals(Quota.upperBound(2000),  
quotaManagers.fetch.quota("someuser", "overriddenClientId"))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index b75c3e7eb24..f268ff8f87b 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -230,6 +230,40 @@ class KafkaConfigTest {
     assertFalse(isValidKafkaConfig(props))
   }
 
+  @Test
+  def testControlPlaneListenerName() {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9091")
+    props.put(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLLER")
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+    assertTrue(isValidKafkaConfig(props))
+
+    // verify that there must be an entry for control.plane.listener.name in the listener.security.protocol.map
+    val wrongPropsWithoutSecurityProtocol = new Properties()
+    wrongPropsWithoutSecurityProtocol.putAll(props)
+    wrongPropsWithoutSecurityProtocol.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT")
+    assertFalse(isValidKafkaConfig(wrongPropsWithoutSecurityProtocol))
+
+    // verify that the control.plane.listener.name must be present in the listeners list
+    val propsWithNoControlPlaneEndpointInListeners = new Properties()
+    propsWithNoControlPlaneEndpointInListeners.putAll(props)
+    propsWithNoControlPlaneEndpointInListeners.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
+    assertFalse(isValidKafkaConfig(propsWithNoControlPlaneEndpointInListeners))
+
+    // verify that the control.plane.listener.name must be present in the advertised.listeners list
+    val propsWithNoControlPlaneEndpointInAdvertisedListeners = new Properties()
+    propsWithNoControlPlaneEndpointInAdvertisedListeners.putAll(props)
+    propsWithNoControlPlaneEndpointInAdvertisedListeners.put(KafkaConfig.AdvertisedListenersProp,
+      "PLAINTEXT://localhost:9092")
+    assertFalse(isValidKafkaConfig(propsWithNoControlPlaneEndpointInAdvertisedListeners))
+
+    // verify that the control.plane.listener.name must be different from the inter broker listener name
+    val propsWithIdenticalControlPlaneAndDataPlaneListenerNames = new Properties()
+    propsWithIdenticalControlPlaneAndDataPlaneListenerNames.putAll(props)
+    propsWithIdenticalControlPlaneAndDataPlaneListenerNames.put(KafkaConfig.ControlPlaneListenerNameProp, "PLAINTEXT")
+    assertFalse(isValidKafkaConfig(propsWithIdenticalControlPlaneAndDataPlaneListenerNames))
+  }
+
   @Test
   def testBadListenerProtocol() {
     val props = new Properties()
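
Taken together, the assertFalse cases above pin down four validation rules for control.plane.listener.name. A standalone restatement of those constraints, as a sketch rather than KafkaConfig's actual validation code:

    // Illustrative helper, assumed for clarity; not code from KafkaConfig.
    object ControlPlaneConfigRules {
      def validate(listeners: Set[String],
                   advertisedListeners: Set[String],
                   securityProtocolMap: Map[String, String],
                   interBrokerListenerName: String,
                   controlPlaneListenerName: Option[String]): Unit = {
        controlPlaneListenerName.foreach { name =>
          require(securityProtocolMap.contains(name),
            s"$name needs an entry in listener.security.protocol.map")
          require(listeners.contains(name), s"$name must appear in listeners")
          require(advertisedListeners.contains(name), s"$name must appear in advertised.listeners")
          require(name != interBrokerListenerName,
            "control.plane.listener.name must differ from the inter-broker listener name")
        }
      }
    }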
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 740b28e11cf..255256ad14c 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 4e60a8f6027..1610404c246 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -74,7 +74,7 @@ class MetadataRequestTest extends BaseRequestTest {
     assertNotEquals("Controller id should switch to a new broker", 
controllerId, controllerId2)
     TestUtils.waitUntilTrue(() => {
       val metadataResponse2 = 
sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
-      metadataResponse2.controller != null && controllerServer2.apis.brokerId 
== metadataResponse2.controller.id
+      metadataResponse2.controller != null && 
controllerServer2.dataPlaneApis.brokerId == metadataResponse2.controller.id
     }, "Controller id should match the active controller after failover", 5000)
   }
 
@@ -178,7 +178,7 @@ class MetadataRequestTest extends BaseRequestTest {
     assertEquals(topic1, topicMetadata1.topic)
     assertEquals(Errors.INVALID_TOPIC_EXCEPTION, topicMetadata2.error)
     assertEquals(topic2, topicMetadata2.topic)
-    
+
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 0)
 
@@ -250,7 +250,7 @@ class MetadataRequestTest extends BaseRequestTest {
     val metadataResponse = sendMetadataRequest(new 
MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
     val partitionMetadata = 
metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
     val downNode = servers.find { server =>
-      val serverId = server.apis.brokerId
+      val serverId = server.dataPlaneApis.brokerId
       val leaderId = partitionMetadata.leader.id
       val replicaIds = partitionMetadata.replicas.asScala.map(_.id)
       serverId != leaderId && replicaIds.contains(serverId)
@@ -260,7 +260,7 @@ class MetadataRequestTest extends BaseRequestTest {
     TestUtils.waitUntilTrue(() => {
       val response = sendMetadataRequest(new 
MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
       val metadata = 
response.topicMetadata.asScala.head.partitionMetadata.asScala.head
-      val replica = metadata.replicas.asScala.find(_.id == 
downNode.apis.brokerId).get
+      val replica = metadata.replicas.asScala.find(_.id == 
downNode.dataPlaneApis.brokerId).get
       replica.host == "" & replica.port == -1
     }, "Replica was not found down", 5000)
 
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 3604385f6ad..d0085400699 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -100,7 +100,7 @@ class RequestQuotaTest extends BaseRequestTest {
     
adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId),
 quotaProps)
 
     TestUtils.retry(10000) {
-      val quotaManager = servers.head.apis.quotas.request
+      val quotaManager = servers.head.dataPlaneApis.quotas.request
       assertEquals(s"Default request quota not set", Quota.upperBound(0.01), 
quotaManager.quota("some-user", "some-client"))
       assertEquals(s"Request quota override not set", Quota.upperBound(2000), 
quotaManager.quota("some-user", unthrottledClientId))
     }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 99742307b26..bc95edb1cb1 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -35,6 +35,7 @@ import kafka.security.auth.{Acl, Authorizer, Resource}
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpointFile
 import Implicits._
+import com.yammer.metrics.Metrics
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.zk._
 import org.apache.kafka.clients.CommonClientConfigs
@@ -811,7 +812,7 @@ object TestUtils extends Logging {
                                           timeout: Long = 
JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
     val expectedBrokerIds = servers.map(_.config.brokerId).toSet
     TestUtils.waitUntilTrue(() => servers.forall(server =>
-      expectedBrokerIds == 
server.apis.metadataCache.getAliveBrokers.map(_.id).toSet
+      expectedBrokerIds == 
server.dataPlaneApis.metadataCache.getAliveBrokers.map(_.id).toSet
     ), "Timed out waiting for broker metadata to propagate to all servers", 
timeout)
   }
 
@@ -831,7 +832,7 @@ object TestUtils extends Logging {
     TestUtils.waitUntilTrue(() =>
       servers.foldLeft(true) {
         (result, server) =>
-          val partitionStateOpt = 
server.apis.metadataCache.getPartitionInfo(topic, partition)
+          val partitionStateOpt = 
server.dataPlaneApis.metadataCache.getPartitionInfo(topic, partition)
           partitionStateOpt match {
             case None => false
             case Some(partitionState) =>
@@ -1411,4 +1412,21 @@ object TestUtils extends Logging {
       .foldLeft(0.0)((total, metric) => total + 
metric.metricValue.asInstanceOf[Double])
     total.toLong
   }
+
+  def verifyMetricExistence(metricName: String, shouldExist: Boolean): Unit = {
+    val metricExists = Metrics.defaultRegistry.allMetrics.asScala.keys.exists(_.getName == metricName)
+    assertTrue(s"The $metricName metric should " + (if (shouldExist) "" else "not ") + "exist.",
+      metricExists == shouldExist)
+  }
+
+  def cleanMetricsRegistry() {
+    val metrics = Metrics.defaultRegistry
+    metrics.allMetrics.keySet.asScala.foreach(metrics.removeMetric)
+  }
 }
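
A short usage sketch for the two helpers added above; the flow is illustrative, and the metric name is one of the gauges this PR registers:

    // With a freshly cleaned Yammer registry and no broker started, the control
    // plane queue gauge should not be registered yet.
    TestUtils.cleanMetricsRegistry()
    TestUtils.verifyMetricExistence("ControlPlaneRequestQueueSize", false)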
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala 
b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index 39745e5e608..061a4bd685b 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -310,8 +310,8 @@ class AdminZkClientTest extends ZooKeeperTestHarness with 
Logging with RackAware
     // Test that the existing clientId overrides are read
     val server = 
TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, 
zkConnect)))
     servers = Seq(server)
-    assertEquals(new Quota(1000, true), 
server.apis.quotas.produce.quota("ANONYMOUS", clientId))
-    assertEquals(new Quota(2000, true), 
server.apis.quotas.fetch.quota("ANONYMOUS", clientId))
+    assertEquals(new Quota(1000, true), 
server.dataPlaneApis.quotas.produce.quota("ANONYMOUS", clientId))
+    assertEquals(new Quota(2000, true), 
server.dataPlaneApis.quotas.fetch.quota("ANONYMOUS", clientId))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala 
b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 0088c657de0..26fe6828778 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{ArrayBlockingQueue, 
ConcurrentLinkedQueue, CountDow
 
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.{Gauge, Meter, MetricName}
+import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Time
@@ -43,7 +44,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Before
   override def setUp() {
-    cleanMetricsRegistry()
+    TestUtils.cleanMetricsRegistry()
     super.setUp()
     zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, zkMaxInFlightRequests,
       Time.SYSTEM, "testMetricGroup", "testMetricType")
@@ -637,10 +638,5 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     assertEquals(States.CLOSED, zooKeeperClient.connectionState)
   }
 
-  private def cleanMetricsRegistry() {
-    val metrics = Metrics.defaultRegistry
-    metrics.allMetrics.keySet.asScala.foreach(metrics.removeMetric)
-  }
-
   private def bytes = 
UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8)
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 8bca79f8d83..a9461fe99e6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -518,7 +518,7 @@ public static void waitUntilMetadataIsPropagated(final 
List<KafkaServer> servers
                                                      final long timeout) 
throws InterruptedException {
         TestUtils.waitForCondition(() -> {
             for (final KafkaServer server : servers) {
-                final MetadataCache metadataCache = 
server.apis().metadataCache();
+                final MetadataCache metadataCache = 
server.dataPlaneApis().metadataCache();
                 final Option<UpdateMetadataRequest.PartitionState> 
partitionInfo =
                         metadataCache.getPartitionInfo(topic, partition);
                 if (partitionInfo.isEmpty()) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> add request prioritization
> --------------------------
>
>                 Key: KAFKA-4453
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4453
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Onur Karaman
>            Assignee: Mayuresh Gharat
>            Priority: Major
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority. So a 
> backlog of requests ahead of the controller request will delay the processing 
> of controller requests. This causes requests in front of the controller 
> request to get processed based on stale state.
> Side effects may include giving clients stale metadata[1], rejecting 
> ProduceRequests and FetchRequests[2], and data loss (for some 
> unofficial[3] definition of data loss in terms of messages beyond the high 
> watermark)[4].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests get processed before regular 
> queued up requests, so requests can get processed with up-to-date state.
> [1] Say a client's MetadataRequest is sitting in front of a controller's 
> UpdateMetadataRequest on a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes and that these leadership changes are being broadcasted 
> from the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a roundtrip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
> topic metadata again.
> [2] Clients can issue ProduceRequests to the wrong broker based on stale 
> metadata, causing rejected ProduceRequests. Based on how long the client acts 
> based on the stale metadata, the impact may or may not be visible to a 
> producer application. If the number of rejected ProduceRequests does not 
> exceed the max number of retries, the producer application would not be 
> impacted. On the other hand, if the retries are exhausted, the failed produce 
> will be visible to the producer application.
> [3] The official definition of data loss in Kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in sync 
> replicas for that partition have applied it to their log.
> [4] Say a number of ProduceRequests are sitting in front of a controller's 
> LeaderAndIsrRequest on a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted out 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests are getting processed on the former partition leader. As 
> part of becoming a follower for a partition, the broker truncates the log to 
> the high-watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, respond to the user with a success, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users have a false sense that their produce attempt succeeded 
> while in reality their messages got erased. While technically part of what 
> they signed up for with acks=1, it can still come as a surprise.
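
A toy illustration of the head-of-line blocking described above, using plain queues rather than broker code; the PR linked at the top addresses it by giving controller requests their own listener, queue, and handler thread instead of re-ordering a shared queue:

    import java.util.concurrent.ArrayBlockingQueue

    // Toy model only: strings stand in for requests, queues for the broker's
    // request channels. Sizes and request names are illustrative.
    object HeadOfLineBlockingDemo {
      def main(args: Array[String]): Unit = {
        val sharedQueue = new ArrayBlockingQueue[String](200)
        (1 to 100).foreach(i => sharedQueue.put(s"ProduceRequest-$i"))
        sharedQueue.put("LeaderAndIsrRequest")
        // With one shared queue, 100 produce requests are handled on potentially
        // stale state before the controller request is even seen.
        println(s"next from shared queue: ${sharedQueue.peek()}")

        val controlPlaneQueue = new ArrayBlockingQueue[String](20)
        controlPlaneQueue.put("LeaderAndIsrRequest")
        // With a dedicated control plane queue, it is next in line.
        println(s"next from control plane queue: ${controlPlaneQueue.peek()}")
      }
    }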



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
