m1a2st commented on code in PR #18269:
URL: https://github.com/apache/kafka/pull/18269#discussion_r1892383292


##########
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##########
@@ -455,54 +455,52 @@ class DynamicBrokerConfigTest {
   }
 
   @Test
-  def testPasswordConfigEncryption(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+  def testPasswordConfigNotEncryption(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     val configWithoutSecret = KafkaConfig(props)
     props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, 
"config-encoder-secret")
     val configWithSecret = KafkaConfig(props)
     val dynamicProps = new Properties
-    dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, "myLoginModule required;")
+    val password = "myLoginModule required;"
+    dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, password)
 
     try {
       configWithoutSecret.dynamicConfig.toPersistentProps(dynamicProps, 
perBrokerConfig = true)
     } catch {
       case _: ConfigException => // expected exception
     }
     val persistedProps = 
configWithSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig 
= true)
-    
assertFalse(persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG).contains("myLoginModule"),
-      "Password not encoded")
-    val decodedProps = 
configWithSecret.dynamicConfig.fromPersistentProps(persistedProps, 
perBrokerConfig = true)
-    assertEquals("myLoginModule required;", 
decodedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG))
+    assertEquals(password, 
persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG))

Review Comment:
   In KRaft mode, `PasswordEncoder` doesn't encrypt the password; see:
   
https://github.com/apache/kafka/blob/22d1ba8265d44738ff81a5031d523a009237aa46/server-common/src/main/java/org/apache/kafka/security/PasswordEncoder.java#L36



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -258,31 +258,6 @@ class KafkaConfigTest {
     assertTrue(isValidKafkaConfig(props))
   }
 
-  @Test

Review Comment:
   KRaft mode doesn't support the `control.plane.listener.name` config, so I
removed this test.



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1530,56 +1502,10 @@ class KafkaConfigTest {
     // so pick a broker ID greater than reserved.broker.max.id, which defaults 
to 1000,
     // and make sure it is not allowed with broker.id.generation.enable=true 
(true is the default)
     val largeBrokerId = 2000
-    val props = TestUtils.createBrokerConfig(largeBrokerId, 
TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
-    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
-    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
listeners)
-    assertFalse(isValidKafkaConfig(props))
-  }
-
-  @Test
-  def testAcceptsNegativeOneNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = {
-    // -1 is the default for both node.id and broker.id; it implies 
"auto-generate" and should succeed
-    val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port 
= TestUtils.MockZkPort)
+    val props = TestUtils.createBrokerConfig(largeBrokerId, null, port = 
TestUtils.MockZkPort)
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
     props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
     props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
listeners)
-    KafkaConfig.fromProps(props)
-  }
-
-  @Test
-  def testRejectsNegativeTwoNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = {
-    // -1 implies "auto-generate" and should succeed, but -2 does not and 
should fail
-    val negativeTwoNodeId = -2
-    val props = TestUtils.createBrokerConfig(negativeTwoNodeId, 
TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
-    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
-    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
listeners)
-    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, negativeTwoNodeId.toString)
-    props.setProperty(ServerConfigs.BROKER_ID_CONFIG, 
negativeTwoNodeId.toString)
-    assertFalse(isValidKafkaConfig(props))
-  }
-
-  @Test
-  def testAcceptsLargeNodeIdForZkBasedCaseWithAutoGenDisabled(): Unit = {
-    // Ensure a broker ID greater than reserved.broker.max.id, which defaults 
to 1000,
-    // is allowed with broker.id.generation.enable=false
-    val largeBrokerId = 2000
-    val props = TestUtils.createBrokerConfig(largeBrokerId, 
TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
-    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
-    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
listeners)
-    props.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, 
"false")
-    KafkaConfig.fromProps(props)
-  }
-
-  @Test
-  def testRejectsNegativeNodeIdForZkBasedCaseWithAutoGenDisabled(): Unit = {
-    // -1 is the default for both node.id and broker.id
-    val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port 
= TestUtils.MockZkPort)
-    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
-    props.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, 
"false")

Review Comment:
   Removed the ZooKeeper-only tests.



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -739,106 +714,107 @@ class KafkaConfigTest {
 
   @Test
   def testLogRollTimeNoConfigProvided(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRollTimeMillis             
                                                                                
                                                                                
                        )
   }
 
   @Test
   def testDefaultCompressionType(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     val serverConfig = KafkaConfig.fromProps(props)
     assertEquals(serverConfig.compressionType, "producer")
   }
 
   @Test
   def testValidCompressionType(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     props.setProperty("compression.type", "gzip")
     val serverConfig = KafkaConfig.fromProps(props)
     assertEquals(serverConfig.compressionType, "gzip")
   }
 
   @Test
   def testInvalidCompressionType(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "abc")
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
   @Test
   def testInvalidGzipCompressionLevel(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "gzip")
     props.setProperty(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG, 
(CompressionType.GZIP.maxLevel() + 1).toString)
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
   @Test
   def testInvalidLz4CompressionLevel(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "lz4")
     props.setProperty(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG, 
(CompressionType.LZ4.maxLevel() + 1).toString)
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
   @Test
   def testInvalidZstdCompressionLevel(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "zstd")
     props.setProperty(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG, 
(CompressionType.ZSTD.maxLevel() + 1).toString)
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
   @Test
   def testInvalidInterBrokerSecurityProtocol(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"SSL://localhost:0")
     
props.setProperty(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.PLAINTEXT.toString)
     assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props))
   }
 
   @Test
   def testEqualAdvertisedListenersProtocol(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
-    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"PLAINTEXT://localhost:9092,SSL://localhost:9093")
-    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"PLAINTEXT://localhost:9092,SSL://localhost:9093")
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
+    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"PLAINTEXT://localhost:9092")
+    
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
"CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
     KafkaConfig.fromProps(props)
   }
 
   @Test
   def testInvalidAdvertisedListenersProtocol(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"TRACE://localhost:9091,SSL://localhost:9093")
     props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"PLAINTEXT://localhost:9092")
     assertBadConfigContainingMessage(props, "No security protocol defined for 
listener TRACE")
 
-    
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
"PLAINTEXT:PLAINTEXT,TRACE:PLAINTEXT,SSL:SSL")
+    
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
"CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,TRACE:PLAINTEXT,SSL:SSL")
     assertBadConfigContainingMessage(props, "advertised.listeners listener 
names must be equal to or a subset of the ones defined in listeners")
   }
 
   @nowarn("cat=deprecation")
   @Test
   def testInterBrokerVersionMessageFormatCompatibility(): Unit = {
     def buildConfig(interBrokerProtocol: MetadataVersion, messageFormat: 
MetadataVersion): KafkaConfig = {
-      val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
port = 8181)
+      val props = TestUtils.createBrokerConfig(0, null, port = 8181)
       
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, 
interBrokerProtocol.version)
       props.setProperty(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, 
messageFormat.version)
       KafkaConfig.fromProps(props)
     }
 
     MetadataVersion.VERSIONS.foreach { interBrokerVersion =>
       MetadataVersion.VERSIONS.foreach { messageFormatVersion =>
-        if (interBrokerVersion.highestSupportedRecordVersion.value >= 
messageFormatVersion.highestSupportedRecordVersion.value) {
-          val config = buildConfig(interBrokerVersion, messageFormatVersion)
-          assertEquals(interBrokerVersion, config.interBrokerProtocolVersion)
-          if (interBrokerVersion.isAtLeast(IBP_3_0_IV1))
-            assertEquals(IBP_3_0_IV1, config.logMessageFormatVersion)
-          else
-            assertEquals(messageFormatVersion, config.logMessageFormatVersion)
-        } else {
-          assertThrows(classOf[IllegalArgumentException], () => 
buildConfig(interBrokerVersion, messageFormatVersion))
+        if (interBrokerVersion.isKRaftSupported) {

Review Comment:
   KRaft mode only supports `IBP_3_0_IV1` as its minimum version, so we should add this check.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -1387,13 +1386,6 @@ class ReplicaManagerTest {
     verifyBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(new 
Properties, expectTruncation = false)
   }
 
-  @Test
-  def testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdateIbp26(): 
Unit = {

Review Comment:
   KRaft doesn't support `IBP_2_6_IV0`, so I removed this test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to