FrankYang0529 commented on code in PR #17424:
URL: https://github.com/apache/kafka/pull/17424#discussion_r1802529124


##########
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala:
##########
@@ -786,291 +641,17 @@ class AuthorizerTest extends QuorumTestHarness with 
BaseAuthorizerTest {
     }
     assertEquals(Set(acl3, acl4), 
deleteResults(0).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
     assertEquals(Set(acl1), 
deleteResults(1).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
-    if (quorum.equals(ZK)) {
-      assertEquals(Set.empty, 
deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
-    } else {
-      // standard authorizer first finds the acls that match filters and then 
delete them.
-      // So filters[2] will match acl3 even though it is also matching 
filters[0] and will be deleted by it
-      assertEquals(Set(acl3), 
deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
-    }
+    // standard authorizer first finds the acls that match filters and then 
delete them.
+    // So filters[2] will match acl3 even though it is also matching 
filters[0] and will be deleted by it
+    assertEquals(Set(acl3), 
deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
     assertEquals(Set.empty, 
deleteResults(3).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
   }
 
-  @Test
-  def testThrowsOnAddPrefixedAclIfInterBrokerProtocolVersionTooLow(): Unit = {
-    givenAuthorizerWithProtocolVersion(Option(IBP_2_0_IV0))
-    val e = assertThrows(classOf[ApiException],
-      () => addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, 
"z_other", PREFIXED)))
-    assertTrue(e.getCause.isInstanceOf[UnsupportedVersionException], 
s"Unexpected exception $e")
-  }
-
-  @Test
-  def testCreateAclWithInvalidResourceName(): Unit = {
-    assertThrows(classOf[ApiException],
-      () => addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC, 
"test/1", LITERAL)))
-  }
-
-  @Test
-  def testWritesExtendedAclChangeEventIfInterBrokerProtocolNotSet(): Unit = {
-    givenAuthorizerWithProtocolVersion(Option.empty)
-    val resource = new ResourcePattern(TOPIC, "z_other", PREFIXED)
-    val expected = new String(ZkAclStore(PREFIXED).changeStore
-      .createChangeNode(resource).bytes, UTF_8)
-
-    addAcls(authorizer1, Set(denyReadAcl), resource)
-
-    val actual = getAclChangeEventAsString(PREFIXED)
-
-    assertEquals(expected, actual)
-  }
-
-  @Test
-  def testWritesExtendedAclChangeEventWhenInterBrokerProtocolAtLeastKafkaV2(): 
Unit = {
-    givenAuthorizerWithProtocolVersion(Option(IBP_2_0_IV1))
-    val resource = new ResourcePattern(TOPIC, "z_other", PREFIXED)
-    val expected = new String(ZkAclStore(PREFIXED).changeStore
-      .createChangeNode(resource).bytes, UTF_8)
-
-    addAcls(authorizer1, Set(denyReadAcl), resource)
-
-    val actual = getAclChangeEventAsString(PREFIXED)
-
-    assertEquals(expected, actual)
-  }
-
-  @Test
-  def 
testWritesLiteralWritesLiteralAclChangeEventWhenInterBrokerProtocolLessThanKafkaV2eralAclChangesForOlderProtocolVersions():
 Unit = {
-    givenAuthorizerWithProtocolVersion(Option(IBP_2_0_IV0))
-    val resource = new ResourcePattern(TOPIC, "z_other", LITERAL)
-    val expected = new String(ZkAclStore(LITERAL).changeStore
-      .createChangeNode(resource).bytes, UTF_8)
-
-    addAcls(authorizer1, Set(denyReadAcl), resource)
-
-    val actual = getAclChangeEventAsString(LITERAL)
-
-    assertEquals(expected, actual)
-  }
-
-  @Test
-  def testWritesLiteralAclChangeEventWhenInterBrokerProtocolIsKafkaV2(): Unit 
= {
-    givenAuthorizerWithProtocolVersion(Option(IBP_2_0_IV1))
-    val resource = new ResourcePattern(TOPIC, "z_other", LITERAL)
-    val expected = new String(ZkAclStore(LITERAL).changeStore
-      .createChangeNode(resource).bytes, UTF_8)
-
-    addAcls(authorizer1, Set(denyReadAcl), resource)
-
-    val actual = getAclChangeEventAsString(LITERAL)
-
-    assertEquals(expected, actual)
-  }
-
-  @Test
-  def testAuthorizerNoZkConfig(): Unit = {
-    val noTlsProps = Kafka.getPropsFromArgs(Array(prepareDefaultConfig))
-    val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
-      KafkaConfig.fromProps(noTlsProps),
-      noTlsProps.asInstanceOf[java.util.Map[String, Any]].asScala)
-    ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach { 
propName =>
-      assertNull(zkClientConfig.getProperty(propName))
-    }
-  }
-
-  @Test
-  def testAuthorizerZkConfigFromKafkaConfigWithDefaults(): Unit = {
-    val props = new java.util.Properties()
-    val kafkaValue = "kafkaValue"
-    val configs = Map("zookeeper.connect" -> "somewhere", // required, 
otherwise we would omit it
-      ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG -> "true",
-      ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_KEY_STORE_TYPE_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_CIPHER_SUITES_CONFIG -> kafkaValue)
-    configs.foreach { case (key, value) => props.put(key, value) }
-
-    val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
-      KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
-    // confirm we get all the values we expect
-    ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop 
=> prop match {
-      case ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG | 
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG =>
-        assertEquals("true", 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
-      case ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG | 
ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG =>
-        assertEquals("false", 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
-      case ZkConfigs.ZK_SSL_PROTOCOL_CONFIG =>
-        assertEquals("TLSv1.2", 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
-      case _ => assertEquals(kafkaValue, 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
-    })
-  }
-
-  @Test
-  def testAuthorizerZkConfigFromKafkaConfig(): Unit = {
-    val props = new java.util.Properties()
-    val kafkaValue = "kafkaValue"
-    val configs = Map("zookeeper.connect" -> "somewhere", // required, 
otherwise we would omit it
-      ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG -> "true",
-      ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_KEY_STORE_TYPE_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_PROTOCOL_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_CIPHER_SUITES_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG -> "HTTPS",
-      ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG -> "false",
-      ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG -> "false")
-    configs.foreach{case (key, value) => props.put(key, value) }
-
-    val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
-      KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
-    // confirm we get all the values we expect
-    ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop 
=> prop match {
-        case ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG | 
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG =>
-          assertEquals("true", 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
-        case ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG | 
ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG =>
-          assertEquals("false", 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
-        case _ => assertEquals(kafkaValue, 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
-      })
-  }
-
-  @Test
-  def testAuthorizerZkConfigFromPrefixOverrides(): Unit = {
-    val props = new java.util.Properties()
-    val kafkaValue = "kafkaValue"
-    val prefixedValue = "prefixedValue"
-    val prefix = "authorizer."
-    val configs = Map("zookeeper.connect" -> "somewhere", // required, 
otherwise we would omit it
-      ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG -> "false",
-      ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_KEY_STORE_TYPE_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_PROTOCOL_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_CIPHER_SUITES_CONFIG -> kafkaValue,
-      ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG -> "HTTPS",
-      ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG -> "false",
-      ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG -> "false",
-      prefix + ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG -> "true",
-      prefix + ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG -> prefixedValue,
-      prefix + ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG -> prefixedValue,
-      prefix + ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_CONFIG -> prefixedValue,
-      prefix + ZkConfigs.ZK_SSL_KEY_STORE_TYPE_CONFIG -> prefixedValue,
-      prefix + ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_CONFIG -> prefixedValue,
-      prefix + ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_CONFIG -> prefixedValue,
-      prefix + ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_CONFIG -> prefixedValue,
-      prefix + ZkConfigs.ZK_SSL_PROTOCOL_CONFIG -> prefixedValue,
-      prefix + ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_CONFIG -> prefixedValue,
-      prefix + ZkConfigs.ZK_SSL_CIPHER_SUITES_CONFIG -> prefixedValue,
-      prefix + ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG -> "",
-      prefix + ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG -> "true",
-      prefix + ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG -> "true")
-    configs.foreach{case (key, value) => props.put(key, value.toString) }
-
-    val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
-      KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
-    // confirm we get all the values we expect
-    ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop 
=> prop match {
-      case ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG | 
ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG | ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG =>
-        assertEquals("true", 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
-      case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG =>
-        assertEquals("false", 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
-      case _ => assertEquals(prefixedValue, 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
-    })
-  }
-
-  @Test

Review Comment:
   This case overrides `AclAuthorizer#processAclChangeNotification`. IIRC, we 
don't have similar mechanism in `StandardAuthorizer`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to