This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 255bf5a1d37 KAFKA-14774 the removed listeners should not be reconfigurable (#13472)
255bf5a1d37 is described below

commit 255bf5a1d37ad404c9fe8ffd429c1331b9f60b6d
Author: Chia-Ping Tsai <chia7...@gmail.com>
AuthorDate: Wed Mar 29 22:08:00 2023 +0800

    KAFKA-14774 the removed listeners should not be reconfigurable (#13472)
    
    Reviewers: Luke Chen <show...@gmail.com>
---
 .../main/scala/kafka/network/SocketServer.scala    |  1 +
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  2 +-
 .../server/DynamicBrokerReconfigurationTest.scala  | 34 ++++++++++++++++++++--
 3 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 0c08d7b056a..f5b7e689d50 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -323,6 +323,7 @@ class SocketServer(val config: KafkaConfig,
       dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor =>
         acceptor.beginShutdown()
         acceptor.close()
+        config.removeReconfigurable(acceptor)
       }
     }
   }
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 38f7722d343..e66b075b5c4 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -208,7 +208,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
 
   // Use COWArrayList to prevent concurrent modification exception when an item is added by one thread to these
   // collections, while another thread is iterating over them.
-  private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
+  private[server] val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
   private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
   private var currentConfig: KafkaConfig = _
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 92a6e457544..4f01432e167 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -33,7 +33,7 @@ import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
 import kafka.log.{CleanerConfig, LogConfig, UnifiedLog}
 import kafka.message.ProducerCompressionCodec
-import kafka.network.{Processor, RequestChannel}
+import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel}
 import kafka.utils._
 import kafka.utils.Implicits._
 import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
@@ -75,6 +75,7 @@ import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 
 object DynamicBrokerReconfigurationTest {
+  val Plain = "PLAIN"
   val SecureInternal = "INTERNAL"
   val SecureExternal = "EXTERNAL"
 }
@@ -123,7 +124,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS)
       // Ensure that we can support multiple listeners per security protocol and multiple security protocols
       props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
-      props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER:$controllerListenerSecurityProtocol")
+      props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"PLAINTEXT:PLAINTEXT, $SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER:$controllerListenerSecurityProtocol")
       props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
       props.put(KafkaConfig.SslClientAuthProp, "requested")
       props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN")
@@ -1165,6 +1166,35 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     assertTrue(partitions.exists(_.leader == null), "Did not find partitions with no leader")
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testReconfigureRemovedListener(quorum: String): Unit = {
+    val client = adminClients.head
+    val broker = servers.head
+    assertEquals(2, broker.config.dynamicConfig.reconfigurables.asScala.count(r => r.isInstanceOf[DataPlaneAcceptor]))
+    val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, broker.config.brokerId.toString)
+
+    def acceptors: Seq[DataPlaneAcceptor] = broker.config.dynamicConfig.reconfigurables.asScala.filter(_.isInstanceOf[DataPlaneAcceptor])
+      .map(_.asInstanceOf[DataPlaneAcceptor]).toSeq
+
+    // add new PLAINTEXT listener
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(KafkaConfig.ListenersProp,
+        s"PLAINTEXT://localhost:0, $SecureInternal://localhost:0, $SecureExternal://localhost:0"), AlterConfigOp.OpType.SET)
+      ).asJavaCollection).asJava).all().get()
+
+    TestUtils.waitUntilTrue(() => acceptors.size == 3, s"failed to add new DataPlaneAcceptor")
+
+    // remove PLAINTEXT listener
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(KafkaConfig.ListenersProp,
+        s"$SecureInternal://localhost:0, $SecureExternal://localhost:0"), AlterConfigOp.OpType.SET)
+      ).asJavaCollection).asJava).all().get()
+
+    TestUtils.waitUntilTrue(() => acceptors.size == 2,
+      s"failed to remove DataPlaneAcceptor. current: ${acceptors.map(_.endPoint.toString).mkString(",")}")
+  }
+
   private def addListener(servers: Seq[KafkaBroker], listenerName: String, securityProtocol: SecurityProtocol,
                           saslMechanisms: Seq[String]): Unit = {
     val config = servers.head.config

Reply via email to