This is an automated email from the ASF dual-hosted git repository. davidarthur pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new a687d4d3f6 KAFKA-14111 Fix sensitive dynamic broker configs in KRaft (#12455) a687d4d3f6 is described below commit a687d4d3f6874f7821996d644da6df9491bf9232 Author: David Arthur <mum...@gmail.com> AuthorDate: Wed Aug 3 13:28:06 2022 -0400 KAFKA-14111 Fix sensitive dynamic broker configs in KRaft (#12455) Enable some of the dynamic broker reconfiguration tests in KRaft mode --- .../src/main/scala/kafka/admin/ConfigCommand.scala | 2 +- .../scala/kafka/server/DynamicBrokerConfig.scala | 8 +- core/src/main/scala/kafka/server/KafkaBroker.scala | 1 + core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- .../main/scala/kafka/utils/PasswordEncoder.scala | 45 ++++++-- .../server/DynamicBrokerReconfigurationTest.scala | 128 +++++++++++++-------- .../unit/kafka/utils/PasswordEncoderTest.scala | 10 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 25 ++-- 8 files changed, 145 insertions(+), 76 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 4676bfd101..9a42f9b874 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -211,7 +211,7 @@ object ConfigCommand extends Logging { encoderConfigs.get(KafkaConfig.PasswordEncoderSecretProp) val encoderSecret = encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderSecretProp, throw new IllegalArgumentException("Password encoder secret not specified")) - new PasswordEncoder(new Password(encoderSecret), + PasswordEncoder.encrypting(new Password(encoderSecret), None, encoderConfigs.get(KafkaConfig.PasswordEncoderCipherAlgorithmProp).getOrElse(Defaults.PasswordEncoderCipherAlgorithm), encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderKeyLength), diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 33511147e6..76a42b74fa 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -211,7 +211,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]() private val lock = new ReentrantReadWriteLock private var currentConfig: KafkaConfig = null - private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret) + private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) { + maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret) + } else { + Some(PasswordEncoder.noop()) + } private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = { currentConfig = new KafkaConfig(kafkaConfig.props, false, None) @@ -340,7 +344,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = { secret.map { secret => - new PasswordEncoder(secret, + PasswordEncoder.encrypting(secret, kafkaConfig.passwordEncoderKeyFactoryAlgorithm, kafkaConfig.passwordEncoderCipherAlgorithm, kafkaConfig.passwordEncoderKeyLength, diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala index 46f2e7e8b1..b02b1167c5 100644 --- a/core/src/main/scala/kafka/server/KafkaBroker.scala +++ b/core/src/main/scala/kafka/server/KafkaBroker.scala @@ -89,6 +89,7 @@ trait KafkaBroker extends KafkaMetricsGroup { def shutdown(): Unit def brokerTopicStats: BrokerTopicStats def credentialProvider: CredentialProvider + def clientToControllerChannelManager: BrokerToControllerChannelManager // For backwards compatibility, we need to keep older metrics tied // to their original name when this class was named `KafkaServer` diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index a9fbda6c21..4e253047ee 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1493,6 +1493,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami // Cache the current config to avoid acquiring read lock to access from dynamicConfig @volatile private var currentConfig = this + val processRoles: Set[ProcessRole] = parseProcessRoles() private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this)) private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = { @@ -1612,7 +1613,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp) var brokerId: Int = getInt(KafkaConfig.BrokerIdProp) val nodeId: Int = getInt(KafkaConfig.NodeIdProp) - val processRoles: Set[ProcessRole] = parseProcessRoles() val initialRegistrationTimeoutMs: Int = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp) val brokerHeartbeatIntervalMs: Int = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp) val brokerSessionTimeoutMs: Int = getInt(KafkaConfig.BrokerSessionTimeoutMsProp) diff --git a/core/src/main/scala/kafka/utils/PasswordEncoder.scala b/core/src/main/scala/kafka/utils/PasswordEncoder.scala index f748a455c6..3373223e36 100644 --- a/core/src/main/scala/kafka/utils/PasswordEncoder.scala +++ b/core/src/main/scala/kafka/utils/PasswordEncoder.scala @@ -38,6 +38,33 @@ object PasswordEncoder { val IterationsProp = "iterations" val EncyrptedPasswordProp = "encryptedPassword" val PasswordLengthProp = "passwordLength" + + def encrypting(secret: Password, + keyFactoryAlgorithm: Option[String], + cipherAlgorithm: String, + keyLength: Int, + iterations: Int): EncryptingPasswordEncoder = { + new EncryptingPasswordEncoder(secret, keyFactoryAlgorithm, cipherAlgorithm, keyLength, iterations) + } + + def noop(): NoOpPasswordEncoder = { + new NoOpPasswordEncoder() + } +} + +trait PasswordEncoder { + def encode(password: Password): String + def decode(encodedPassword: String): Password + + private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded) +} + +/** + * A password encoder that does not modify the given password. This is used in KRaft mode only. + */ +class NoOpPasswordEncoder extends PasswordEncoder { + override def encode(password: Password): String = password.value() + override def decode(encodedPassword: String): Password = new Password(encodedPassword) } /** @@ -55,16 +82,18 @@ object PasswordEncoder { * The values used for encoding are stored along with the encoded password and the stored values are used for decoding. * */ -class PasswordEncoder(secret: Password, - keyFactoryAlgorithm: Option[String], - cipherAlgorithm: String, - keyLength: Int, - iterations: Int) extends Logging { +class EncryptingPasswordEncoder( + secret: Password, + keyFactoryAlgorithm: Option[String], + cipherAlgorithm: String, + keyLength: Int, + iterations: Int +) extends PasswordEncoder with Logging { private val secureRandom = new SecureRandom private val cipherParamsEncoder = cipherParamsInstance(cipherAlgorithm) - def encode(password: Password): String = { + override def encode(password: Password): String = { val salt = new Array[Byte](256) secureRandom.nextBytes(salt) val cipher = Cipher.getInstance(cipherAlgorithm) @@ -84,7 +113,7 @@ class PasswordEncoder(secret: Password, encryptedMap.map { case (k, v) => s"$k:$v" }.mkString(",") } - def decode(encodedPassword: String): Password = { + override def decode(encodedPassword: String): Password = { val params = CoreUtils.parseCsvMap(encodedPassword) val keyFactoryAlg = params(KeyFactoryAlgorithmProp) val cipherAlg = params(CipherAlgorithmProp) @@ -131,8 +160,6 @@ class PasswordEncoder(secret: Password, private def base64Encode(bytes: Array[Byte]): String = Base64.getEncoder.encodeToString(bytes) - private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded) - private def cipherParamsInstance(cipherAlgorithm: String): CipherParamsEncoder = { val aesPattern = "AES/(.*)/.*".r cipherAlgorithm match { diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index ccfe63e7b5..c7be8ce831 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -31,7 +31,7 @@ import com.yammer.metrics.core.MetricName import kafka.admin.ConfigCommand import kafka.api.{KafkaSasl, SaslSetup} import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager} -import kafka.log.{CleanerConfig, LogConfig} +import kafka.log.{CleanerConfig, LogConfig, UnifiedLog} import kafka.message.ProducerCompressionCodec import kafka.network.{Processor, RequestChannel} import kafka.utils._ @@ -64,6 +64,8 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.annotation.nowarn import scala.collection._ @@ -80,7 +82,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup import DynamicBrokerReconfigurationTest._ - private val servers = new ArrayBuffer[KafkaServer] + private val servers = new ArrayBuffer[KafkaBroker] private val numServers = 3 private val numPartitions = 10 private val producers = new ArrayBuffer[KafkaProducer[String, String]] @@ -111,15 +113,22 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup (0 until numServers).foreach { brokerId => - val props = TestUtils.createBrokerConfig(brokerId, zkConnect) + val props = if (isKRaftTest()) { + val properties = TestUtils.createBrokerConfig(brokerId, null) + properties.put(KafkaConfig.AdvertisedListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0") + properties + } else { + val properties = TestUtils.createBrokerConfig(brokerId, zkConnect) + properties.put(KafkaConfig.ZkEnableSecureAclsProp, "true") + properties + } props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS) // Ensure that we can support multiple listeners per security protocol and multiple security protocols props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0") - props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL") + props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER:$controllerListenerSecurityProtocol") props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal) props.put(KafkaConfig.SslClientAuthProp, "requested") props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN") - props.put(KafkaConfig.ZkEnableSecureAclsProp, "true") props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(",")) props.put(KafkaConfig.LogSegmentBytesProp, "2000") // low value to test log rolling on config update props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads @@ -138,17 +147,21 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureExternal)) val kafkaConfig = KafkaConfig.fromProps(props) - configureDynamicKeystoreInZooKeeper(kafkaConfig, sslProperties1) + if (!isKRaftTest()) { + configureDynamicKeystoreInZooKeeper(kafkaConfig, sslProperties1) + } - servers += TestUtils.createServer(kafkaConfig) + servers += createBroker(kafkaConfig) } - TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = numServers, servers) - TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, servers.head.config.offsetsTopicPartitions, - replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs) - createAdminClient(SecurityProtocol.SSL, SecureInternal) + TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, numPartitions, replicationFactor = numServers) + TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers, + numPartitions = servers.head.config.offsetsTopicPartitions, + replicationFactor = numServers, + topicConfig = servers.head.groupCoordinator.offsetsTopicConfigs) + TestMetricsReporter.testReporters.clear() } @@ -166,8 +179,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup closeSasl() } - @Test - def testConfigDescribeUsingAdminClient(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testConfigDescribeUsingAdminClient(quorum: String): Unit = { def verifyConfig(configName: String, configEntry: ConfigEntry, isSensitive: Boolean, isReadOnly: Boolean, expectedProps: Properties): Unit = { @@ -226,9 +240,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup val adminClient = adminClients.head alterSslKeystoreUsingConfigCommand(sslProperties1, SecureExternal) - val configDesc = describeConfig(adminClient) - verifySslConfig("listener.name.external.", sslProperties1, configDesc) - verifySslConfig("", invalidSslProperties, configDesc) + val configDesc = TestUtils.tryUntilNoAssertionError() { + val describeConfigsResult = describeConfig(adminClient) + verifySslConfig("listener.name.external.", sslProperties1, describeConfigsResult) + verifySslConfig("", invalidSslProperties, describeConfigsResult) + describeConfigsResult + } // Verify a few log configs with and without synonyms val expectedProps = new Properties @@ -262,8 +279,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup assertEquals(List((KafkaConfig.LogCleanerThreadsProp, ConfigSource.DEFAULT_CONFIG)), synonymsList(logCleanerThreads)) } - @Test - def testUpdatesUsingConfigProvider(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testUpdatesUsingConfigProvider(quorum: String): Unit = { val PollingIntervalVal = f"$${file:polling.interval:interval}" val PollingIntervalUpdateVal = f"$${file:polling.interval:updinterval}" val SslTruststoreTypeVal = f"$${file:ssl.truststore.type:storetype}" @@ -309,11 +327,13 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup assertFalse(reporter.kafkaMetrics.isEmpty, "No metrics found") } - // fetch from ZK, values should be unresolved - val props = fetchBrokerConfigsFromZooKeeper(servers.head) - assertTrue(props.getProperty(TestMetricsReporter.PollingIntervalProp) == PollingIntervalVal, "polling interval is not updated in ZK") - assertTrue(props.getProperty(configPrefix+KafkaConfig.SslTruststoreTypeProp) == SslTruststoreTypeVal, "store type is not updated in ZK") - assertTrue(props.getProperty(configPrefix+KafkaConfig.SslKeystorePasswordProp) == SslKeystorePasswordVal, "keystore password is not updated in ZK") + if (!isKRaftTest()) { + // fetch from ZK, values should be unresolved + val props = fetchBrokerConfigsFromZooKeeper(servers.head) + assertTrue(props.getProperty(TestMetricsReporter.PollingIntervalProp) == PollingIntervalVal, "polling interval is not updated in ZK") + assertTrue(props.getProperty(configPrefix + KafkaConfig.SslTruststoreTypeProp) == SslTruststoreTypeVal, "store type is not updated in ZK") + assertTrue(props.getProperty(configPrefix + KafkaConfig.SslKeystorePasswordProp) == SslKeystorePasswordVal, "keystore password is not updated in ZK") + } // verify the update // 1. verify update not occurring if the value of property is same. @@ -332,10 +352,10 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup } } - @Test + @Test // TODO KAFKA-14126 add KRaft support def testKeyStoreAlter(): Unit = { val topic2 = "testtopic2" - TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = numServers, servers) + TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers) // Start a producer and consumer that work with the current broker keystore. // This should continue working while changes are made @@ -399,7 +419,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup stopAndVerifyProduceConsume(producerThread, consumerThread) } - @Test + @Test // TODO KAFKA-14126 add KRaft support def testTrustStoreAlter(): Unit = { val producerBuilder = ProducerBuilder().listenerName(SecureInternal).securityProtocol(SecurityProtocol.SSL) @@ -481,7 +501,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup verifySslProduceConsume(sslProperties2, "alter-truststore-7") waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1)) - val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get + val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer] val controllerChannelManager = controller.kafkaController.controllerChannelManager val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] = JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo") @@ -492,8 +512,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup verifyBrokerToControllerCall(controller) } - @Test - def testLogCleanerConfig(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testLogCleanerConfig(quorum: String): Unit = { val (producerThread, consumerThread) = startProduceConsume(retries = 0) verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 1) @@ -537,13 +558,23 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup stopAndVerifyProduceConsume(producerThread, consumerThread) } - @Test - def testConsecutiveConfigChange(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testConsecutiveConfigChange(quorum: String): Unit = { val topic2 = "testtopic2" val topicProps = new Properties topicProps.put(KafkaConfig.MinInSyncReplicasProp, "2") - TestUtils.createTopic(zkClient, topic2, 1, replicationFactor = numServers, servers, topicProps) - var log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found")) + TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions = 1, replicationFactor = numServers, topicConfig = topicProps) + + def getLogOrThrow(tp: TopicPartition): UnifiedLog = { + var (logOpt, found) = TestUtils.computeUntilTrue { + servers.head.logManager.getLog(tp) + }(_.isDefined) + assertTrue(found, "Log not found") + logOpt.get + } + + var log = getLogOrThrow(new TopicPartition(topic2, 0)) assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp)) assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) @@ -558,7 +589,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup } } - log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found")) + log = getLogOrThrow(new TopicPartition(topic2, 0)) assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp)) assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) // Verify topic-level config survives @@ -566,7 +597,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup props.clear() props.put(KafkaConfig.LogRetentionTimeMillisProp, "604800000") reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogRetentionTimeMillisProp, "604800000")) - log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found")) + log = getLogOrThrow(new TopicPartition(topic2, 0)) assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp)) assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) // Verify topic-level config still survives } @@ -974,6 +1005,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup } @Test + // Modifying advertised listeners is not supported in KRaft def testAdvertisedListenerUpdate(): Unit = { val adminClient = adminClients.head val externalAdminClient = createAdminClient(SecurityProtocol.SASL_SSL, SecureExternal) @@ -994,11 +1026,13 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup } // Verify that endpoints have been updated in ZK for all brokers - servers.foreach(validateEndpointsInZooKeeper(_, endpoints => endpoints.contains(invalidHost))) + servers.foreach { server => + validateEndpointsInZooKeeper(server.asInstanceOf[KafkaServer], endpoints => endpoints.contains(invalidHost)) + } // Trigger session expiry and ensure that controller registers new advertised listener after expiry val controllerEpoch = zkClient.getControllerEpoch - val controllerServer = servers(zkClient.getControllerId.getOrElse(throw new IllegalStateException("No controller"))) + val controllerServer = servers(zkClient.getControllerId.getOrElse(throw new IllegalStateException("No controller"))).asInstanceOf[KafkaServer] val controllerZkClient = controllerServer.zkClient val sessionExpiringClient = createZooKeeperClientToTriggerSessionExpiry(controllerZkClient.currentZooKeeper) sessionExpiringClient.close() @@ -1022,7 +1056,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup .getCause.isInstanceOf[org.apache.kafka.common.errors.TimeoutException]) alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, "localhost") - servers.foreach(validateEndpointsInZooKeeper(_, endpoints => !endpoints.contains(invalidHost))) + servers.foreach { server => + validateEndpointsInZooKeeper(server.asInstanceOf[KafkaServer], endpoints => !endpoints.contains(invalidHost)) + } // Verify that produce/consume work now val topic2 = "testtopic2" @@ -1119,7 +1155,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup assertTrue(partitions.exists(_.leader == null), "Did not find partitions with no leader") } - private def addListener(servers: Seq[KafkaServer], listenerName: String, securityProtocol: SecurityProtocol, + private def addListener(servers: Seq[KafkaBroker], listenerName: String, securityProtocol: SecurityProtocol, saslMechanisms: Seq[String]): Unit = { val config = servers.head.config val existingListenerCount = config.listeners.size @@ -1264,11 +1300,11 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup verifyProduceConsume(producer, consumer, numRecords = 10, topic) } - private def hasListenerMetric(server: KafkaServer, listenerName: String): Boolean = { + private def hasListenerMetric(server: KafkaBroker, listenerName: String): Boolean = { server.socketServer.metrics.metrics.keySet.asScala.exists(_.tags.get("listener") == listenerName) } - private def fetchBrokerConfigsFromZooKeeper(server: KafkaServer): Properties = { + private def fetchBrokerConfigsFromZooKeeper(server: KafkaBroker): Properties = { val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString) server.config.dynamicConfig.fromPersistentProps(props, perBrokerConfig = true) } @@ -1322,7 +1358,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup }, "Did not fail authentication with invalid config") } - private def describeConfig(adminClient: Admin, servers: Seq[KafkaServer] = this.servers): Config = { + private def describeConfig(adminClient: Admin, servers: Seq[KafkaBroker] = this.servers): Config = { val configResources = servers.map { server => new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString) } @@ -1419,7 +1455,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup } @nowarn("cat=deprecation") - private def alterConfigsOnServer(server: KafkaServer, props: Properties): Unit = { + private def alterConfigsOnServer(server: KafkaBroker, props: Properties): Unit = { val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava val newConfig = new Config(configEntries) val configs = Map(new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString) -> newConfig).asJava @@ -1428,7 +1464,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup } @nowarn("cat=deprecation") - private def alterConfigs(servers: Seq[KafkaServer], adminClient: Admin, props: Properties, + private def alterConfigs(servers: Seq[KafkaBroker], adminClient: Admin, props: Properties, perBrokerConfig: Boolean): AlterConfigsResult = { val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava val newConfig = new Config(configEntries) @@ -1507,7 +1543,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup private def createPasswordEncoder(config: KafkaConfig, secret: Option[Password]): PasswordEncoder = { val encoderSecret = secret.getOrElse(throw new IllegalStateException("Password encoder secret not configured")) - new PasswordEncoder(encoderSecret, + PasswordEncoder.encrypting(encoderSecret, config.passwordEncoderKeyFactoryAlgorithm, config.passwordEncoderCipherAlgorithm, config.passwordEncoderKeyLength, @@ -1518,7 +1554,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup servers.foreach { server => waitForConfigOnServer(server, propName, propValue, maxWaitMs) } } - private def waitForConfigOnServer(server: KafkaServer, propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = { + private def waitForConfigOnServer(server: KafkaBroker, propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = { TestUtils.retry(maxWaitMs) { assertEquals(propValue, server.config.originals.get(propName)) } diff --git a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala index 0a5d5ac029..50cdceabbc 100755 --- a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala +++ b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala @@ -30,7 +30,7 @@ class PasswordEncoderTest { @Test def testEncodeDecode(): Unit = { - val encoder = new PasswordEncoder(new Password("password-encoder-secret"), + val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"), None, Defaults.PasswordEncoderCipherAlgorithm, Defaults.PasswordEncoderKeyLength, @@ -54,7 +54,7 @@ class PasswordEncoderTest { @Test def testEncoderConfigChange(): Unit = { - val encoder = new PasswordEncoder(new Password("password-encoder-secret"), + val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"), Some("PBKDF2WithHmacSHA1"), "DES/CBC/PKCS5Padding", 64, @@ -68,7 +68,7 @@ class PasswordEncoderTest { assertEquals("DES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp)) // Test that decoding works even if PasswordEncoder algorithm, iterations etc. are altered - val decoder = new PasswordEncoder(new Password("password-encoder-secret"), + val decoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"), Some("PBKDF2WithHmacSHA1"), "AES/CBC/PKCS5Padding", 128, @@ -76,7 +76,7 @@ class PasswordEncoderTest { assertEquals(password, decoder.decode(encoded).value) // Test that decoding fails if secret is altered - val decoder2 = new PasswordEncoder(new Password("secret-2"), + val decoder2 = PasswordEncoder.encrypting(new Password("secret-2"), Some("PBKDF2WithHmacSHA1"), "AES/CBC/PKCS5Padding", 128, @@ -92,7 +92,7 @@ class PasswordEncoderTest { def testEncodeDecodeAlgorithms(): Unit = { def verifyEncodeDecode(keyFactoryAlg: Option[String], cipherAlg: String, keyLength: Int): Unit = { - val encoder = new PasswordEncoder(new Password("password-encoder-secret"), + val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"), keyFactoryAlg, cipherAlg, keyLength, diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c49a7bdde0..d0266bdee9 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -511,7 +511,7 @@ object TestUtils extends Logging { topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, - servers: Seq[KafkaServer], + servers: Seq[KafkaBroker], topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = { val adminZkClient = new AdminZkClient(zkClient) // create topic @@ -543,7 +543,7 @@ object TestUtils extends Logging { def createTopic(zkClient: KafkaZkClient, topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]], - servers: Seq[KafkaServer]): scala.collection.immutable.Map[Int, Int] = { + servers: Seq[KafkaBroker]): scala.collection.immutable.Map[Int, Int] = { createTopic(zkClient, topic, partitionReplicaAssignment, servers, new Properties()) } @@ -555,7 +555,7 @@ object TestUtils extends Logging { def createTopic(zkClient: KafkaZkClient, topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]], - servers: Seq[KafkaServer], + servers: Seq[KafkaBroker], topicConfig: Properties): scala.collection.immutable.Map[Int, Int] = { val adminZkClient = new AdminZkClient(zkClient) // create topic @@ -583,7 +583,7 @@ object TestUtils extends Logging { * Create the consumer offsets/group metadata topic and wait until the leader is elected and metadata is propagated * to all brokers. */ - def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaServer]): Unit = { + def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = { val server = servers.head createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp), @@ -1043,18 +1043,19 @@ object TestUtils extends Logging { * otherwise difficult to poll for. `computeUntilTrue` and `waitUntilTrue` should be preferred in cases where we can * easily wait on a condition before evaluating the assertions. */ - def tryUntilNoAssertionError(waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L)(assertions: => Unit) = { - val (error, success) = TestUtils.computeUntilTrue({ + def tryUntilNoAssertionError[T](waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L)(assertions: => T): T = { + val (either, success) = TestUtils.computeUntilTrue({ try { - assertions - None + val res = assertions + Left(res) } catch { - case ae: AssertionError => Some(ae) + case ae: AssertionError => Right(ae) } - }, waitTime = waitTime, pause = pause)(_.isEmpty) + }, waitTime = waitTime, pause = pause)(_.isLeft) - if (!success) { - throw error.get + either match { + case Left(res) => res + case Right(err) => throw err } }