[ https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689957#comment-16689957 ]
ASF GitHub Bot commented on KAFKA-4453: --------------------------------------- gitlw closed pull request #5783: KAFKA-4453: Separating controller connections and requests from the data plane (KIP-291) URL: https://github.com/apache/kafka/pull/5783 This PR was merged from a forked repository. Because GitHub hides the original diff of a fork's pull request once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index ecf6fbf33f1..29e10383afe 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -108,14 +108,16 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf private def addNewBroker(broker: Broker) { val messageQueue = new LinkedBlockingQueue[QueueItem] debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}") - val brokerNode = broker.node(config.interBrokerListenerName) + val controlPlaneListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName) + val controlPlaneSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) + val brokerNode = broker.node(controlPlaneListenerName) val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ") val networkClient = { val channelBuilder = ChannelBuilders.clientChannelBuilder( - config.interBrokerSecurityProtocol, + controlPlaneSecurityProtocol, JaasContext.Type.SERVER, config, - config.interBrokerListenerName, + controlPlaneListenerName, config.saslMechanismInterBrokerProtocol, config.saslInterBrokerHandshakeRequestEnable ) diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala index 00b09688c5b..0bc8a2a6ff9 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -40,6 +40,7 @@ object RequestChannel extends Logging { private val requestLogger = Logger("kafka.request.logger") val RequestQueueSizeMetric = "RequestQueueSize" + val ControlPlaneRequestQueueSizeMetric = "ControlPlaneRequestQueueSize" val ResponseQueueSizeMetric = "ResponseQueueSize" val ProcessorMetricTag = "processor" @@ -272,13 +273,13 @@ object RequestChannel extends Logging { } } -class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup { +class RequestChannel(val queueSize: Int, val requestQueueSizeMetric: String) extends KafkaMetricsGroup { import RequestChannel._ val metrics = new RequestChannel.Metrics private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize) private val processors = new ConcurrentHashMap[Int, Processor]() - newGauge(RequestQueueSizeMetric, new Gauge[Int] { + newGauge(requestQueueSizeMetric, new Gauge[Int] { def value = requestQueue.size }) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 1365f90f763..c3dac79f5f9 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -65,8 +65,14 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", "socket-server-metrics") memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName)) private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE - val requestChannel = new RequestChannel(maxQueuedRequests) - private 
val processors = new ConcurrentHashMap[Int, Processor]() + val dataRequestChannel = new RequestChannel(maxQueuedRequests, RequestChannel.RequestQueueSizeMetric) + var controlPlaneRequestChannel: RequestChannel = null + if (config.controlPlaneListenerName.isDefined) { + controlPlaneRequestChannel = new RequestChannel(20, RequestChannel.ControlPlaneRequestQueueSizeMetric) + } + private val dataProcessors = new ConcurrentHashMap[Int, Processor]() + // there should be only one controller processor, however we use a map to store it so that we can reuse the logic for data processors + private[network] val controlPlaneProcessors = new ConcurrentHashMap[Int, Processor]() private var nextProcessorId = 0 private[network] val acceptors = new ConcurrentHashMap[EndPoint, Acceptor]() @@ -88,25 +94,35 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time def startup(startupProcessors: Boolean = true) { this.synchronized { connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides) - createAcceptorAndProcessors(config.numNetworkThreads, config.listeners) + createAcceptorAndProcessors(config.numNetworkThreads, config.dataListeners, dataRequestChannel, dataProcessors, false) + if (config.controlPlaneListener.isDefined) + createAcceptorAndProcessors(1, Seq(config.controlPlaneListener.get), controlPlaneRequestChannel, controlPlaneProcessors, true) + if (startupProcessors) { startProcessors() } } - newGauge("NetworkProcessorAvgIdlePercent", - new Gauge[Double] { + def createNetworkProcessorAvgIdlePercentMetric(metric: String, processors: java.util.Map[Int, Processor]): Unit = { + newGauge(metric, + new Gauge[Double] { - def value = SocketServer.this.synchronized { - val ioWaitRatioMetricNames = processors.values.asScala.map { p => - metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags) + def value = SocketServer.this.synchronized { + val ioWaitRatioMetricNames = processors.values.asScala.map 
{ p => + metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags) + } + ioWaitRatioMetricNames.map { metricName => + Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0)) + }.sum / processors.size } - ioWaitRatioMetricNames.map { metricName => - Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0)) - }.sum / processors.size } - } - ) + ) + } + + createNetworkProcessorAvgIdlePercentMetric("NetworkProcessorAvgIdlePercent", dataProcessors) + if (config.controlPlaneListener.isDefined) + createNetworkProcessorAvgIdlePercentMetric("ControlPlaneNetworkProcessorIdlePercent", controlPlaneProcessors) + newGauge("MemoryPoolAvailable", new Gauge[Long] { def value = memoryPool.availableMemory() @@ -133,7 +149,10 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap private def createAcceptorAndProcessors(processorsPerListener: Int, - endpoints: Seq[EndPoint]): Unit = synchronized { + endpoints: Seq[EndPoint], + requestChannel: RequestChannel, + processorCollector: java.util.Map[Int, Processor], + isControlPlane: Boolean): Unit = synchronized { val sendBufferSize = config.socketSendBufferBytes val recvBufferSize = config.socketReceiveBufferBytes @@ -144,25 +163,27 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time val securityProtocol = endpoint.securityProtocol val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas) - addProcessors(acceptor, endpoint, processorsPerListener) - KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start() + addProcessors(acceptor, endpoint, processorsPerListener, requestChannel, processorCollector, isControlPlane) + KafkaThread.nonDaemon((if (isControlPlane) "control" else "data") + 
s"-plane-kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start() acceptor.awaitStartup() acceptors.put(endpoint, acceptor) } } - private def addProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized { + private def addProcessors(acceptor: Acceptor, endpoint: EndPoint, + newProcessorsPerListener: Int, requestChannel: RequestChannel, processorCollector: java.util.Map[Int, Processor], + isControlPlane: Boolean): Unit = synchronized { val listenerName = endpoint.listenerName val securityProtocol = endpoint.securityProtocol val listenerProcessors = new ArrayBuffer[Processor]() for (_ <- 0 until newProcessorsPerListener) { - val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool) + val processor = newProcessor(nextProcessorId, requestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool, isControlPlane) listenerProcessors += processor requestChannel.addProcessor(processor) nextProcessorId += 1 } - listenerProcessors.foreach(p => processors.put(p.id, p)) + listenerProcessors.foreach(p => processorCollector.put(p.id, p)) acceptor.addProcessors(listenerProcessors) } @@ -173,8 +194,11 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time info("Stopping socket server request processors") this.synchronized { acceptors.asScala.values.foreach(_.shutdown()) - processors.asScala.values.foreach(_.shutdown()) - requestChannel.clear() + dataProcessors.asScala.values.foreach(_.shutdown()) + controlPlaneProcessors.asScala.values.foreach(_.shutdown()) + dataRequestChannel.clear() + if (controlPlaneRequestChannel != null) + controlPlaneRequestChannel.clear() stoppedProcessingRequests = true } info("Stopped socket server request processors") @@ -182,12 +206,18 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: 
Int): Unit = synchronized { info(s"Resizing network thread pool size for each listener from $oldNumNetworkThreads to $newNumNetworkThreads") + val dataAcceptors = config.controlPlaneListenerName match { + case Some(controlPlaneListenerName) => + acceptors.asScala.filter{ case (endpoint, _) => !endpoint.listenerName.value().equals(controlPlaneListenerName.value())} + case None => acceptors.asScala + } + if (newNumNetworkThreads > oldNumNetworkThreads) { - acceptors.asScala.foreach { case (endpoint, acceptor) => - addProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads) + dataAcceptors.foreach { case (endpoint, acceptor) => + addProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads, dataRequestChannel, dataProcessors, false) } } else if (newNumNetworkThreads < oldNumNetworkThreads) - acceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel)) + dataAcceptors.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, dataRequestChannel)) } /** @@ -199,7 +229,9 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time this.synchronized { if (!stoppedProcessingRequests) stopProcessingRequests() - requestChannel.shutdown() + dataRequestChannel.shutdown() + if (controlPlaneRequestChannel != null ) + controlPlaneRequestChannel.shutdown() } info("Shutdown completed") } @@ -215,7 +247,12 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized { info(s"Adding listeners for endpoints $listenersAdded") - createAcceptorAndProcessors(config.numNetworkThreads, listenersAdded) + val (controlPlaneListenersAdded, dataPlaneListenersAdded) = listenersAdded.partition { endpoint => + config.controlPlaneListenerName.isDefined && endpoint.listenerName.value() == config.controlPlaneListenerName.get } + if (dataPlaneListenersAdded.nonEmpty) + 
createAcceptorAndProcessors(config.numNetworkThreads, dataPlaneListenersAdded, dataRequestChannel, dataProcessors, false) + if (controlPlaneListenersAdded.nonEmpty) + createAcceptorAndProcessors(1, controlPlaneListenersAdded, controlPlaneRequestChannel, controlPlaneProcessors, true) startProcessors() } @@ -237,9 +274,10 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time } /* `protected` for test usage */ - protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, - securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { + protected[network] def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, + securityProtocol: SecurityProtocol, memoryPool: MemoryPool, isControlPlane: Boolean): Processor = { new Processor(id, + isControlPlane, time, config.socketRequestMaxBytes, requestChannel, @@ -261,7 +299,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time Option(connectionQuotas).fold(0)(_.get(address)) /* For test usage */ - private[network] def processor(index: Int): Processor = processors.get(index) + private[network] def processor(index: Int): Processor = dataProcessors.get(index) } @@ -355,7 +393,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint, private def startProcessors(processors: Seq[Processor]): Unit = synchronized { processors.foreach { processor => - KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", + KafkaThread.nonDaemon((if (processor.isControlPlane) "control" else "data") + s"-plane-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", processor).start() } } @@ -498,6 +536,7 @@ private[kafka] object Processor { * each of which has its own selector */ private[kafka] class Processor(val id: Int, + val isControlPlane: Boolean, 
time: Time, maxRequestSize: Int, requestChannel: RequestChannel, @@ -528,6 +567,8 @@ private[kafka] class Processor(val id: Int, override def toString: String = s"$localHost:$localPort-$remoteHost:$remotePort-$index" } + // the receivesProcessed counter is defined only for testing + private[network] var receivesProcessed: Long = 0L private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() private val inflightResponses = mutable.Map[String, RequestChannel.Response]() private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]() @@ -707,6 +748,7 @@ private[kafka] class Processor(val id: Int, val req = new RequestChannel.Request(processor = id, context = context, startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics) requestChannel.sendRequest(req) + receivesProcessed += 1 selector.mute(connectionId) handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED) case None => diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 2c0f6c1b52c..94325c6c7b0 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -640,7 +640,7 @@ class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable { override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { if (newConfig.numIoThreads != oldConfig.numIoThreads) - server.requestHandlerPool.resizeThreadPool(newConfig.numIoThreads) + server.dataRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads) if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads) server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads) if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 
9edda4ea7f1..8ce42369472 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -282,6 +282,7 @@ object KafkaConfig { val AdvertisedPortProp = "advertised.port" val AdvertisedListenersProp = "advertised.listeners" val ListenerSecurityProtocolMapProp = "listener.security.protocol.map" + val ControlPlaneListenerNameProp = "control.plane.listener.name" val SocketSendBufferBytesProp = "socket.send.buffer.bytes" val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes" val SocketRequestMaxBytesProp = "socket.request.max.bytes" @@ -540,6 +541,27 @@ object KafkaConfig { "prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the " + "INTERNAL listener, a config with name <code>listener.name.internal.ssl.keystore.location</code> would be set. " + "If the config for the listener name is not set, the config will fallback to the generic config (i.e. <code>ssl.keystore.location</code>). " + val ControlPlaneListenerNameDoc = s"On the broker side, the $ControlPlaneListenerNameProp is used to locate an endpoint in the $ListenersProp list, which we will use to listen for connections " + + s"from the controller. For example, if a broker's config is:" + + """ +listeners=CONTROLLER://192.1.1.8:9091,INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093 +listener.security.protocol.map=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:SSL +control.plane.listener.name=CONTROLLER"""+ + "\nUpon startup, a broker will bind to the local endpoint 192.1.1.8:9091 with security protocol PLAINTEXT to listen for controller connections.\n" + + s"On the controller side, when the controller discovers a broker's published endpoints through Zookeeper, " + + s"the $ControlPlaneListenerNameProp is used to locate an endpoint within the published endpoints, which it will use to establish connections with "+ + "the broker. 
For example, if a broker's published endpoints list on Zookeeper is" + + """ +"endpoints": [ + "CONTROLLER://broker1.example.com:9091", + "INTERNAL://broker1.example.com:9092", + "EXTERNAL://host1.example.com:9093" +]""" + + s"\n and the controller's config is\n" + + "listener.security.protocol.map=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:SSL\n" + + "control.plane.listener.name=CONTROLLER\n" + + "then the controller will use the endpoint broker1.example.com:9091 with security protocol PLAINTEXT to connect to the discovered broker.\n" + + "If not explicitly configured, the default value will be null and there will be no dedicated endpoints for controller connections." val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used." val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used." @@ -837,6 +859,7 @@ object KafkaConfig { .define(AdvertisedHostNameProp, STRING, null, HIGH, AdvertisedHostNameDoc) .define(AdvertisedPortProp, INT, null, HIGH, AdvertisedPortDoc) .define(AdvertisedListenersProp, STRING, null, HIGH, AdvertisedListenersDoc) + .define(ControlPlaneListenerNameProp, STRING, null, HIGH, ControlPlaneListenerNameDoc) .define(ListenerSecurityProtocolMapProp, STRING, Defaults.ListenerSecurityProtocolMap, LOW, ListenerSecurityProtocolMapDoc) .define(SocketSendBufferBytesProp, INT, Defaults.SocketSendBufferBytes, HIGH, SocketSendBufferBytesDoc) .define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc) @@ -1253,6 +1276,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1 def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2 + val controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map(_._1) + val 
controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map(_._2) def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp) val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1 @@ -1325,6 +1350,21 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO }.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap)) } + // Return the endpoint corresponding to the controller listener name + def controlPlaneListener: Option[EndPoint] = { + controlPlaneListenerName.flatMap { name => + listeners.filter(endpoint => endpoint.listenerName.value() == name.value()).headOption + } + } + + // Return the endpoints that are not related to the controller listener name + def dataListeners: Seq[EndPoint] = { + Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match { + case Some(controllerListenerName) => listeners.filterNot(_.listenerName.value() == controllerListenerName) + case None => listeners + } + } + // If the user defined advertised listeners, we use those // If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults // If none of these are defined, we'll use the listeners @@ -1356,6 +1396,18 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO } } + // If the controller.listener.name config is set, we use its value + // Otherwise, the controller listener name will be the same as the inter broker listener name + private def getControlPlaneListenerNameAndSecurityProtocol: Option[(ListenerName, SecurityProtocol)] = { + Option(getString(KafkaConfig.ControlPlaneListenerNameProp)).map { name => + val controlPlaneListenerName = ListenerName.normalised(name) + val controlPlaneSecurityProtocol = listenerSecurityProtocolMap.getOrElse(controlPlaneListenerName, + throw new 
ConfigException(s"Listener with name ${controlPlaneListenerName.value} defined in " + + s"${KafkaConfig.ControlPlaneListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}.")) + (controlPlaneListenerName, controlPlaneSecurityProtocol) + } + } + private def getSecurityProtocol(protocolName: String, configName: String): SecurityProtocol = { try SecurityProtocol.forName(protocolName) catch { @@ -1412,6 +1464,24 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " + s"is set to version ${ApiVersion.minSupportedFor(recordVersion).shortVersion} or higher") + // validate the controller.listener.name config + if (controlPlaneListenerName.isDefined) { + require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()), + s"${KafkaConfig.ControlPlaneListenerNameProp}, when defined, should have a different value from the inter broker listener name. " + + s"Currently they both have the value ${controlPlaneListenerName.get}") + + require(listenerNames.contains(controlPlaneListenerName.get), + s"${KafkaConfig.ControlPlaneListenerNameProp} has a value of ${controlPlaneListenerName.get}, " + + s"which must be a listener name defined in ${KafkaConfig.ListenersProp}. " + + s"The current listener names defined in ${KafkaConfig.ListenersProp} are ${listenerNames.map(_.value).mkString(",")}" + ) + require(advertisedListenerNames.contains(controlPlaneListenerName.get), + s"${KafkaConfig.ControlPlaneListenerNameProp} has a value of ${controlPlaneListenerName.get}, " + + s"which must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. 
" + + s"The current listener names defined in ${KafkaConfig.AdvertisedListenersProp} are ${advertisedListenerNames.map(_.value).mkString(",")}" + ) + } + val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM, s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString") diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index d0d41219663..68dff3ff516 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -40,6 +40,8 @@ class KafkaRequestHandler(id: Int, apis: KafkaApis, time: Time) extends Runnable with Logging { this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], " + // the requestsHandled counter is defined only for testing + var requestsHandled: Long = 0L private val shutdownComplete = new CountDownLatch(1) @volatile private var stopped = false @@ -67,6 +69,7 @@ class KafkaRequestHandler(id: Int, request.requestDequeueTimeNanos = endTime trace(s"Kafka request handler $id on broker $brokerId handling request $request") apis.handle(request) + requestsHandled += 1 } catch { case e: FatalExitError => shutdownComplete.countDown() @@ -96,13 +99,15 @@ class KafkaRequestHandlerPool(val brokerId: Int, val requestChannel: RequestChannel, val apis: KafkaApis, time: Time, - numThreads: Int) extends Logging with KafkaMetricsGroup { + numThreads: Int, + requestHandlerAvgIdleMetric: String, + isControlPlane: Boolean) extends Logging with KafkaMetricsGroup { private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads) /* a meter to track the average 
free capacity of the request handlers */ - private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS) + private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetric, "percent", TimeUnit.NANOSECONDS) - this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], " + this.logIdent = "[" + (if (isControlPlane) "Control" else "Data") + " Plane Kafka Request Handler on Broker " + brokerId + "], " val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads) for (i <- 0 until numThreads) { createHandler(i) @@ -110,7 +115,7 @@ class KafkaRequestHandlerPool(val brokerId: Int, def createHandler(id: Int): Unit = synchronized { runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time) - KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start() + KafkaThread.daemon((if (isControlPlane) "control" else "data") + "-plane-kafka-request-handler-" + id, runnables(id)).start() } def resizeThreadPool(newSize: Int): Unit = synchronized { diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 0d13d9e0730..364f1dffa20 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -113,10 +113,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP val brokerState: BrokerState = new BrokerState - var apis: KafkaApis = null + var dataPlaneApis: KafkaApis = null + var controlPlaneApis: KafkaApis = null var authorizer: Option[Authorizer] = None var socketServer: SocketServer = null - var requestHandlerPool: KafkaRequestHandlerPool = null + var dataRequestHandlerPool: KafkaRequestHandlerPool = null + var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null var logDirFailureChannel: LogDirFailureChannel = null var logManager: LogManager = null @@ -291,12 +293,21 @@ class 
KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) /* start processing requests */ - apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator, + dataPlaneApis = new KafkaApis(socketServer.dataRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator, kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, fetchManager, brokerTopicStats, clusterId, time, tokenManager) - requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time, - config.numIoThreads) + dataRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataRequestChannel, dataPlaneApis, time, + config.numIoThreads, "RequestHandlerAvgIdlePercent", false) + + if (config.controlPlaneListenerName.isDefined) { + controlPlaneApis = new KafkaApis(socketServer.controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator, + kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, + fetchManager, brokerTopicStats, clusterId, time, tokenManager) + + controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannel, + controlPlaneApis, time, 1, "ControlPlaneRequestHandlerIdlePercent", true) + } Mx4jLoader.maybeLoad() @@ -573,14 +584,18 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP // Socket server will be shutdown towards the end of the sequence. 
if (socketServer != null) CoreUtils.swallow(socketServer.stopProcessingRequests(), this) - if (requestHandlerPool != null) - CoreUtils.swallow(requestHandlerPool.shutdown(), this) + if (dataRequestHandlerPool != null) + CoreUtils.swallow(dataRequestHandlerPool.shutdown(), this) + if (controlPlaneRequestHandlerPool != null) + CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this) if (kafkaScheduler != null) CoreUtils.swallow(kafkaScheduler.shutdown(), this) - if (apis != null) - CoreUtils.swallow(apis.close(), this) + if (dataPlaneApis != null) + CoreUtils.swallow(dataPlaneApis.close(), this) + if (controlPlaneApis != null) + CoreUtils.swallow(controlPlaneApis.close(), this) CoreUtils.swallow(authorizer.foreach(_.close()), this) if (adminManager != null) CoreUtils.swallow(adminManager.shutdown(), this) diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index ca23e1fd4c4..4bdfa70dafa 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -244,9 +244,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { client = AdminClient.create(createConfig()) val nodes = client.describeCluster.nodes.get() val clusterId = client.describeCluster().clusterId().get() - assertEquals(servers.head.apis.clusterId, clusterId) + assertEquals(servers.head.dataPlaneApis.clusterId, clusterId) val controller = client.describeCluster().controller().get() - assertEquals(servers.head.apis.metadataCache.getControllerId. + assertEquals(servers.head.dataPlaneApis.metadataCache.getControllerId. 
getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id()) val brokers = brokerList.split(",") assertEquals(brokers.size, nodes.size) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index ae6273623ae..9aada113b1d 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -611,7 +611,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val producer = createProducer() sendRecords(producer, 1, tp) removeAllAcls() - + val consumer = createConsumer() consumer.assign(List(tp).asJava) consumeRecords(consumer) @@ -1452,9 +1452,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } def removeAllAcls() = { - servers.head.apis.authorizer.get.getAcls().keys.foreach { resource => - servers.head.apis.authorizer.get.removeAcls(resource) - TestUtils.waitAndVerifyAcls(Set.empty[Acl], servers.head.apis.authorizer.get, resource) + servers.head.dataPlaneApis.authorizer.get.getAcls().keys.foreach { resource => + servers.head.dataPlaneApis.authorizer.get.removeAcls(resource) + TestUtils.waitAndVerifyAcls(Set.empty[Acl], servers.head.dataPlaneApis.authorizer.get, resource) } } @@ -1507,8 +1507,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } private def addAndVerifyAcls(acls: Set[Acl], resource: Resource) = { - servers.head.apis.authorizer.get.addAcls(acls, resource) - TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) ++ acls, servers.head.apis.authorizer.get, resource) + servers.head.dataPlaneApis.authorizer.get.addAcls(acls, resource) + TestUtils.waitAndVerifyAcls(servers.head.dataPlaneApis.authorizer.get.getAcls(resource) ++ acls, servers.head.dataPlaneApis.authorizer.get, resource) } private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], diff --git 
a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala index 7ac3c66ebf3..a1bc27b3a68 100644 --- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala @@ -285,7 +285,7 @@ abstract class QuotaTestClients(topic: String, def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double, server: KafkaServer = leaderNode) { TestUtils.retry(10000) { - val quotaManagers = server.apis.quotas + val quotaManagers = server.dataPlaneApis.quotas val overrideProducerQuota = quota(quotaManagers.produce, userPrincipal, producerClientId) val overrideConsumerQuota = quota(quotaManagers.fetch, userPrincipal, consumerClientId) val overrideProducerRequestQuota = quota(quotaManagers.request, userPrincipal, producerClientId) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index c9da8347bb3..c1db456d9a2 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -179,8 +179,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas override def setUp() { super.setUp() servers.foreach { s => - TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, Resource.ClusterResource) - TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, Resource(Topic, "*", LITERAL)) + TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.dataPlaneApis.authorizer.get, Resource.ClusterResource) + TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.dataPlaneApis.authorizer.get, Resource(Topic, "*", LITERAL)) } // create the test topic with all the brokers as replicas createTopic(topic, 1, 3) @@ -247,16 +247,16 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas private def setWildcardResourceAcls() { AclCommand.main(produceConsumeWildcardAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource) - TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource) + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.dataPlaneApis.authorizer.get, wildcardTopicResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneApis.authorizer.get, wildcardGroupResource) } } private def setPrefixedResourceAcls() { AclCommand.main(produceConsumePrefixedAclsArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, prefixedTopicResource) - TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, prefixedGroupResource) + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ 
TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneApis.authorizer.get, prefixedTopicResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneApis.authorizer.get, prefixedGroupResource) } } @@ -264,9 +264,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas AclCommand.main(produceAclArgs(tp.topic)) AclCommand.main(consumeAclArgs(tp.topic)) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneApis.authorizer.get, new Resource(Topic, tp.topic, PatternType.LITERAL)) - TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneApis.authorizer.get, groupResource) } val producer = createProducer() sendRecords(producer, numRecords, tp) @@ -286,7 +286,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas def testNoProduceWithDescribeAcl(): Unit = { AclCommand.main(describeAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.dataPlaneApis.authorizer.get, topicResource) } try{ val producer = createProducer() @@ -297,7 +297,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas assertEquals(Set(topic).asJava, e.unauthorizedTopics()) } } - + /** * Tests that a consumer fails to consume messages without the appropriate * ACL set. 
@@ -310,7 +310,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas // the exception is expected when the consumer attempts to lookup offsets consumeRecords(consumer) } - + @Test(expected = classOf[TopicAuthorizationException]) def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = { noConsumeWithoutDescribeAclSetup() @@ -319,13 +319,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas // this should timeout since the consumer will not be able to fetch any metadata for the topic consumeRecords(consumer, timeout = 3000) } - + private def noConsumeWithoutDescribeAclSetup(): Unit = { AclCommand.main(produceAclArgs(tp.topic)) AclCommand.main(groupAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource) - TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneApis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneApis.authorizer.get, groupResource) } val producer = createProducer() @@ -334,10 +334,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas AclCommand.main(deleteDescribeAclArgs) AclCommand.main(deleteWriteAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneApis.authorizer.get, groupResource) } } - + @Test def testNoConsumeWithDescribeAclViaAssign(): Unit = { noConsumeWithDescribeAclSetup() @@ -352,7 +352,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas assertEquals(Set(topic).asJava, e.unauthorizedTopics()) } } - + @Test def testNoConsumeWithDescribeAclViaSubscribe(): Unit = { noConsumeWithDescribeAclSetup() @@ -367,13 +367,13 @@ abstract class 
EndToEndAuthorizationTest extends IntegrationTestHarness with Sas assertEquals(Set(topic).asJava, e.unauthorizedTopics()) } } - + private def noConsumeWithDescribeAclSetup(): Unit = { AclCommand.main(produceAclArgs(tp.topic)) AclCommand.main(groupAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource) - TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneApis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneApis.authorizer.get, groupResource) } val producer = createProducer() sendRecords(producer, numRecords, tp) @@ -387,7 +387,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas def testNoGroupAcl(): Unit = { AclCommand.main(produceAclArgs(tp.topic)) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneApis.authorizer.get, topicResource) } val producer = createProducer() sendRecords(producer, numRecords, tp) diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala index 55e1529c963..4cbbb4798c8 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala @@ -71,7 +71,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with private def addClusterAcl(permissionType: PermissionType, operation: Operation): Unit = { val acls = Set(clusterAcl(permissionType, operation)) - val authorizer = servers.head.apis.authorizer.get + val authorizer 
= servers.head.dataPlaneApis.authorizer.get val prevAcls = authorizer.getAcls(AuthResource.ClusterResource) authorizer.addAcls(acls, AuthResource.ClusterResource) TestUtils.waitAndVerifyAcls(prevAcls ++ acls, authorizer, AuthResource.ClusterResource) @@ -79,7 +79,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with private def removeClusterAcl(permissionType: PermissionType, operation: Operation): Unit = { val acls = Set(clusterAcl(permissionType, operation)) - val authorizer = servers.head.apis.authorizer.get + val authorizer = servers.head.dataPlaneApis.authorizer.get val prevAcls = authorizer.getAcls(AuthResource.ClusterResource) Assert.assertTrue(authorizer.removeAcls(acls, AuthResource.ClusterResource)) TestUtils.waitAndVerifyAcls(prevAcls -- acls, authorizer, AuthResource.ClusterResource) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 787cc8d3a01..e786039c826 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -500,8 +500,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet @Test def testThreadPoolResize(): Unit = { - val requestHandlerPrefix = "kafka-request-handler-" - val networkThreadPrefix = "kafka-network-thread-" + val requestHandlerPrefix = "data-plane-kafka-request-handler-" + val networkThreadPrefix = "data-plane-kafka-network-thread-" val fetcherThreadPrefix = "ReplicaFetcherThread-" // Executor threads and recovery threads are not verified since threads may not be running // For others, thread count should be configuredCount * threadMultiplier * numBrokers @@ -576,7 +576,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet "", mayReceiveDuplicates = false) 
verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads, networkThreadPrefix, mayReceiveDuplicates = true) - verifyThreads("kafka-socket-acceptor-", config.listeners.size) + verifyThreads("data-plane-kafka-socket-acceptor-", config.listeners.size) verifyProcessorMetrics() verifyMarkPartitionsForTruncation() diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index a1c317e4e2c..73df7883516 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -175,8 +175,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { // Test that the existing clientId overrides are read val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) servers = Seq(server) - assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId)) - assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId)) + assertEquals(new Quota(1000, true), server.dataPlaneApis.quotas.produce.quota("ANONYMOUS", clientId)) + assertEquals(new Quota(2000, true), server.dataPlaneApis.quotas.fetch.quota("ANONYMOUS", clientId)) } @Test diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index dc4076a27c7..152228e0bb7 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -323,10 +323,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { var activeServers = servers.filter(s => s.config.brokerId != 2) // wait for the update metadata request to trickle to the brokers TestUtils.waitUntilTrue(() => - 
activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.isr.size != 3), + activeServers.forall(_.dataPlaneApis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.isr.size != 3), "Topic test not created after timeout") assertEquals(0, partitionsRemaining.size) - var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get + var partitionStateInfo = activeServers.head.dataPlaneApis.metadataCache.getPartitionInfo(topic,partition).get var leaderAfterShutdown = partitionStateInfo.basePartitionState.leader assertEquals(0, leaderAfterShutdown) assertEquals(2, partitionStateInfo.basePartitionState.isr.size) @@ -336,16 +336,16 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { partitionsRemaining = resultQueue.take().get assertEquals(0, partitionsRemaining.size) activeServers = servers.filter(s => s.config.brokerId == 0) - partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get + partitionStateInfo = activeServers.head.dataPlaneApis.metadataCache.getPartitionInfo(topic,partition).get leaderAfterShutdown = partitionStateInfo.basePartitionState.leader assertEquals(0, leaderAfterShutdown) - assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0)) + assertTrue(servers.forall(_.dataPlaneApis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0)) controller.controlledShutdown(0, controlledShutdownCallback) partitionsRemaining = resultQueue.take().get assertEquals(1, partitionsRemaining.size) // leader doesn't change since all the replicas are shut down - assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0)) + assertTrue(servers.forall(_.dataPlaneApis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0)) } @Test diff --git 
a/core/src/test/scala/unit/kafka/network/ControlPlaneTest.scala b/core/src/test/scala/unit/kafka/network/ControlPlaneTest.scala new file mode 100644 index 00000000000..9f54701260d --- /dev/null +++ b/core/src/test/scala/unit/kafka/network/ControlPlaneTest.scala @@ -0,0 +1,52 @@ +package kafka.network + +import java.util.Properties + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.JavaConverters._ + +class ControlPlaneTest extends KafkaServerTestHarness { + val numNodes = 2 + val overridingProps = new Properties() + val testedMetrics = List("ControlPlaneRequestQueueSize", "ControlPlaneNetworkProcessorIdlePercent", "ControlPlaneRequestHandlerIdlePercent") + + overridingProps.put(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLLER") + overridingProps.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:" + TestUtils.RandomPort + ",CONTROLLER://localhost:" + TestUtils.RandomPort) + overridingProps.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT") + + /** + * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every + * test and should not reuse previous configurations unless they select their ports randomly when servers are started. 
+ */ + override def generateConfigs: Seq[KafkaConfig] = + TestUtils.createBrokerConfigs(numNodes, zkConnect, enableDeleteTopic=true).map(KafkaConfig.fromProps(_, overridingProps)) + + @Test + def testCreatingTopicThroughControlPlane() { + val topic = "test" + createTopic(topic, 3, numNodes) + + for (s <- servers) { + val controlPlaneProcessors = s.socketServer.controlPlaneProcessors.asScala + assertTrue(s"There should be exactly one control plane network processor thread when ${KafkaConfig.ControlPlaneListenerNameProp} is set", + controlPlaneProcessors.size == 1) + controlPlaneProcessors.foreach { + case (id, processor) => + assertTrue("The control plane network processor should have processed some request", processor.receivesProcessed > 0) + } + + assertTrue(s"There should be exactly one control plane request handler thread when ${KafkaConfig.ControlPlaneListenerNameProp} is set", + s.controlPlaneRequestHandlerPool != null && s.controlPlaneRequestHandlerPool.runnables.size == 1) + s.controlPlaneRequestHandlerPool.runnables.foreach { requestHandler => + assertTrue("The control plane request handler thread should have processed some request", requestHandler.requestsHandled > 0) + } + } + + testedMetrics.foreach { metric => TestUtils.verifyMetricExistence(metric, true)} + } +} diff --git a/core/src/test/scala/unit/kafka/network/InactiveControlPlaneTest.scala b/core/src/test/scala/unit/kafka/network/InactiveControlPlaneTest.scala new file mode 100644 index 00000000000..da0f0cebc58 --- /dev/null +++ b/core/src/test/scala/unit/kafka/network/InactiveControlPlaneTest.scala @@ -0,0 +1,46 @@ +package kafka.network + +import com.yammer.metrics.{Metrics => YammerMetrics} +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.junit.Assert._ +import org.junit.{Before, Test} + +import scala.collection.JavaConverters._ + +class InactiveControlPlaneTest extends KafkaServerTestHarness { + val numNodes = 2 + val 
testedMetrics = List("ControlPlaneRequestQueueSize", "ControlPlaneNetworkProcessorIdlePercent", "ControlPlaneRequestHandlerIdlePercent") + + /** + * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every + * test and should not reuse previous configurations unless they select their ports randomly when servers are started. + */ + override def generateConfigs: Seq[KafkaConfig] = + TestUtils.createBrokerConfigs(numNodes, zkConnect, enableDeleteTopic=true).map(KafkaConfig.fromProps(_)) + + @Before + override def setUp(): Unit = { + TestUtils.cleanMetricsRegistry() + super.setUp() + } + + /** + * the control plane metrics should not exist when the control.plane.listener.name is not set + */ + @Test + def testInactiveControlPlane(): Unit = { + for (s <- servers) { + val controlPlaneProcessors = s.socketServer.controlPlaneProcessors.asScala + assertTrue(s"There should be no control plane network processor thread when ${KafkaConfig.ControlPlaneListenerNameProp} is not set", + controlPlaneProcessors.isEmpty) + + assertTrue(s"There should be no control plane request handler thread when ${KafkaConfig.ControlPlaneListenerNameProp} is not set", + s.controlPlaneRequestHandlerPool == null) + } + + testedMetrics.foreach { metric => TestUtils.verifyMetricExistence(metric, false)} + } + +} diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index b5983377b77..b1458f2ddb2 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -144,13 +144,13 @@ class SocketServerTest extends JUnitSuite { def connectAndProcessRequest(s: SocketServer): (Socket, String) = { val socket = connect(s) val request = sendAndReceiveRequest(socket, s) - processRequest(s.requestChannel, request) + processRequest(s.dataRequestChannel, request) (socket, request.context.connectionId) 
} def sendAndReceiveRequest(socket: Socket, server: SocketServer): RequestChannel.Request = { sendRequest(socket, producerRequestBytes()) - receiveRequest(server.requestChannel) + receiveRequest(server.dataRequestChannel) } def shutdownServerAndMetrics(server: SocketServer): Unit = { @@ -181,7 +181,7 @@ class SocketServerTest extends JUnitSuite { // Test PLAINTEXT socket sendRequest(plainSocket, serializedBytes) - processRequest(server.requestChannel) + processRequest(server.dataRequestChannel) assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq) } @@ -212,9 +212,9 @@ class SocketServerTest extends JUnitSuite { sendRequest(plainSocket, serializedBytes) plainSocket.close() for (_ <- 0 until 10) { - val request = receiveRequest(server.requestChannel) + val request = receiveRequest(server.dataRequestChannel) assertNotNull("receiveRequest timed out", request) - server.requestChannel.sendResponse(new RequestChannel.NoOpResponse(request)) + server.dataRequestChannel.sendResponse(new RequestChannel.NoOpResponse(request)) } } @@ -226,9 +226,9 @@ class SocketServerTest extends JUnitSuite { for (_ <- 0 until 3) sendRequest(plainSocket, serializedBytes) for (_ <- 0 until 3) { - val request = receiveRequest(server.requestChannel) + val request = receiveRequest(server.dataRequestChannel) assertNotNull("receiveRequest timed out", request) - server.requestChannel.sendResponse(new RequestChannel.NoOpResponse(request)) + server.dataRequestChannel.sendResponse(new RequestChannel.NoOpResponse(request)) } } @@ -239,7 +239,7 @@ class SocketServerTest extends JUnitSuite { val requests = sockets.map{socket => sendRequest(socket, serializedBytes) - receiveRequest(server.requestChannel) + receiveRequest(server.dataRequestChannel) } requests.zipWithIndex.foreach { case (request, i) => val index = request.context.connectionId.split("-").last @@ -269,14 +269,14 @@ class SocketServerTest extends JUnitSuite { // Connection with no staged receives val socket1 = 
connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT) sendRequest(socket1, serializedBytes) - val request1 = receiveRequest(overrideServer.requestChannel) + val request1 = receiveRequest(overrideServer.dataRequestChannel) assertTrue("Channel not open", openChannel(request1).nonEmpty) assertEquals(openChannel(request1), openOrClosingChannel(request1)) time.sleep(idleTimeMs + 1) TestUtils.waitUntilTrue(() => openOrClosingChannel(request1).isEmpty, "Failed to close idle channel") assertTrue("Channel not removed", openChannel(request1).isEmpty) - processRequest(overrideServer.requestChannel, request1) + processRequest(overrideServer.dataRequestChannel, request1) // Connection with staged receives val socket2 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT) @@ -285,9 +285,9 @@ class SocketServerTest extends JUnitSuite { time.sleep(idleTimeMs + 1) TestUtils.waitUntilTrue(() => openChannel(request2).isEmpty, "Failed to close idle channel") TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).nonEmpty, "Channel removed without processing staged receives") - processRequest(overrideServer.requestChannel, request2) // this triggers a failed send since channel has been closed + processRequest(overrideServer.dataRequestChannel, request2) // this triggers a failed send since channel has been closed TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).isEmpty, "Failed to remove channel with failed sends") - assertNull("Received request after failed send", overrideServer.requestChannel.receiveRequest(200)) + assertNull("Received request after failed send", overrideServer.dataRequestChannel.receiveRequest(200)) } finally { shutdownServerAndMetrics(overrideServer) @@ -304,9 +304,9 @@ class SocketServerTest extends JUnitSuite { @volatile var selector: TestableSelector = null val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0" val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider) { - 
override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, - protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { - new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, + override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, + protocol: SecurityProtocol, memoryPool: MemoryPool, isControlPlane: Boolean): Processor = { + new Processor(id, isControlPlane, time, config.socketRequestMaxBytes, dataRequestChannel, connectionQuotas, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider, memoryPool, new LogContext()) { override protected[network] def connectionId(socket: Socket): String = overrideConnectionId @@ -361,7 +361,7 @@ class SocketServerTest extends JUnitSuite { assertSame(channel1, openOrClosingChannel.getOrElse(throw new RuntimeException("Channel not found"))) // Complete request with failed send so that `channel1` is removed from Selector.closingChannels - processRequest(overrideServer.requestChannel, request) + processRequest(overrideServer.dataRequestChannel, request) TestUtils.waitUntilTrue(() => connectionCount == 0 && openOrClosingChannel.isEmpty, "Failed to remove channel with failed send") // Check that new connections can be created with the same id since `channel1` is no longer in Selector @@ -380,14 +380,14 @@ class SocketServerTest extends JUnitSuite { def sendTwoRequestsReceiveOne(): RequestChannel.Request = { sendRequest(socket, requestBytes, flush = false) sendRequest(socket, requestBytes, flush = true) - receiveRequest(server.requestChannel) + receiveRequest(server.dataRequestChannel) } val (request, hasStagedReceives) = TestUtils.computeUntilTrue(sendTwoRequestsReceiveOne()) { req => val connectionId = req.context.connectionId val hasStagedReceives = server.processor(0).numStagedReceives(connectionId) > 0 if 
(!hasStagedReceives) { - processRequest(server.requestChannel, req) - processRequest(server.requestChannel) + processRequest(server.dataRequestChannel, req) + processRequest(server.dataRequestChannel) } hasStagedReceives } @@ -403,11 +403,11 @@ class SocketServerTest extends JUnitSuite { // Mimic a primitive request handler that fetches the request from RequestChannel and place a response with a // throttled channel. - val request = receiveRequest(server.requestChannel) + val request = receiveRequest(server.dataRequestChannel) val byteBuffer = request.body[AbstractRequest].serialize(request.header) val send = new NetworkSend(request.context.connectionId, byteBuffer) def channelThrottlingCallback(response: RequestChannel.Response): Unit = { - server.requestChannel.sendResponse(response) + server.dataRequestChannel.sendResponse(response) } val throttledChannel = new ThrottledChannel(request, new MockTime(), 100, channelThrottlingCallback) val response = @@ -415,7 +415,7 @@ class SocketServerTest extends JUnitSuite { new RequestChannel.SendResponse(request, send, Some(request.header.toString), None) else new RequestChannel.NoOpResponse(request) - server.requestChannel.sendResponse(response) + server.dataRequestChannel.sendResponse(response) // Quota manager would call notifyThrottlingDone() on throttling completion. Simulate it if throttleingInProgress is // false. 
@@ -490,7 +490,7 @@ class SocketServerTest extends JUnitSuite { val bytes = new Array[Byte](40) // send a request first to make sure the connection has been picked up by the socket server sendRequest(plainSocket, bytes, Some(0)) - processRequest(server.requestChannel) + processRequest(server.dataRequestChannel) // the following sleep is necessary to reliably detect the connection close when we send data below Thread.sleep(200L) // make sure the sockets are open @@ -527,7 +527,7 @@ class SocketServerTest extends JUnitSuite { val conn2 = connect() val serializedBytes = producerRequestBytes() sendRequest(conn2, serializedBytes) - val request = server.requestChannel.receiveRequest(2000) + val request = server.dataRequestChannel.receiveRequest(2000) assertNotNull(request) } @@ -555,7 +555,7 @@ class SocketServerTest extends JUnitSuite { val conn2 = connect(server) val serializedBytes = producerRequestBytes() sendRequest(conn2, serializedBytes) - val request = server.requestChannel.receiveRequest(2000) + val request = server.dataRequestChannel.receiveRequest(2000) assertNotNull(request) // now try to connect from the external facing interface, which should fail @@ -583,7 +583,7 @@ class SocketServerTest extends JUnitSuite { // it should succeed val serializedBytes = producerRequestBytes() sendRequest(conns.last, serializedBytes) - val request = overrideServer.requestChannel.receiveRequest(2000) + val request = overrideServer.dataRequestChannel.receiveRequest(2000) assertNotNull(request) // now try one more (should fail) @@ -627,7 +627,7 @@ class SocketServerTest extends JUnitSuite { byteBuffer.get(serializedBytes) sendRequest(sslSocket, serializedBytes) - processRequest(overrideServer.requestChannel) + processRequest(overrideServer.dataRequestChannel) assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq) sslSocket.close() } finally { @@ -640,7 +640,7 @@ class SocketServerTest extends JUnitSuite { val socket = connect() val bytes = new Array[Byte](40) 
sendRequest(socket, bytes, Some(0)) - assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.requestChannel).session.principal) + assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.dataRequestChannel).session.principal) } /* Test that we update request metrics if the client closes the connection while the broker response is in flight. */ @@ -650,9 +650,9 @@ class SocketServerTest extends JUnitSuite { val serverMetrics = new Metrics var conn: Socket = null val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) { - override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, - protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { - new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, + override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, + protocol: SecurityProtocol, memoryPool: MemoryPool, isControlPlane: Boolean): Processor = { + new Processor(id, isControlPlane, time, config.socketRequestMaxBytes, dataRequestChannel, connectionQuotas, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider, MemoryPool.NONE, new LogContext()) { override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) { @@ -668,7 +668,7 @@ class SocketServerTest extends JUnitSuite { val serializedBytes = producerRequestBytes() sendRequest(conn, serializedBytes) - val channel = overrideServer.requestChannel + val channel = overrideServer.dataRequestChannel val request = receiveRequest(channel) val requestMetrics = channel.metrics(request.header.apiKey.name) @@ -696,9 +696,9 @@ class SocketServerTest extends JUnitSuite { @volatile var selector: TestableSelector = null val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0" val overrideServer = new 
SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) { - override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, - protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { - new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, + override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, + protocol: SecurityProtocol, memoryPool: MemoryPool, isControlPlane: Boolean): Processor = { + new Processor(id, isControlPlane, time, config.socketRequestMaxBytes, dataRequestChannel, connectionQuotas, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider, memoryPool, new LogContext()) { override protected[network] def connectionId(socket: Socket): String = overrideConnectionId @@ -730,7 +730,7 @@ class SocketServerTest extends JUnitSuite { socket.close() // Complete request with socket exception so that the channel is removed from Selector.closingChannels - processRequest(overrideServer.requestChannel, request) + processRequest(overrideServer.dataRequestChannel, request) TestUtils.waitUntilTrue(() => openOrClosingChannel.isEmpty, "Channel not closed after failed send") assertTrue("Unexpected completed send", selector.completedSends.isEmpty) } finally { @@ -755,7 +755,7 @@ class SocketServerTest extends JUnitSuite { conn = connect(overrideServer) val serializedBytes = producerRequestBytes() sendRequest(conn, serializedBytes) - val channel = overrideServer.requestChannel + val channel = overrideServer.dataRequestChannel val request = receiveRequest(channel) TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.context.connectionId).isEmpty, @@ -780,10 +780,10 @@ class SocketServerTest extends JUnitSuite { server.stopProcessingRequests() val version = ApiKeys.PRODUCE.latestVersion 
val version2 = (version - 1).toShort - for (_ <- 0 to 1) server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark() - server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark() - assertEquals(2, server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count()) - server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1)) + for (_ <- 0 to 1) server.dataRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark() + server.dataRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark() + assertEquals(2, server.dataRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count()) + server.dataRequestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1)) val nonZeroMeters = Map(s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version" -> 2, s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version2" -> 1, "kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE" -> 1) @@ -879,7 +879,7 @@ class SocketServerTest extends JUnitSuite { sockets.foreach(sendRequest(_, producerRequestBytes())) testableServer.testableSelector.addFailure(SelectorOperation.Send) - sockets.foreach(_ => processRequest(testableServer.requestChannel)) + sockets.foreach(_ => processRequest(testableServer.dataRequestChannel)) testableSelector.waitForOperations(SelectorOperation.Send, 2) testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, locallyClosed = true) @@ -900,7 +900,7 @@ class SocketServerTest extends JUnitSuite { val sockets = (1 to 2).map(_ => connect(testableServer)) sockets.foreach(sendRequest(_, producerRequestBytes())) - val requestChannel = testableServer.requestChannel + val requestChannel = testableServer.dataRequestChannel val requests = sockets.map(_ => receiveRequest(requestChannel)) val failedConnectionId = requests(0).context.connectionId @@ -933,8 
+933,8 @@ class SocketServerTest extends JUnitSuite { testableSelector.addFailure(SelectorOperation.Send) sockets(0).close() - processRequest(testableServer.requestChannel, request) - processRequest(testableServer.requestChannel) // Also process request from other channel + processRequest(testableServer.dataRequestChannel, request) + processRequest(testableServer.dataRequestChannel) // Also process request from other channel testableSelector.waitForOperations(SelectorOperation.Send, 2) testableServer.waitForChannelClose(request.context.connectionId, locallyClosed = true) @@ -957,7 +957,7 @@ class SocketServerTest extends JUnitSuite { withTestableServer { testableServer => val sockets = (1 to 2).map(_ => connect(testableServer)) val testableSelector = testableServer.testableSelector - val requestChannel = testableServer.requestChannel + val requestChannel = testableServer.dataRequestChannel testableSelector.cachedCompletedReceives.minPerPoll = 2 testableSelector.addFailure(SelectorOperation.Mute) @@ -989,7 +989,7 @@ class SocketServerTest extends JUnitSuite { val requests = sockets.map(sendAndReceiveRequest(_, testableServer)) testableSelector.addFailure(SelectorOperation.Unmute) - requests.foreach(processRequest(testableServer.requestChannel, _)) + requests.foreach(processRequest(testableServer.dataRequestChannel, _)) testableSelector.waitForOperations(SelectorOperation.Unmute, 2) testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, locallyClosed = true) @@ -1073,7 +1073,7 @@ class SocketServerTest extends JUnitSuite { private def assertProcessorHealthy(testableServer: TestableSocketServer, healthySockets: Seq[Socket] = Seq.empty): Unit = { val selector = testableServer.testableSelector selector.reset() - val requestChannel = testableServer.requestChannel + val requestChannel = testableServer.dataRequestChannel // Check that existing channels behave as expected healthySockets.foreach { socket => @@ -1101,9 +1101,9 @@ class SocketServerTest 
extends JUnitSuite { @volatile var selector: Option[TestableSelector] = None - override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, - protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { - new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, config.connectionsMaxIdleMs, + override def newProcessor(id: Int, requestChannel: RequestChannel,connectionQuotas: ConnectionQuotas, listenerName: ListenerName, + protocol: SecurityProtocol, memoryPool: MemoryPool, isControlPlane: Boolean): Processor = { + new Processor(id, isControlPlane, time, config.socketRequestMaxBytes, dataRequestChannel, connectionQuotas, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider, memoryPool, new LogContext()) { override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = { val testableSelector = new TestableSelector(config, channelBuilder, time, metrics) diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index cabe0a984e2..9182cfc6a4e 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -93,7 +93,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "1000") props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "2000") - val quotaManagers = servers.head.apis.quotas + val quotaManagers = servers.head.dataPlaneApis.quotas rootEntityType match { case ConfigType.Client => adminZkClient.changeClientIdConfig(configEntityName, props) case _ => adminZkClient.changeUserOrUserClientIdConfig(configEntityName, props) @@ -179,7 +179,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { // Remove config change znodes 
to force quota initialization only through loading of user/client quotas zkClient.getChildren(ConfigEntityChangeNotificationZNode.path).foreach { p => zkClient.deletePath(ConfigEntityChangeNotificationZNode.path + "/" + p) } server.startup() - val quotaManagers = server.apis.quotas + val quotaManagers = server.dataPlaneApis.quotas assertEquals(Quota.upperBound(1000), quotaManagers.produce.quota("someuser", "overriddenClientId")) assertEquals(Quota.upperBound(2000), quotaManagers.fetch.quota("someuser", "overriddenClientId")) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index b75c3e7eb24..f268ff8f87b 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -230,6 +230,40 @@ class KafkaConfigTest { assertFalse(isValidKafkaConfig(props)) } + @Test + def testControlPlaneListenerName() = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) + props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9091") + props.put(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLLER") + props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL") + assertTrue(isValidKafkaConfig(props)) + + // verify that there must be an entry for control.plane.listener.name in the listener.security.protocol.map + val wrongPropsWithoutSecurityProtocol = new Properties() + wrongPropsWithoutSecurityProtocol.putAll(props) + wrongPropsWithoutSecurityProtocol.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT") + assertFalse(isValidKafkaConfig(wrongPropsWithoutSecurityProtocol)) + + // verify that the control.plane.listener.name must be present in the listeners list + val propsWithNoControlPlaneEndpointInListeners = new Properties() + propsWithNoControlPlaneEndpointInListeners.putAll(props) + 
propsWithNoControlPlaneEndpointInListeners.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092") + assertFalse(isValidKafkaConfig(propsWithNoControlPlaneEndpointInListeners)) + + // verify that the control.plane.listener.name must be present in the advertised.listeners list + val propsWithNoControlPlaneEndpointInAdvertisedListeners = new Properties() + propsWithNoControlPlaneEndpointInAdvertisedListeners.putAll(props) + propsWithNoControlPlaneEndpointInAdvertisedListeners.put(KafkaConfig.AdvertisedHostNameProp, + "PLAINTEXT://localhost:9092") + assertFalse(isValidKafkaConfig(propsWithNoControlPlaneEndpointInAdvertisedListeners)) + + // verify that the control.plane.listener.name must be different from the inter broker listener name + val propsWithIdenticalControlPlaneAndDataPlaneListenerNames = new Properties() + propsWithIdenticalControlPlaneAndDataPlaneListenerNames.putAll(props) + propsWithIdenticalControlPlaneAndDataPlaneListenerNames.put(KafkaConfig.ControlPlaneListenerNameProp, "PLAINTEXT") + assertFalse(isValidKafkaConfig(propsWithIdenticalControlPlaneAndDataPlaneListenerNames)) + } + @Test def testBadListenerProtocol() { val props = new Properties() diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 740b28e11cf..255256ad14c 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala index 4e60a8f6027..1610404c246 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala @@ -74,7 +74,7 @@ class MetadataRequestTest extends BaseRequestTest { assertNotEquals("Controller id should switch to a new broker", controllerId, controllerId2) TestUtils.waitUntilTrue(() => { val metadataResponse2 = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort)) - metadataResponse2.controller != null && controllerServer2.apis.brokerId == metadataResponse2.controller.id + metadataResponse2.controller != null && controllerServer2.dataPlaneApis.brokerId == metadataResponse2.controller.id }, "Controller id should match the active controller after failover", 5000) } @@ -178,7 +178,7 @@ class MetadataRequestTest extends BaseRequestTest { assertEquals(topic1, topicMetadata1.topic) assertEquals(Errors.INVALID_TOPIC_EXCEPTION, topicMetadata2.error) assertEquals(topic2, topicMetadata2.topic) - + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 0) @@ -250,7 +250,7 @@ class MetadataRequestTest extends BaseRequestTest { val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort)) val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head val downNode = servers.find { server => - val serverId = server.apis.brokerId + val serverId = server.dataPlaneApis.brokerId val leaderId = partitionMetadata.leader.id val replicaIds = partitionMetadata.replicas.asScala.map(_.id) serverId != leaderId && 
replicaIds.contains(serverId) @@ -260,7 +260,7 @@ class MetadataRequestTest extends BaseRequestTest { TestUtils.waitUntilTrue(() => { val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort)) val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head - val replica = metadata.replicas.asScala.find(_.id == downNode.apis.brokerId).get + val replica = metadata.replicas.asScala.find(_.id == downNode.dataPlaneApis.brokerId).get replica.host == "" & replica.port == -1 }, "Replica was not found down", 5000) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 3604385f6ad..d0085400699 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -100,7 +100,7 @@ class RequestQuotaTest extends BaseRequestTest { adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), quotaProps) TestUtils.retry(10000) { - val quotaManager = servers.head.apis.quotas.request + val quotaManager = servers.head.dataPlaneApis.quotas.request assertEquals(s"Default request quota not set", Quota.upperBound(0.01), quotaManager.quota("some-user", "some-client")) assertEquals(s"Request quota override not set", Quota.upperBound(2000), quotaManager.quota("some-user", unthrottledClientId)) } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 99742307b26..bc95edb1cb1 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -35,6 +35,7 @@ import kafka.security.auth.{Acl, Authorizer, Resource} import kafka.server._ import kafka.server.checkpoints.OffsetCheckpointFile import Implicits._ +import com.yammer.metrics.Metrics import kafka.controller.LeaderIsrAndControllerEpoch import kafka.zk._ import 
org.apache.kafka.clients.CommonClientConfigs @@ -811,7 +812,7 @@ object TestUtils extends Logging { timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = { val expectedBrokerIds = servers.map(_.config.brokerId).toSet TestUtils.waitUntilTrue(() => servers.forall(server => - expectedBrokerIds == server.apis.metadataCache.getAliveBrokers.map(_.id).toSet + expectedBrokerIds == server.dataPlaneApis.metadataCache.getAliveBrokers.map(_.id).toSet ), "Timed out waiting for broker metadata to propagate to all servers", timeout) } @@ -831,7 +832,7 @@ object TestUtils extends Logging { TestUtils.waitUntilTrue(() => servers.foldLeft(true) { (result, server) => - val partitionStateOpt = server.apis.metadataCache.getPartitionInfo(topic, partition) + val partitionStateOpt = server.dataPlaneApis.metadataCache.getPartitionInfo(topic, partition) partitionStateOpt match { case None => false case Some(partitionState) => @@ -1411,4 +1412,21 @@ object TestUtils extends Logging { .foldLeft(0.0)((total, metric) => total + metric.metricValue.asInstanceOf[Double]) total.toLong } + + def verifyMetricExistence(metricName: String, shouldExist: Boolean): Unit = { + val metrics = Metrics.defaultRegistry + val foundMetrics = metrics.allMetrics.asScala.filter { + k => k._1.getName.equals(metricName) + } + + val metricExists = foundMetrics.nonEmpty + + assertTrue(s"The $metricName metric should " + (if (!shouldExist) "not " else "") + "exist.", + if (shouldExist) metricExists else !metricExists ) + } + + def cleanMetricsRegistry() { + val metrics = Metrics.defaultRegistry + metrics.allMetrics.keySet.asScala.foreach(metrics.removeMetric) + } } diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala index 39745e5e608..061a4bd685b 100644 --- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala @@ -310,8 +310,8 @@ class AdminZkClientTest extends 
ZooKeeperTestHarness with Logging with RackAware // Test that the existing clientId overrides are read val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) servers = Seq(server) - assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId)) - assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId)) + assertEquals(new Quota(1000, true), server.dataPlaneApis.quotas.produce.quota("ANONYMOUS", clientId)) + assertEquals(new Quota(2000, true), server.dataPlaneApis.quotas.fetch.quota("ANONYMOUS", clientId)) } @Test diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala index 0088c657de0..26fe6828778 100644 --- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala +++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala @@ -23,6 +23,7 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDow import com.yammer.metrics.Metrics import com.yammer.metrics.core.{Gauge, Meter, MetricName} +import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.Time @@ -43,7 +44,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Before override def setUp() { - cleanMetricsRegistry() + TestUtils.cleanMetricsRegistry() super.setUp() zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, "testMetricGroup", "testMetricType") @@ -637,10 +638,5 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { assertEquals(States.CLOSED, zooKeeperClient.connectionState) } - private def cleanMetricsRegistry() { - val metrics = Metrics.defaultRegistry - metrics.allMetrics.keySet.asScala.foreach(metrics.removeMetric) - } - private def bytes = 
UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8) } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 8bca79f8d83..a9461fe99e6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -518,7 +518,7 @@ public static void waitUntilMetadataIsPropagated(final List<KafkaServer> servers final long timeout) throws InterruptedException { TestUtils.waitForCondition(() -> { for (final KafkaServer server : servers) { - final MetadataCache metadataCache = server.apis().metadataCache(); + final MetadataCache metadataCache = server.dataPlaneApis().metadataCache(); final Option<UpdateMetadataRequest.PartitionState> partitionInfo = metadataCache.getPartitionInfo(topic, partition); if (partitionInfo.isEmpty()) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > add request prioritization > -------------------------- > > Key: KAFKA-4453 > URL: https://issues.apache.org/jira/browse/KAFKA-4453 > Project: Kafka > Issue Type: Improvement > Reporter: Onur Karaman > Assignee: Mayuresh Gharat > Priority: Major > > Today all requests (client requests, broker requests, controller requests) to > a broker are put into the same queue. They all have the same priority. So a > backlog of requests ahead of the controller request will delay the processing > of controller requests. This causes requests in front of the controller > request to get processed based on stale state. 
> Side effects may include giving clients stale metadata\[1\], rejecting > ProduceRequests and FetchRequests\[2\], and data loss (for some > unofficial\[3\] definition of data loss in terms of messages beyond the high > watermark)\[4\]. > We'd like to minimize the number of requests processed based on stale state. > With request prioritization, controller requests get processed before regular > queued-up requests, so requests can get processed with up-to-date state. > \[1\] Say a client's MetadataRequest is sitting in front of a controller's > UpdateMetadataRequest on a given broker's request queue. Suppose the > MetadataRequest is for a topic whose partitions have recently undergone > leadership changes and that these leadership changes are being broadcast > from the controller in the later UpdateMetadataRequest. Today the broker > processes the MetadataRequest before processing the UpdateMetadataRequest, > meaning the metadata returned to the client will be stale. The client will > waste a round trip sending requests to the stale partition leader, get a > NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the > topic metadata again. > \[2\] Clients can issue ProduceRequests to the wrong broker based on stale > metadata, causing rejected ProduceRequests. Depending on how long the client acts > on the stale metadata, the impact may or may not be visible to a > producer application. If the number of rejected ProduceRequests does not > exceed the max number of retries, the producer application would not be > impacted. On the other hand, if the retries are exhausted, the failed produce > will be visible to the producer application. > \[3\] The official definition of data loss in Kafka is when we lose a > "committed" message. A message is considered "committed" when all in-sync > replicas for that partition have applied it to their log. 
> \[4\] Say a number of ProduceRequests are sitting in front of a controller's > LeaderAndIsrRequest on a given broker's request queue. Suppose the > ProduceRequests are for partitions whose leadership has recently shifted out > from the current broker to another broker in the replica set. Today the > broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning > the ProduceRequests are getting processed on the former partition leader. As > part of becoming a follower for a partition, the broker truncates the log to > the high watermark. With weaker ack settings such as acks=1, the leader may > successfully write to its own log, respond to the user with a success, > process the LeaderAndIsrRequest making the broker a follower of the > partition, and truncate the log to a point before the user's produced > messages. So users have a false sense that their produce attempt succeeded > while in reality their messages got erased. While technically part of what > they signed up for with acks=1, it can still come as a surprise. -- This message was sent by Atlassian JIRA (v7.6.3#76005)