This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch 3.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 44e419722e6f2baa4366eeb5eacc55b91fa9b0d1 Author: Colin Patrick McCabe <cmcc...@apache.org> AuthorDate: Tue Apr 19 13:17:16 2022 -0700 KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft (#12063) Fix two bugs related to dynamic broker configs in KRaft. The first bug is that we are calling reloadUpdatedFilesWithoutConfigChange when a topic configuration is changed, but not when a broker configuration is changed. This is backwards. This function must be called only for broker configs, and never for topic configs or cluster configs. The second bug is that there were several configurations such as max.connections which are related to broker listeners, but which do not involve changing the registered listeners. We should support these configurations in KRaft. This PR fixes the configuration change validation to support this case. Reviewers: Jason Gustafson <ja...@confluent.io>, Matthew de Detrich <mdedetr...@gmail.com> --- .../scala/kafka/server/DynamicBrokerConfig.scala | 64 +++++++++++++++------ .../server/metadata/BrokerMetadataListener.scala | 18 ++++++ .../server/metadata/BrokerMetadataPublisher.scala | 41 ++++++++----- .../metadata/BrokerMetadataPublisherTest.scala | 67 ++++++++++++++++++++++ 4 files changed, 159 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index a40444507b8..918e936724f 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -820,8 +820,12 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends Reconf configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala } } -object DynamicListenerConfig { +object DynamicListenerConfig { + /** + * The set of configurations which the DynamicListenerConfig object listens for. Many of + * these are also monitored by other objects such as ChannelBuilders and SocketServers. + */ val ReconfigurableConfigs = Set( // Listener configs KafkaConfig.AdvertisedListenersProp, @@ -909,11 +913,32 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi DynamicListenerConfig.ReconfigurableConfigs } + private def listenerRegistrationsAltered( + oldAdvertisedListeners: Map[ListenerName, EndPoint], + newAdvertisedListeners: Map[ListenerName, EndPoint] + ): Boolean = { + if (oldAdvertisedListeners.size != newAdvertisedListeners.size) return true + oldAdvertisedListeners.forKeyValue { + case (oldListenerName, oldEndpoint) => + newAdvertisedListeners.get(oldListenerName) match { + case None => return true + case Some(newEndpoint) => if (!newEndpoint.equals(oldEndpoint)) { + return true + } + } + } + false + } + + private def verifyListenerRegistrationAlterationSupported(): Unit = { + if (!server.config.requiresZookeeper) { + throw new ConfigException("Advertised listeners cannot be altered when using a " + + "Raft-based metadata quorum.") + } + } + def validateReconfiguration(newConfig: KafkaConfig): Unit = { val oldConfig = server.config - if (!oldConfig.requiresZookeeper) { - throw new ConfigException("Dynamic reconfiguration of listeners is not yet supported when using a Raft-based metadata quorum") - } val newListeners = listenersToMap(newConfig.listeners) val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedListeners) val oldListeners = listenersToMap(oldConfig.listeners) @@ -936,6 +961,13 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi } if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName)) throw new ConfigException(s"Advertised listener must be specified for inter-broker listener ${newConfig.interBrokerListenerName}") + + // Currently, we do not support adding or removing listeners when in KRaft mode. + // However, we support changing other listener configurations (max connections, etc.) + if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners), + listenersToMap(newConfig.effectiveAdvertisedListeners))) { + verifyListenerRegistrationAlterationSupported() + } } def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { @@ -945,18 +977,18 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi val oldListenerMap = listenersToMap(oldListeners) val listenersRemoved = oldListeners.filterNot(e => newListenerMap.contains(e.listenerName)) val listenersAdded = newListeners.filterNot(e => oldListenerMap.contains(e.listenerName)) - - // Clear SASL login cache to force re-login - if (listenersAdded.nonEmpty || listenersRemoved.nonEmpty) - LoginManager.closeAll() - - server.socketServer.removeListeners(listenersRemoved) - if (listenersAdded.nonEmpty) - server.socketServer.addListeners(listenersAdded) - - server match { - case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo) - case _ => + if (listenersRemoved.nonEmpty || listenersAdded.nonEmpty) { + LoginManager.closeAll() // Clear SASL login cache to force re-login + if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved) + if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded) + } + if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners), + listenersToMap(newConfig.effectiveAdvertisedListeners))) { + verifyListenerRegistrationAlterationSupported() + server match { + case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo) + case _ => throw new RuntimeException("Unable to handle non-kafkaServer") + } } } diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index 5b118220071..5b71409714d 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -248,6 +248,24 @@ class BrokerMetadataListener( } } + // This is used in tests to alter the publisher that is in use by the broker. + def alterPublisher(publisher: MetadataPublisher): CompletableFuture[Void] = { + val event = new AlterPublisherEvent(publisher) + eventQueue.append(event) + event.future + } + + class AlterPublisherEvent(publisher: MetadataPublisher) + extends EventQueue.FailureLoggingEvent(log) { + val future = new CompletableFuture[Void]() + + override def run(): Unit = { + _publisher = Some(publisher) + log.info(s"Set publisher to ${publisher}") + future.complete(null) + } + } + private def publish(publisher: MetadataPublisher): Unit = { val delta = _delta _image = _delta.apply() diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 74c5348afc7..291a1507d28 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -17,6 +17,8 @@ package kafka.server.metadata +import java.util.Properties + import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.TransactionCoordinator import kafka.log.{LogManager, UnifiedLog} @@ -187,21 +189,26 @@ class BrokerMetadataPublisher(conf: KafkaConfig, toLoggableProps(resource, props).mkString(",")) dynamicConfigHandlers(ConfigType.Topic). processConfigChanges(resource.name(), props) - conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props) - case BROKER => if (resource.name().isEmpty) { - // Apply changes to "cluster configs" (also known as default BROKER configs). - // These are stored in KRaft with an empty name field. - info(s"Updating cluster configuration : " + - toLoggableProps(resource, props).mkString(",")) - dynamicConfigHandlers(ConfigType.Broker). - processConfigChanges(ConfigEntityName.Default, props) - } else if (resource.name().equals(brokerId.toString)) { - // Apply changes to this broker's dynamic configuration. - info(s"Updating broker ${brokerId} with new configuration : " + - toLoggableProps(resource, props).mkString(",")) - dynamicConfigHandlers(ConfigType.Broker). - processConfigChanges(resource.name(), props) - } + case BROKER => + if (resource.name().isEmpty) { + // Apply changes to "cluster configs" (also known as default BROKER configs). + // These are stored in KRaft with an empty name field. + info("Updating cluster configuration : " + + toLoggableProps(resource, props).mkString(",")) + dynamicConfigHandlers(ConfigType.Broker). + processConfigChanges(ConfigEntityName.Default, props) + } else if (resource.name() == brokerId.toString) { + // Apply changes to this broker's dynamic configuration. + info(s"Updating broker ${brokerId} with new configuration : " + + toLoggableProps(resource, props).mkString(",")) + dynamicConfigHandlers(ConfigType.Broker). + processConfigChanges(resource.name(), props) + // When applying a per broker config (not a cluster config), we also + // reload any associated file. For example, if the ssl.keystore is still + // set to /tmp/foo, we still want to reload /tmp/foo in case its contents + // have changed. This doesn't apply to topic configs or cluster configs. + reloadUpdatedFilesWithoutConfigChange(props) + } case _ => // nothing to do } } @@ -250,6 +257,10 @@ class BrokerMetadataPublisher(conf: KafkaConfig, } } + def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = { + conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props) + } + /** * Update the coordinator of local replica changes: election and resignation. * diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index 9482ae27be3..329c9d1e1ea 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -17,15 +17,30 @@ package unit.kafka.server.metadata +import java.util.Collections.{singleton, singletonMap} +import java.util.Properties +import java.util.concurrent.atomic.AtomicInteger + import kafka.log.UnifiedLog +import kafka.server.KafkaConfig import kafka.server.metadata.BrokerMetadataPublisher +import kafka.testkit.{KafkaClusterTestKit, TestKitNodes} +import kafka.utils.TestUtils +import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET +import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry} +import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.config.ConfigResource.Type.BROKER import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage} import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.metadata.PartitionRegistration import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import org.mockito.ArgumentMatchers.any import org.mockito.Mockito +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer + import scala.jdk.CollectionConverters._ class BrokerMetadataPublisherTest { @@ -142,4 +157,56 @@ class BrokerMetadataPublisherTest { new TopicsImage(idsMap.asJava, namesMap.asJava) } + @Test + def testReloadUpdatedFilesWithoutConfigChange(): Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(1). + setNumControllerNodes(1).build()).build() + try { + cluster.format() + cluster.startup() + cluster.waitForReadyBrokers() + val broker = cluster.brokers().values().iterator().next() + val publisher = Mockito.spy(new BrokerMetadataPublisher( + conf = broker.config, + metadataCache = broker.metadataCache, + logManager = broker.logManager, + replicaManager = broker.replicaManager, + groupCoordinator = broker.groupCoordinator, + txnCoordinator = broker.transactionCoordinator, + clientQuotaMetadataManager = broker.clientQuotaMetadataManager, + featureCache = broker.featureCache, + dynamicConfigHandlers = broker.dynamicConfigHandlers.toMap, + _authorizer = Option.empty + )) + val numTimesReloadCalled = new AtomicInteger(0) + Mockito.when(publisher.reloadUpdatedFilesWithoutConfigChange(any[Properties]())). + thenAnswer(new Answer[Unit]() { + override def answer(invocation: InvocationOnMock): Unit = numTimesReloadCalled.addAndGet(1) + }) + broker.metadataListener.alterPublisher(publisher).get() + val admin = Admin.create(cluster.clientProperties()) + try { + assertEquals(0, numTimesReloadCalled.get()) + admin.incrementalAlterConfigs(singletonMap( + new ConfigResource(BROKER, ""), + singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig.MaxConnectionsProp, "123"), SET)))).all().get() + TestUtils.waitUntilTrue(() => numTimesReloadCalled.get() == 0, + "numTimesConfigured never reached desired value") + + // Setting the foo.bar.test.configuration to 1 will still trigger reconfiguration because + // reloadUpdatedFilesWithoutConfigChange will be called. + admin.incrementalAlterConfigs(singletonMap( + new ConfigResource(BROKER, broker.config.nodeId.toString), + singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig.MaxConnectionsProp, "123"), SET)))).all().get() + TestUtils.waitUntilTrue(() => numTimesReloadCalled.get() == 1, + "numTimesConfigured never reached desired value") + } finally { + admin.close() + } + } finally { + cluster.close() + } + } }