[kafka] branch trunk updated: KAFKA-14796 Migrate ACLs from AclAuthorizor to KRaft (#13368)

2023-03-27 Thread cmccabe
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new f1b3732fa64 KAFKA-14796 Migrate ACLs from AclAuthorizor to KRaft (#13368)
f1b3732fa64 is described below

commit f1b3732fa64372327377834954561d2e63e7d2ce
Author: David Arthur 
AuthorDate: Mon Mar 27 19:12:02 2023 -0400

KAFKA-14796 Migrate ACLs from AclAuthorizor to KRaft (#13368)

This patch refactors the loadCache method in AclAuthorizer to make it
reusable by ZkMigrationClient. The loaded ACLs are converted to
AccessControlEntryRecord. I noticed we still have the defunct
AccessControlRecord, so I've deleted it.

Also included here are the methods to write ACL changes back to ZK while
in dual-write mode.

Reviewers: Rajini Sivaram , Colin P. McCabe 
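
The refactoring described in this commit message moves the ZooKeeper traversal into a shared function that takes a callback, so that the authorizer cache and the migration client can both consume the same walk. A minimal sketch of that pattern in plain Java follows. It is not the Kafka code: the in-memory STORE map, the string-encoded ACLs, and the names buildCache and buildRecords are hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

public class AclLoadSketch {
    // Hypothetical stand-in for the ACL store: resource name -> ACL entries.
    static final Map<String, Set<String>> STORE = Map.of(
        "topic:orders", Set.of("User:alice:READ"),
        "group:billing", Set.of("User:bob:READ"));

    // Shared traversal: visits every resource once and hands it to the caller's consumer.
    static void loadAllAcls(Map<String, Set<String>> store,
                            BiConsumer<String, Set<String>> aclConsumer) {
        store.forEach(aclConsumer);
    }

    // Caller 1 (authorizer-style): populate an in-memory cache.
    static Map<String, Set<String>> buildCache() {
        Map<String, Set<String>> cache = new HashMap<>();
        loadAllAcls(STORE, cache::put);
        return cache;
    }

    // Caller 2 (migration-style): flatten each ACL entry into one record.
    static List<String> buildRecords() {
        List<String> records = new ArrayList<>();
        loadAllAcls(STORE, (resource, acls) ->
            acls.forEach(acl -> records.add(resource + " -> " + acl)));
        return records;
    }

    public static void main(String[] args) {
        System.out.println("cached resources: " + buildCache().size());
        System.out.println("migration records: " + buildRecords().size());
    }
}
```

Passing the consumer as a parameter keeps the traversal free of any knowledge about what the caller does with each resource, which is what lets a second caller reuse it unchanged.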

---
 checkstyle/import-control.xml  |   2 +
 .../kafka/security/authorizer/AclAuthorizer.scala  |  53 +
 .../main/scala/kafka/zk/ZkMigrationClient.scala| 128 -
 .../kafka/zk/ZkMigrationIntegrationTest.scala  |  59 +-
 .../unit/kafka/zk/ZkMigrationClientTest.scala  | 101 +++-
 .../metadata/migration/KRaftMigrationDriver.java   |  46 +++-
 .../kafka/metadata/migration/MigrationClient.java  |  17 +++
 .../common/metadata/AccessControlRecord.json   |  38 --
 .../migration/KRaftMigrationDriverTest.java|  26 +
 9 files changed, 400 insertions(+), 70 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 373f15aebc5..b7a55201106 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -268,12 +268,14 @@
 
   
 
+
 
 
 
 
 
 
+
 
 
 
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index 1de9a27402c..ce3e2bd0de3 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -124,6 +124,33 @@ object AclAuthorizer {
 if (aclBinding.pattern().name().contains("/"))
   throw new IllegalArgumentException(s"ACL binding contains invalid resource name: ${aclBinding.pattern().name()}")
   }
+
+  def loadAllAcls(
+zkClient: KafkaZkClient,
+logger: Logging,
+aclConsumer: (ResourcePattern, VersionedAcls) => Unit
+  ): Unit = {
+ZkAclStore.stores.foreach { store =>
+  val resourceTypes = zkClient.getResourceTypes(store.patternType)
+  for (rType <- resourceTypes) {
+val resourceType = Try(SecurityUtils.resourceType(rType))
+resourceType match {
+  case Success(resourceTypeObj) =>
+val resourceNames = zkClient.getResourceNames(store.patternType, resourceTypeObj)
+for (resourceName <- resourceNames) {
+  val resource = new ResourcePattern(resourceTypeObj, resourceName, store.patternType)
+  val versionedAcls = getAclsFromZk(zkClient, resource)
+  aclConsumer.apply(resource, versionedAcls)
+}
+  case Failure(_) => logger.warn(s"Ignoring unknown ResourceType: $rType")
+}
+  }
+}
+  }
+
+  def getAclsFromZk(zkClient: KafkaZkClient, resource: ResourcePattern): VersionedAcls = {
+zkClient.getVersionedAclsForResource(resource)
+  }
 }
 
 class AclAuthorizer extends Authorizer with Logging {
@@ -549,22 +576,7 @@ class AclAuthorizer extends Authorizer with Logging {
 
   private def loadCache(): Unit = {
 lock synchronized  {
-  ZkAclStore.stores.foreach { store =>
-val resourceTypes = zkClient.getResourceTypes(store.patternType)
-for (rType <- resourceTypes) {
-  val resourceType = Try(SecurityUtils.resourceType(rType))
-  resourceType match {
-case Success(resourceTypeObj) =>
-  val resourceNames = zkClient.getResourceNames(store.patternType, resourceTypeObj)
-  for (resourceName <- resourceNames) {
-val resource = new ResourcePattern(resourceTypeObj, resourceName, store.patternType)
-val versionedAcls = getAclsFromZk(resource)
-updateCache(resource, versionedAcls)
-  }
-case Failure(_) => warn(s"Ignoring unknown ResourceType: $rType")
-  }
-}
-  }
+  loadAllAcls(zkClient, this, updateCache)
 }
   }
 
@@ -634,7 +646,7 @@ class AclAuthorizer extends Authorizer with Logging {
   if (aclCache.contains(resource))
 getAclsFromCache(resource)
   else
-getAclsFromZk(resource)
+getAclsFromZk(zkClient, resource)
 var newVersionedAcls: VersionedAcls = null
 var writeComplete = false
 var retries = 0
@@ -654,7 +666,7 @@ class AclAuthorizer 

[kafka] branch trunk updated (7438f100cf4 -> 31440b00f3e)

2023-03-27 Thread guozhang

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 7438f100cf4 KAFKA-14774 the removed listeners should not be reconfigurable (#13326)
 add 31440b00f3e KAFKA-14848: KafkaConsumer incorrectly passes locally-scoped serializers to FetchConfig (#13452)

No new revisions were added by this update.

Summary of changes:
 .../kafka/clients/consumer/KafkaConsumer.java  |  5 +-
 .../clients/consumer/internals/FetchConfig.java| 10 ++-
 .../consumer/internals/FetchConfigTest.java| 92 ++
 3 files changed, 102 insertions(+), 5 deletions(-)
 create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchConfigTest.java



[kafka] branch trunk updated: KAFKA-14774 the removed listeners should not be reconfigurable (#13326)

2023-03-27 Thread chia7712

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 7438f100cf4 KAFKA-14774 the removed listeners should not be reconfigurable (#13326)
7438f100cf4 is described below

commit 7438f100cf409a1ca178b15b6b7bcfd99e541098
Author: Chia-Ping Tsai 
AuthorDate: Mon Mar 27 18:48:31 2023 +0800

KAFKA-14774 the removed listeners should not be reconfigurable (#13326)

Reviewers: Mickael Maison 
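
The fix named in the commit title amounts to deregistering a listener's acceptor from the list of reconfigurable components at the moment the listener is removed, so that a later dynamic config update no longer reaches a closed acceptor. A minimal sketch of that idea in plain Java follows. It is not the Kafka implementation: the Reconfigurable interface here, the Acceptor class, and the method names addListener, removeListener, and applyUpdate are hypothetical and exist only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListenerRegistrySketch {
    // Hypothetical minimal interface for a component that accepts config updates.
    interface Reconfigurable {
        void reconfigure(String newValue);
    }

    // Hypothetical acceptor that must not be reconfigured once closed.
    static final class Acceptor implements Reconfigurable {
        final String endpoint;
        boolean closed = false;
        int reconfigureCalls = 0;

        Acceptor(String endpoint) { this.endpoint = endpoint; }

        void close() { closed = true; }

        @Override
        public void reconfigure(String newValue) {
            if (closed) throw new IllegalStateException("acceptor closed: " + endpoint);
            reconfigureCalls++;
        }
    }

    // Copy-on-write list: iteration works on a snapshot, so concurrent
    // adds and removes do not throw ConcurrentModificationException.
    final List<Reconfigurable> reconfigurables = new CopyOnWriteArrayList<>();

    void addListener(Acceptor acceptor) {
        reconfigurables.add(acceptor);
    }

    void removeListener(Acceptor acceptor) {
        acceptor.close();
        // The step this sketch is about: drop the registration together with the listener.
        reconfigurables.remove(acceptor);
    }

    void applyUpdate(String newValue) {
        for (Reconfigurable r : reconfigurables) r.reconfigure(newValue);
    }

    public static void main(String[] args) {
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        Acceptor internal = new Acceptor("INTERNAL://localhost:0");
        Acceptor external = new Acceptor("EXTERNAL://localhost:0");
        registry.addListener(internal);
        registry.addListener(external);
        registry.removeListener(external);
        registry.applyUpdate("new-value"); // only the remaining acceptor is touched
        System.out.println("registered: " + registry.reconfigurables.size());
    }
}
```

Without the remove call in removeListener, applyUpdate in this sketch would throw on the closed acceptor.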
---
 .../main/scala/kafka/network/SocketServer.scala|  1 +
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  2 +-
 .../server/DynamicBrokerReconfigurationTest.scala  | 34 --
 3 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index feca5fc68b4..cdc8ece103c 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -352,6 +352,7 @@ class SocketServer(val config: KafkaConfig,
   dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor =>
 acceptor.beginShutdown()
 acceptor.close()
+config.removeReconfigurable(acceptor)
   }
 }
   }
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 940580d155b..7c6d5284d71 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -211,7 +211,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
 
   // Use COWArrayList to prevent concurrent modification exception when an item is added by one thread to these
   // collections, while another thread is iterating over them.
-  private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
+  private[server] val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
   private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
   private var currentConfig: KafkaConfig = _
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 661295e280c..b08c5360d96 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -32,7 +32,7 @@ import kafka.admin.ConfigCommand
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
 import kafka.log.UnifiedLog
-import kafka.network.{Processor, RequestChannel}
+import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel}
 import kafka.utils._
 import kafka.utils.Implicits._
 import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
@@ -77,6 +77,7 @@ import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 
 object DynamicBrokerReconfigurationTest {
+  val Plain = "PLAIN"
   val SecureInternal = "INTERNAL"
   val SecureExternal = "EXTERNAL"
 }
@@ -125,7 +126,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
   props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS)
   // Ensure that we can support multiple listeners per security protocol and multiple security protocols
   props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
-  props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER:$controllerListenerSecurityProtocol")
+  props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"PLAINTEXT:PLAINTEXT, $SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER:$controllerListenerSecurityProtocol")
   props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
   props.put(KafkaConfig.SslClientAuthProp, "requested")
   props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN")
@@ -1167,6 +1168,35 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
 assertTrue(partitions.exists(_.leader == null), "Did not find partitions with no leader")
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testReconfigureRemovedListener(quorum: String): Unit = {
+val client = adminClients.head
+val broker = servers.head
+assertEquals(2, broker.config.dynamicConfig.reconfigurables.asScala.count(r => r.isInstanceOf[DataPlaneAcceptor]))
+val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, 

[kafka] branch trunk updated (e4af074b4c6 -> 139f7709bd3)

2023-03-27 Thread viktor

viktor pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from e4af074b4c6 MINOR: doc: fix typo in config-streams (#13450)
 add 139f7709bd3 Fix log DateTime format unit test (#13441)

No new revisions were added by this update.

Summary of changes:
 .../src/test/java/org/apache/kafka/common/utils/UtilsTest.java | 10 --
 1 file changed, 4 insertions(+), 6 deletions(-)