dajac commented on a change in pull request #11295:
URL: https://github.com/apache/kafka/pull/11295#discussion_r702694677



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -354,10 +354,10 @@ object KafkaConfig {
 
   // For ZooKeeper TLS client authentication to be enabled the client must (at 
a minimum) configure itself as using TLS
   // with both a client connection socket and a key store location explicitly 
set.
-  private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig) = {
-    getZooKeeperClientProperty(zkClientConfig, 
ZkSslClientEnableProp).getOrElse("false") == "true" &&
-      getZooKeeperClientProperty(zkClientConfig, 
ZkClientCnxnSocketProp).isDefined &&
-      getZooKeeperClientProperty(zkClientConfig, 
ZkSslKeyStoreLocationProp).isDefined
+  private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig): 
Boolean = {
+    zooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).map(_ == 
"true").getOrElse(false) &&

Review comment:
       nit: We could use `exists` here.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -95,24 +95,24 @@ object AclAuthorizer {
     }
   }
 
-  private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: 
KafkaConfig, configMap: mutable.Map[String, _<:Any]): Option[ZKClientConfig] = {
+  private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: 
KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = {
     val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + 
KafkaConfig.ZkSslClientEnableProp).
       
map(_.toString).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
     if (!zkSslClientEnable)
-      None
+      new ZKClientConfig
     else {
       // start with the base config from the Kafka configuration
       // be sure to force creation since the zkSslClientEnable property in the 
kafkaConfig could be false
       val zkClientConfig = 
KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, true)
       // add in any prefixed overlays
-      KafkaConfig.ZkSslConfigToSystemPropertyMap.foreach{ case (kafkaProp, 
sysProp) => {
-        val prefixedValue = configMap.get(AclAuthorizer.configPrefix + 
kafkaProp)
-        if (prefixedValue.isDefined)
-          zkClientConfig.get.setProperty(sysProp,
+      KafkaConfig.ZkSslConfigToSystemPropertyMap.forKeyValue { (kafkaProp, 
sysProp) => {

Review comment:
       nit: We could remove the extra block defined by the last `{` on this 
line.

##########
File path: core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
##########
@@ -1340,6 +1343,32 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(ZooDefs.Ids.READ_ACL_UNSAFE.asScala, 
zkClient.getAcl(mockPath))
   }
 
+  @Test
+  def testJuteMaxBufffer(): Unit = {
+    // default case
+    assertEquals("4194304", 
zkClient.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER))
+
+    // Value set directly on ZKClientConfig takes precedence over system 
property
+    System.setProperty(ZKConfig.JUTE_MAXBUFFER, (3000 * 1024).toString)
+    try {
+      val clientConfig1 = new ZKClientConfig
+      clientConfig1.setProperty(ZKConfig.JUTE_MAXBUFFER, (2000 * 
1024).toString)
+      val client1 = KafkaZkClient(zkConnect, 
zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
+        zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = 
"KafkaZkClient",
+        zkClientConfig = clientConfig1)
+      try assertEquals("2048000", 
client1.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER))
+      finally client1.close()

Review comment:
       Is it worth defining an inner helper method to avoid the repeated code 
here? Something like: `assertPropery(config: ZKClientConfig, property: String, 
expectedValue: String)`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to