showuon commented on code in PR #15246: URL: https://github.com/apache/kafka/pull/15246#discussion_r1465931069
########## server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.security; + +import javax.crypto.SecretKeyFactory; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.server.util.Csv; +import org.junit.jupiter.api.Test; + +import java.security.GeneralSecurityException; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.fail; + +class PasswordEncoderTest { + + @Test + public void testEncodeDecode() throws GeneralSecurityException { + PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"), + Optional.empty(), + PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM, + PasswordEncoderConfigs.DEFAULT_KEY_LENGTH, + PasswordEncoderConfigs.DEFAULT_ITERATIONS); + String password = "test-password"; + String encoded = encoder.encode(new Password(password)); + Map<String, String> encodedMap = 
Csv.parseCsvMap(encoded); + assertEquals("4096", encodedMap.get(PasswordEncoder.ITERATIONS)); + assertEquals("128", encodedMap.get(PasswordEncoder.KEY_LENGTH)); + String defaultKeyFactoryAlgorithm; + try { + SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512"); + defaultKeyFactoryAlgorithm = "PBKDF2WithHmacSHA512"; + + } catch (Exception e) { + defaultKeyFactoryAlgorithm = "PBKDF2WithHmacSHA1"; + } + assertEquals(defaultKeyFactoryAlgorithm, encodedMap.get(PasswordEncoder.KEY_FACTORY_ALGORITHM)); + assertEquals("AES/CBC/PKCS5Padding", encodedMap.get(PasswordEncoder.CIPHER_ALGORITHM)); + verifyEncodedPassword(encoder, password, encoded); + } + + @Test + public void testEncoderConfigChange() throws GeneralSecurityException { + PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"), + Optional.of("PBKDF2WithHmacSHA1"), + "DES/CBC/PKCS5Padding", + 64, + 1024); + String password = "test-password"; + String encoded = encoder.encode(new Password(password)); + Map<String, String> encodedMap = Csv.parseCsvMap(encoded); + assertEquals("1024", encodedMap.get(PasswordEncoder.ITERATIONS)); + assertEquals("64", encodedMap.get(PasswordEncoder.KEY_LENGTH)); + assertEquals("PBKDF2WithHmacSHA1", encodedMap.get(PasswordEncoder.KEY_FACTORY_ALGORITHM)); + assertEquals("DES/CBC/PKCS5Padding", encodedMap.get(PasswordEncoder.CIPHER_ALGORITHM)); + + // Test that decoding works even if PasswordEncoder algorithm, iterations etc. 
are altered + PasswordEncoder decoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"), + Optional.of("PBKDF2WithHmacSHA1"), + "AES/CBC/PKCS5Padding", + 128, + 2048); + assertEquals(password, decoder.decode(encoded).value()); + + // Test that decoding fails if secret is altered + PasswordEncoder decoder2 = PasswordEncoder.encrypting(new Password("secret-2"), + Optional.of("PBKDF2WithHmacSHA1"), + "AES/CBC/PKCS5Padding", + 128, + 1024); + try { + decoder2.decode(encoded); + fail("Expected ConfigException to be thrown"); + } catch (ConfigException expected) { + } Review Comment: Nit: Can be replaced with `assertThrows(ConfigException.class, () -> ...)` ########## core/src/main/scala/kafka/utils/PasswordEncoder.scala: ########## @@ -1,202 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ -package kafka.utils - -import java.nio.charset.StandardCharsets -import java.security.{AlgorithmParameters, NoSuchAlgorithmException, SecureRandom} -import java.security.spec.AlgorithmParameterSpec -import java.util.Base64 - -import javax.crypto.{Cipher, SecretKeyFactory} -import javax.crypto.spec._ -import kafka.utils.PasswordEncoder._ -import org.apache.kafka.common.config.ConfigException -import org.apache.kafka.common.config.types.Password - -import scala.collection.Map - -object PasswordEncoder { - val KeyFactoryAlgorithmProp = "keyFactoryAlgorithm" - val CipherAlgorithmProp = "cipherAlgorithm" - val InitializationVectorProp = "initializationVector" - val KeyLengthProp = "keyLength" - val SaltProp = "salt" - val IterationsProp = "iterations" - val EncryptedPasswordProp = "encryptedPassword" - val PasswordLengthProp = "passwordLength" - - def encrypting(secret: Password, - keyFactoryAlgorithm: Option[String], - cipherAlgorithm: String, - keyLength: Int, - iterations: Int): EncryptingPasswordEncoder = { - new EncryptingPasswordEncoder(secret, keyFactoryAlgorithm, cipherAlgorithm, keyLength, iterations) - } - - def noop(): NoOpPasswordEncoder = { - new NoOpPasswordEncoder() - } -} - -trait PasswordEncoder { - def encode(password: Password): String - def decode(encodedPassword: String): Password - - private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded) -} - -/** - * A password encoder that does not modify the given password. This is used in KRaft mode only. - */ Review Comment: This class comment is missing. 
########## core/src/main/scala/kafka/admin/ConfigCommand.scala: ########## @@ -211,19 +212,19 @@ object ConfigCommand extends Logging { } private[admin] def createPasswordEncoder(encoderConfigs: Map[String, String]): PasswordEncoder = { - encoderConfigs.get(KafkaConfig.PasswordEncoderSecretProp) - val encoderSecret = encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderSecretProp, + encoderConfigs.get(PasswordEncoderConfigs.SECRET) + val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.SECRET, throw new IllegalArgumentException("Password encoder secret not specified")) PasswordEncoder.encrypting(new Password(encoderSecret), - None, - encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderCipherAlgorithmProp, Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM), - encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PASSWORD_ENCODER_KEY_LENGTH), - encoderConfigs.get(KafkaConfig.PasswordEncoderIterationsProp).map(_.toInt).getOrElse(Defaults.PASSWORD_ENCODER_ITERATIONS)) + Optional.empty(), + encoderConfigs.getOrElse(PasswordEncoderConfigs.CIPHER_ALGORITHM, PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM), + encoderConfigs.get(PasswordEncoderConfigs.KEY_LENGTH).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_KEY_LENGTH), + encoderConfigs.get(PasswordEncoderConfigs.ITERATIONS).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_ITERATIONS)) } /** * Pre-process broker configs provided to convert them to persistent format. - * Password configs are encrypted using the secret `KafkaConfig.PasswordEncoderSecretProp`. + * Password configs are encrypted using the secret `PasswordEncoderConfigs.SECRET_PROP`. Review Comment: `PasswordEncoderConfigs.SECRET_PROP` -> `PasswordEncoderConfigs.SECRET` ? ########## core/src/main/scala/kafka/utils/PasswordEncoder.scala: ########## @@ -1,202 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package kafka.utils - -import java.nio.charset.StandardCharsets -import java.security.{AlgorithmParameters, NoSuchAlgorithmException, SecureRandom} -import java.security.spec.AlgorithmParameterSpec -import java.util.Base64 - -import javax.crypto.{Cipher, SecretKeyFactory} -import javax.crypto.spec._ -import kafka.utils.PasswordEncoder._ -import org.apache.kafka.common.config.ConfigException -import org.apache.kafka.common.config.types.Password - -import scala.collection.Map - -object PasswordEncoder { - val KeyFactoryAlgorithmProp = "keyFactoryAlgorithm" - val CipherAlgorithmProp = "cipherAlgorithm" - val InitializationVectorProp = "initializationVector" - val KeyLengthProp = "keyLength" - val SaltProp = "salt" - val IterationsProp = "iterations" - val EncryptedPasswordProp = "encryptedPassword" - val PasswordLengthProp = "passwordLength" - - def encrypting(secret: Password, - keyFactoryAlgorithm: Option[String], - cipherAlgorithm: String, - keyLength: Int, - iterations: Int): EncryptingPasswordEncoder = { - new EncryptingPasswordEncoder(secret, keyFactoryAlgorithm, cipherAlgorithm, keyLength, iterations) - } - - def noop(): NoOpPasswordEncoder = { - new NoOpPasswordEncoder() - } -} - -trait PasswordEncoder { - def encode(password: Password): String - def 
decode(encodedPassword: String): Password - - private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded) -} - -/** - * A password encoder that does not modify the given password. This is used in KRaft mode only. - */ -class NoOpPasswordEncoder extends PasswordEncoder { - override def encode(password: Password): String = password.value() - override def decode(encodedPassword: String): Password = new Password(encodedPassword) -} - -/** - * Password encoder and decoder implementation. Encoded passwords are persisted as a CSV map - * containing the encoded password in base64 and along with the properties used for encryption. - * - * @param secret The secret used for encoding and decoding - * @param keyFactoryAlgorithm Key factory algorithm if configured. By default, PBKDF2WithHmacSHA512 is - * used if available, PBKDF2WithHmacSHA1 otherwise. - * @param cipherAlgorithm Cipher algorithm used for encoding. - * @param keyLength Key length used for encoding. This should be valid for the specified algorithms. - * @param iterations Iteration count used for encoding. - * - * The provided `keyFactoryAlgorithm`, `cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords. - * The values used for encoding are stored along with the encoded password and the stored values are used for decoding. - * - */ Review Comment: This class comment is also missing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
