m1a2st commented on code in PR #18269:
URL: https://github.com/apache/kafka/pull/18269#discussion_r1892383292
##########
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##########
@@ -455,54 +455,52 @@ class DynamicBrokerConfigTest {
}
@Test
- def testPasswordConfigEncryption(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
+ def testPasswordConfigNotEncryption(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, null, port = 8181)
val configWithoutSecret = KafkaConfig(props)
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG,
"config-encoder-secret")
val configWithSecret = KafkaConfig(props)
val dynamicProps = new Properties
- dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, "myLoginModule required;")
+ val password = "myLoginModule required;"
+ dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, password)
try {
configWithoutSecret.dynamicConfig.toPersistentProps(dynamicProps,
perBrokerConfig = true)
} catch {
case _: ConfigException => // expected exception
}
val persistedProps =
configWithSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig
= true)
-
assertFalse(persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG).contains("myLoginModule"),
- "Password not encoded")
- val decodedProps =
configWithSecret.dynamicConfig.fromPersistentProps(persistedProps,
perBrokerConfig = true)
- assertEquals("myLoginModule required;",
decodedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG))
+ assertEquals(password,
persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG))
Review Comment:
In KRaft mode, `PasswordEncoder` doesn't encrypt the password; see
https://github.com/apache/kafka/blob/22d1ba8265d44738ff81a5031d523a009237aa46/server-common/src/main/java/org/apache/kafka/security/PasswordEncoder.java#L36
##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -258,31 +258,6 @@ class KafkaConfigTest {
assertTrue(isValidKafkaConfig(props))
}
- @Test
Review Comment:
KRaft mode doesn't support the `control.plane.listener.name` config, so I
removed this test.
##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1530,56 +1502,10 @@ class KafkaConfigTest {
// so pick a broker ID greater than reserved.broker.max.id, which defaults
to 1000,
// and make sure it is not allowed with broker.id.generation.enable=true
(true is the default)
val largeBrokerId = 2000
- val props = TestUtils.createBrokerConfig(largeBrokerId,
TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
- val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
- props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
- props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
listeners)
- assertFalse(isValidKafkaConfig(props))
- }
-
- @Test
- def testAcceptsNegativeOneNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = {
- // -1 is the default for both node.id and broker.id; it implies
"auto-generate" and should succeed
- val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port
= TestUtils.MockZkPort)
+ val props = TestUtils.createBrokerConfig(largeBrokerId, null, port =
TestUtils.MockZkPort)
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
listeners)
- KafkaConfig.fromProps(props)
- }
-
- @Test
- def testRejectsNegativeTwoNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = {
- // -1 implies "auto-generate" and should succeed, but -2 does not and
should fail
- val negativeTwoNodeId = -2
- val props = TestUtils.createBrokerConfig(negativeTwoNodeId,
TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
- val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
- props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
- props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
listeners)
- props.setProperty(KRaftConfigs.NODE_ID_CONFIG, negativeTwoNodeId.toString)
- props.setProperty(ServerConfigs.BROKER_ID_CONFIG,
negativeTwoNodeId.toString)
- assertFalse(isValidKafkaConfig(props))
- }
-
- @Test
- def testAcceptsLargeNodeIdForZkBasedCaseWithAutoGenDisabled(): Unit = {
- // Ensure a broker ID greater than reserved.broker.max.id, which defaults
to 1000,
- // is allowed with broker.id.generation.enable=false
- val largeBrokerId = 2000
- val props = TestUtils.createBrokerConfig(largeBrokerId,
TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
- val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
- props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
- props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
listeners)
- props.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG,
"false")
- KafkaConfig.fromProps(props)
- }
-
- @Test
- def testRejectsNegativeNodeIdForZkBasedCaseWithAutoGenDisabled(): Unit = {
- // -1 is the default for both node.id and broker.id
- val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port
= TestUtils.MockZkPort)
- val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
- props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
- props.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG,
"false")
Review Comment:
Removed the ZooKeeper-based tests, since they no longer apply in KRaft mode.
##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -739,106 +714,107 @@ class KafkaConfigTest {
@Test
def testLogRollTimeNoConfigProvided(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
+ val props = TestUtils.createBrokerConfig(0, null, port = 8181)
val cfg = KafkaConfig.fromProps(props)
assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRollTimeMillis
)
}
@Test
def testDefaultCompressionType(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
+ val props = TestUtils.createBrokerConfig(0, null, port = 8181)
val serverConfig = KafkaConfig.fromProps(props)
assertEquals(serverConfig.compressionType, "producer")
}
@Test
def testValidCompressionType(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
+ val props = TestUtils.createBrokerConfig(0, null, port = 8181)
props.setProperty("compression.type", "gzip")
val serverConfig = KafkaConfig.fromProps(props)
assertEquals(serverConfig.compressionType, "gzip")
}
@Test
def testInvalidCompressionType(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
+ val props = TestUtils.createBrokerConfig(0, null, port = 8181)
props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "abc")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
}
@Test
def testInvalidGzipCompressionLevel(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
+ val props = TestUtils.createBrokerConfig(0, null, port = 8181)
props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "gzip")
props.setProperty(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG,
(CompressionType.GZIP.maxLevel() + 1).toString)
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
}
@Test
def testInvalidLz4CompressionLevel(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
+ val props = TestUtils.createBrokerConfig(0, null, port = 8181)
props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "lz4")
props.setProperty(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG,
(CompressionType.LZ4.maxLevel() + 1).toString)
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
}
@Test
def testInvalidZstdCompressionLevel(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
+ val props = TestUtils.createBrokerConfig(0, null, port = 8181)
props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "zstd")
props.setProperty(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG,
(CompressionType.ZSTD.maxLevel() + 1).toString)
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
}
@Test
def testInvalidInterBrokerSecurityProtocol(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
+ val props = TestUtils.createBrokerConfig(0, null, port = 8181)
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
"SSL://localhost:0")
props.setProperty(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG,
SecurityProtocol.PLAINTEXT.toString)
assertThrows(classOf[IllegalArgumentException], () =>
KafkaConfig.fromProps(props))
}
@Test
def testEqualAdvertisedListenersProtocol(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
- props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
"PLAINTEXT://localhost:9092,SSL://localhost:9093")
- props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
"PLAINTEXT://localhost:9092,SSL://localhost:9093")
+ val props = TestUtils.createBrokerConfig(0, null, port = 8181)
+ props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
"PLAINTEXT://localhost:9092")
+
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
KafkaConfig.fromProps(props)
}
@Test
def testInvalidAdvertisedListenersProtocol(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
+ val props = TestUtils.createBrokerConfig(0, null, port = 8181)
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
"TRACE://localhost:9091,SSL://localhost:9093")
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
"PLAINTEXT://localhost:9092")
assertBadConfigContainingMessage(props, "No security protocol defined for
listener TRACE")
-
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"PLAINTEXT:PLAINTEXT,TRACE:PLAINTEXT,SSL:SSL")
+
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,TRACE:PLAINTEXT,SSL:SSL")
assertBadConfigContainingMessage(props, "advertised.listeners listener
names must be equal to or a subset of the ones defined in listeners")
}
@nowarn("cat=deprecation")
@Test
def testInterBrokerVersionMessageFormatCompatibility(): Unit = {
def buildConfig(interBrokerProtocol: MetadataVersion, messageFormat:
MetadataVersion): KafkaConfig = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect,
port = 8181)
+ val props = TestUtils.createBrokerConfig(0, null, port = 8181)
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG,
interBrokerProtocol.version)
props.setProperty(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG,
messageFormat.version)
KafkaConfig.fromProps(props)
}
MetadataVersion.VERSIONS.foreach { interBrokerVersion =>
MetadataVersion.VERSIONS.foreach { messageFormatVersion =>
- if (interBrokerVersion.highestSupportedRecordVersion.value >=
messageFormatVersion.highestSupportedRecordVersion.value) {
- val config = buildConfig(interBrokerVersion, messageFormatVersion)
- assertEquals(interBrokerVersion, config.interBrokerProtocolVersion)
- if (interBrokerVersion.isAtLeast(IBP_3_0_IV1))
- assertEquals(IBP_3_0_IV1, config.logMessageFormatVersion)
- else
- assertEquals(messageFormatVersion, config.logMessageFormatVersion)
- } else {
- assertThrows(classOf[IllegalArgumentException], () =>
buildConfig(interBrokerVersion, messageFormatVersion))
+ if (interBrokerVersion.isKRaftSupported) {
Review Comment:
KRaft mode only supports `IBP_3_0_IV1` as the minimum version, so we should add this check.
##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -1387,13 +1386,6 @@ class ReplicaManagerTest {
verifyBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(new
Properties, expectTruncation = false)
}
- @Test
- def testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdateIbp26():
Unit = {
Review Comment:
KRaft doesn't support `IBP_2_6_IV0`, so this test is removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]