This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 44e419722e6f2baa4366eeb5eacc55b91fa9b0d1
Author: Colin Patrick McCabe <cmcc...@apache.org>
AuthorDate: Tue Apr 19 13:17:16 2022 -0700

    KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft (#12063)
    
    Fix two bugs related to dynamic broker configs in KRaft. The first bug is
    that we are calling reloadUpdatedFilesWithoutConfigChange when a topic
    configuration is changed, but not when a broker configuration is changed.
    This is backwards. This function must be called only for broker configs,
    and never for topic configs or cluster configs.
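
    In sketch form, the corrected dispatch in BrokerMetadataPublisher looks
    like this (condensed from the hunk below, not the literal code):

        resource.`type`() match {
          case TOPIC =>
            // Topic configs are applied but never trigger a file reload.
            dynamicConfigHandlers(ConfigType.Topic).processConfigChanges(resource.name(), props)
          case BROKER if resource.name().isEmpty =>
            // Cluster configs (default BROKER configs) are applied, with no file reload.
            dynamicConfigHandlers(ConfigType.Broker).processConfigChanges(ConfigEntityName.Default, props)
          case BROKER if resource.name() == brokerId.toString =>
            // Only per-broker configs also reload associated files, e.g. a keystore
            // whose path is unchanged but whose contents may have changed on disk.
            dynamicConfigHandlers(ConfigType.Broker).processConfigChanges(resource.name(), props)
            reloadUpdatedFilesWithoutConfigChange(props)
          case _ => // nothing to do
        }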
    
    The second bug is that there are several configurations, such as
    max.connections, which are related to broker listeners but which do not
    involve changing the registered listeners. We should support dynamically
    changing these configurations in KRaft. This PR fixes the configuration
    change validation to support this case.
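
    For example, a per-broker max.connections update like the following sketch
    (mirroring the new test below; the client properties and target broker id
    are assumptions) is now accepted in KRaft mode, while adding or removing an
    advertised listener is still rejected:

        val admin = Admin.create(clientProps)  // clientProps: assumed cluster connection config
        admin.incrementalAlterConfigs(java.util.Collections.singletonMap(
          new ConfigResource(ConfigResource.Type.BROKER, "0"),  // "0": assumed broker id
          java.util.Collections.singleton(new AlterConfigOp(
            new ConfigEntry(KafkaConfig.MaxConnectionsProp, "123"), AlterConfigOp.OpType.SET))
        )).all().get()
        admin.close()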
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Matthew de Detrich <mdedetr...@gmail.com>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 64 +++++++++++++++------
 .../server/metadata/BrokerMetadataListener.scala   | 18 ++++++
 .../server/metadata/BrokerMetadataPublisher.scala  | 41 ++++++++-----
 .../metadata/BrokerMetadataPublisherTest.scala     | 67 ++++++++++++++++++++++
 4 files changed, 159 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index a40444507b8..918e936724f 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -820,8 +820,12 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends Reconf
     configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
   }
 }
-object DynamicListenerConfig {
 
+object DynamicListenerConfig {
+  /**
+   * The set of configurations which the DynamicListenerConfig object listens for. Many of
+   * these are also monitored by other objects such as ChannelBuilders and SocketServers.
+   */
   val ReconfigurableConfigs = Set(
     // Listener configs
     KafkaConfig.AdvertisedListenersProp,
@@ -909,11 +913,32 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
     DynamicListenerConfig.ReconfigurableConfigs
   }
 
+  private def listenerRegistrationsAltered(
+    oldAdvertisedListeners: Map[ListenerName, EndPoint],
+    newAdvertisedListeners: Map[ListenerName, EndPoint]
+  ): Boolean = {
+    if (oldAdvertisedListeners.size != newAdvertisedListeners.size) return true
+    oldAdvertisedListeners.forKeyValue {
+      case (oldListenerName, oldEndpoint) =>
+        newAdvertisedListeners.get(oldListenerName) match {
+          case None => return true
+          case Some(newEndpoint) => if (!newEndpoint.equals(oldEndpoint)) {
+            return true
+          }
+        }
+    }
+    false
+  }
+
+  private def verifyListenerRegistrationAlterationSupported(): Unit = {
+    if (!server.config.requiresZookeeper) {
+      throw new ConfigException("Advertised listeners cannot be altered when using a " +
+        "Raft-based metadata quorum.")
+    }
+  }
+
   def validateReconfiguration(newConfig: KafkaConfig): Unit = {
     val oldConfig = server.config
-    if (!oldConfig.requiresZookeeper) {
-      throw new ConfigException("Dynamic reconfiguration of listeners is not 
yet supported when using a Raft-based metadata quorum")
-    }
     val newListeners = listenersToMap(newConfig.listeners)
     val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedListeners)
     val oldListeners = listenersToMap(oldConfig.listeners)
@@ -936,6 +961,13 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
     }
     if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName))
       throw new ConfigException(s"Advertised listener must be specified for 
inter-broker listener ${newConfig.interBrokerListenerName}")
+
+    // Currently, we do not support adding or removing listeners when in KRaft mode.
+    // However, we support changing other listener configurations (max connections, etc.)
+    if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners),
+        listenersToMap(newConfig.effectiveAdvertisedListeners))) {
+      verifyListenerRegistrationAlterationSupported()
+    }
   }
 
   def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
@@ -945,18 +977,18 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
     val oldListenerMap = listenersToMap(oldListeners)
     val listenersRemoved = oldListeners.filterNot(e => newListenerMap.contains(e.listenerName))
     val listenersAdded = newListeners.filterNot(e => oldListenerMap.contains(e.listenerName))
-
-    // Clear SASL login cache to force re-login
-    if (listenersAdded.nonEmpty || listenersRemoved.nonEmpty)
-      LoginManager.closeAll()
-
-    server.socketServer.removeListeners(listenersRemoved)
-    if (listenersAdded.nonEmpty)
-      server.socketServer.addListeners(listenersAdded)
-
-    server match {
-      case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo)
-      case _ =>
+    if (listenersRemoved.nonEmpty || listenersAdded.nonEmpty) {
+      LoginManager.closeAll() // Clear SASL login cache to force re-login
+      if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved)
+      if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded)
+    }
+    if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners),
+        listenersToMap(newConfig.effectiveAdvertisedListeners))) {
+      verifyListenerRegistrationAlterationSupported()
+      server match {
+        case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo)
+        case _ => throw new RuntimeException("Unable to handle non-kafkaServer")
+      }
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 5b118220071..5b71409714d 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -248,6 +248,24 @@ class BrokerMetadataListener(
     }
   }
 
+  // This is used in tests to alter the publisher that is in use by the broker.
+  def alterPublisher(publisher: MetadataPublisher): CompletableFuture[Void] = {
+    val event = new AlterPublisherEvent(publisher)
+    eventQueue.append(event)
+    event.future
+  }
+
+  class AlterPublisherEvent(publisher: MetadataPublisher)
+    extends EventQueue.FailureLoggingEvent(log) {
+    val future = new CompletableFuture[Void]()
+
+    override def run(): Unit = {
+      _publisher = Some(publisher)
+      log.info(s"Set publisher to ${publisher}")
+      future.complete(null)
+    }
+  }
+
   private def publish(publisher: MetadataPublisher): Unit = {
     val delta = _delta
     _image = _delta.apply()
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 74c5348afc7..291a1507d28 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -17,6 +17,8 @@
 
 package kafka.server.metadata
 
+import java.util.Properties
+
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.{LogManager, UnifiedLog}
@@ -187,21 +189,26 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
                 toLoggableProps(resource, props).mkString(","))
               dynamicConfigHandlers(ConfigType.Topic).
                 processConfigChanges(resource.name(), props)
-              conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
-            case BROKER => if (resource.name().isEmpty) {
-              // Apply changes to "cluster configs" (also known as default BROKER configs).
-              // These are stored in KRaft with an empty name field.
-              info(s"Updating cluster configuration : " +
-                toLoggableProps(resource, props).mkString(","))
-              dynamicConfigHandlers(ConfigType.Broker).
-                processConfigChanges(ConfigEntityName.Default, props)
-            } else if (resource.name().equals(brokerId.toString)) {
-              // Apply changes to this broker's dynamic configuration.
-              info(s"Updating broker ${brokerId} with new configuration : " +
-                toLoggableProps(resource, props).mkString(","))
-              dynamicConfigHandlers(ConfigType.Broker).
-                processConfigChanges(resource.name(), props)
-            }
+            case BROKER =>
+              if (resource.name().isEmpty) {
+                // Apply changes to "cluster configs" (also known as default BROKER configs).
+                // These are stored in KRaft with an empty name field.
+                info("Updating cluster configuration : " +
+                  toLoggableProps(resource, props).mkString(","))
+                dynamicConfigHandlers(ConfigType.Broker).
+                  processConfigChanges(ConfigEntityName.Default, props)
+              } else if (resource.name() == brokerId.toString) {
+                // Apply changes to this broker's dynamic configuration.
+                info(s"Updating broker ${brokerId} with new configuration : " +
+                  toLoggableProps(resource, props).mkString(","))
+                dynamicConfigHandlers(ConfigType.Broker).
+                  processConfigChanges(resource.name(), props)
+                // When applying a per broker config (not a cluster config), we also
+                // reload any associated file. For example, if the ssl.keystore is still
+                // set to /tmp/foo, we still want to reload /tmp/foo in case its contents
+                // have changed. This doesn't apply to topic configs or cluster configs.
+                reloadUpdatedFilesWithoutConfigChange(props)
+              }
             case _ => // nothing to do
           }
         }
@@ -250,6 +257,10 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
     }
   }
 
+  def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
+    conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
+  }
+
   /**
    * Update the coordinator of local replica changes: election and resignation.
    *
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 9482ae27be3..329c9d1e1ea 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -17,15 +17,30 @@
 
 package unit.kafka.server.metadata
 
+import java.util.Collections.{singleton, singletonMap}
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
 import kafka.log.UnifiedLog
+import kafka.server.KafkaConfig
 import kafka.server.metadata.BrokerMetadataPublisher
+import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.ConfigResource.Type.BROKER
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.metadata.PartitionRegistration
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
 import scala.jdk.CollectionConverters._
 
 class BrokerMetadataPublisherTest {
@@ -142,4 +157,56 @@ class BrokerMetadataPublisherTest {
     new TopicsImage(idsMap.asJava, namesMap.asJava)
   }
 
+  @Test
+  def testReloadUpdatedFilesWithoutConfigChange(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      val broker = cluster.brokers().values().iterator().next()
+      val publisher = Mockito.spy(new BrokerMetadataPublisher(
+        conf = broker.config,
+        metadataCache = broker.metadataCache,
+        logManager = broker.logManager,
+        replicaManager = broker.replicaManager,
+        groupCoordinator = broker.groupCoordinator,
+        txnCoordinator = broker.transactionCoordinator,
+        clientQuotaMetadataManager = broker.clientQuotaMetadataManager,
+        featureCache = broker.featureCache,
+        dynamicConfigHandlers = broker.dynamicConfigHandlers.toMap,
+        _authorizer = Option.empty
+      ))
+      val numTimesReloadCalled = new AtomicInteger(0)
+      Mockito.when(publisher.reloadUpdatedFilesWithoutConfigChange(any[Properties]())).
+        thenAnswer(new Answer[Unit]() {
+          override def answer(invocation: InvocationOnMock): Unit = numTimesReloadCalled.addAndGet(1)
+        })
+      broker.metadataListener.alterPublisher(publisher).get()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        assertEquals(0, numTimesReloadCalled.get())
+        admin.incrementalAlterConfigs(singletonMap(
+          new ConfigResource(BROKER, ""),
+          singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig.MaxConnectionsProp, "123"), SET)))).all().get()
+        TestUtils.waitUntilTrue(() => numTimesReloadCalled.get() == 0,
+          "numTimesConfigured never reached desired value")
+
+        // Unlike the cluster config change above, this per-broker config change will
+        // trigger a reload, because reloadUpdatedFilesWithoutConfigChange is called.
+        admin.incrementalAlterConfigs(singletonMap(
+          new ConfigResource(BROKER, broker.config.nodeId.toString),
+          singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig.MaxConnectionsProp, "123"), SET)))).all().get()
+        TestUtils.waitUntilTrue(() => numTimesReloadCalled.get() == 1,
+          "numTimesConfigured never reached desired value")
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
 }
