This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push: new 255bf5a1d37 KAFKA-14774 the removed listeners should not be reconfigurable (#13472) 255bf5a1d37 is described below commit 255bf5a1d37ad404c9fe8ffd429c1331b9f60b6d Author: Chia-Ping Tsai <chia7...@gmail.com> AuthorDate: Wed Mar 29 22:08:00 2023 +0800 KAFKA-14774 the removed listeners should not be reconfigurable (#13472) Reviewers: Luke Chen <show...@gmail.com> --- .../main/scala/kafka/network/SocketServer.scala | 1 + .../scala/kafka/server/DynamicBrokerConfig.scala | 2 +- .../server/DynamicBrokerReconfigurationTest.scala | 34 ++++++++++++++++++++-- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 0c08d7b056a..f5b7e689d50 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -323,6 +323,7 @@ class SocketServer(val config: KafkaConfig, dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor => acceptor.beginShutdown() acceptor.close() + config.removeReconfigurable(acceptor) } } } diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 38f7722d343..e66b075b5c4 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -208,7 +208,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging // Use COWArrayList to prevent concurrent modification exception when an item is added by one thread to these // collections, while another thread is iterating over them. - private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]() + private[server] val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]() private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]() private val lock = new ReentrantReadWriteLock private var currentConfig: KafkaConfig = _ diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 92a6e457544..4f01432e167 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -33,7 +33,7 @@ import kafka.api.{KafkaSasl, SaslSetup} import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager} import kafka.log.{CleanerConfig, LogConfig, UnifiedLog} import kafka.message.ProducerCompressionCodec -import kafka.network.{Processor, RequestChannel} +import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel} import kafka.utils._ import kafka.utils.Implicits._ import kafka.utils.TestUtils.TestControllerRequestCompletionHandler @@ -75,6 +75,7 @@ import scala.jdk.CollectionConverters._ import scala.collection.Seq object DynamicBrokerReconfigurationTest { + val Plain = "PLAIN" val SecureInternal = "INTERNAL" val SecureExternal = "EXTERNAL" } @@ -123,7 +124,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS) // Ensure that we can support multiple listeners per security protocol and multiple security protocols props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0") - props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER:$controllerListenerSecurityProtocol") + props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"PLAINTEXT:PLAINTEXT, $SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER:$controllerListenerSecurityProtocol") props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal) props.put(KafkaConfig.SslClientAuthProp, "requested") props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN") @@ -1165,6 +1166,35 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup assertTrue(partitions.exists(_.leader == null), "Did not find partitions with no leader") } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testReconfigureRemovedListener(quorum: String): Unit = { + val client = adminClients.head + val broker = servers.head + assertEquals(2, broker.config.dynamicConfig.reconfigurables.asScala.count(r => r.isInstanceOf[DataPlaneAcceptor])) + val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, broker.config.brokerId.toString) + + def acceptors: Seq[DataPlaneAcceptor] = broker.config.dynamicConfig.reconfigurables.asScala.filter(_.isInstanceOf[DataPlaneAcceptor]) + .map(_.asInstanceOf[DataPlaneAcceptor]).toSeq + + // add new PLAINTEXT listener + client.incrementalAlterConfigs(Map(broker0Resource -> + Seq(new AlterConfigOp(new ConfigEntry(KafkaConfig.ListenersProp, + s"PLAINTEXT://localhost:0, $SecureInternal://localhost:0, $SecureExternal://localhost:0"), AlterConfigOp.OpType.SET) + ).asJavaCollection).asJava).all().get() + + TestUtils.waitUntilTrue(() => acceptors.size == 3, s"failed to add new DataPlaneAcceptor") + + // remove PLAINTEXT listener + client.incrementalAlterConfigs(Map(broker0Resource -> + Seq(new AlterConfigOp(new ConfigEntry(KafkaConfig.ListenersProp, + s"$SecureInternal://localhost:0, $SecureExternal://localhost:0"), AlterConfigOp.OpType.SET) + ).asJavaCollection).asJava).all().get() + + TestUtils.waitUntilTrue(() => acceptors.size == 2, + s"failed to remove DataPlaneAcceptor. current: ${acceptors.map(_.endPoint.toString).mkString(",")}") + } + private def addListener(servers: Seq[KafkaBroker], listenerName: String, securityProtocol: SecurityProtocol, saslMechanisms: Seq[String]): Unit = { val config = servers.head.config