[kafka] branch trunk updated: KAFKA-14796 Migrate ACLs from AclAuthorizer to KRaft (#13368)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f1b3732fa64 KAFKA-14796 Migrate ACLs from AclAuthorizer to KRaft (#13368) f1b3732fa64 is described below commit f1b3732fa64372327377834954561d2e63e7d2ce Author: David Arthur AuthorDate: Mon Mar 27 19:12:02 2023 -0400 KAFKA-14796 Migrate ACLs from AclAuthorizer to KRaft (#13368) This patch refactors the loadCache method in AclAuthorizer to make it reusable by ZkMigrationClient. The loaded ACLs are converted to AccessControlEntryRecord. I noticed we still have the defunct AccessControlRecord, so I've deleted it. Also included here are the methods to write ACL changes back to ZK while in dual-write mode. Reviewers: Rajini Sivaram, Colin P. McCabe --- checkstyle/import-control.xml | 2 + .../kafka/security/authorizer/AclAuthorizer.scala | 53 + .../main/scala/kafka/zk/ZkMigrationClient.scala| 128 - .../kafka/zk/ZkMigrationIntegrationTest.scala | 59 +- .../unit/kafka/zk/ZkMigrationClientTest.scala | 101 +++- .../metadata/migration/KRaftMigrationDriver.java | 46 +++- .../kafka/metadata/migration/MigrationClient.java | 17 +++ .../common/metadata/AccessControlRecord.json | 38 -- .../migration/KRaftMigrationDriverTest.java| 26 + 9 files changed, 400 insertions(+), 70 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 373f15aebc5..b7a55201106 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -268,12 +268,14 @@ + + diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala index 1de9a27402c..ce3e2bd0de3 100644 --- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala @@ -124,6 +124,33 @@ object 
AclAuthorizer { if (aclBinding.pattern().name().contains("/")) throw new IllegalArgumentException(s"ACL binding contains invalid resource name: ${aclBinding.pattern().name()}") } + + def loadAllAcls( +zkClient: KafkaZkClient, +logger: Logging, +aclConsumer: (ResourcePattern, VersionedAcls) => Unit + ): Unit = { +ZkAclStore.stores.foreach { store => + val resourceTypes = zkClient.getResourceTypes(store.patternType) + for (rType <- resourceTypes) { +val resourceType = Try(SecurityUtils.resourceType(rType)) +resourceType match { + case Success(resourceTypeObj) => +val resourceNames = zkClient.getResourceNames(store.patternType, resourceTypeObj) +for (resourceName <- resourceNames) { + val resource = new ResourcePattern(resourceTypeObj, resourceName, store.patternType) + val versionedAcls = getAclsFromZk(zkClient, resource) + aclConsumer.apply(resource, versionedAcls) +} + case Failure(_) => logger.warn(s"Ignoring unknown ResourceType: $rType") +} + } +} + } + + def getAclsFromZk(zkClient: KafkaZkClient, resource: ResourcePattern): VersionedAcls = { +zkClient.getVersionedAclsForResource(resource) + } } class AclAuthorizer extends Authorizer with Logging { @@ -549,22 +576,7 @@ class AclAuthorizer extends Authorizer with Logging { private def loadCache(): Unit = { lock synchronized { - ZkAclStore.stores.foreach { store => -val resourceTypes = zkClient.getResourceTypes(store.patternType) -for (rType <- resourceTypes) { - val resourceType = Try(SecurityUtils.resourceType(rType)) - resourceType match { -case Success(resourceTypeObj) => - val resourceNames = zkClient.getResourceNames(store.patternType, resourceTypeObj) - for (resourceName <- resourceNames) { -val resource = new ResourcePattern(resourceTypeObj, resourceName, store.patternType) -val versionedAcls = getAclsFromZk(resource) -updateCache(resource, versionedAcls) - } -case Failure(_) => warn(s"Ignoring unknown ResourceType: $rType") - } -} - } + loadAllAcls(zkClient, this, updateCache) } } @@ -634,7 +646,7 @@ 
class AclAuthorizer extends Authorizer with Logging { if (aclCache.contains(resource)) getAclsFromCache(resource) else -getAclsFromZk(resource) +getAclsFromZk(zkClient, resource) var newVersionedAcls: VersionedAcls = null var writeComplete = false var retries = 0 @@ -654,7 +666,7 @@ class AclAuthorizer
[kafka] branch trunk updated (7438f100cf4 -> 31440b00f3e)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 7438f100cf4 KAFKA-14774 the removed listeners should not be reconfigurable (#13326) add 31440b00f3e KAFKA-14848: KafkaConsumer incorrectly passes locally-scoped serializers to FetchConfig (#13452) No new revisions were added by this update. Summary of changes: .../kafka/clients/consumer/KafkaConsumer.java | 5 +- .../clients/consumer/internals/FetchConfig.java| 10 ++- .../consumer/internals/FetchConfigTest.java| 92 ++ 3 files changed, 102 insertions(+), 5 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchConfigTest.java
[kafka] branch trunk updated: KAFKA-14774 the removed listeners should not be reconfigurable (#13326)
This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7438f100cf4 KAFKA-14774 the removed listeners should not be reconfigurable (#13326) 7438f100cf4 is described below commit 7438f100cf409a1ca178b15b6b7bcfd99e541098 Author: Chia-Ping Tsai AuthorDate: Mon Mar 27 18:48:31 2023 +0800 KAFKA-14774 the removed listeners should not be reconfigurable (#13326) Reviewers: Mickael Maison --- .../main/scala/kafka/network/SocketServer.scala| 1 + .../scala/kafka/server/DynamicBrokerConfig.scala | 2 +- .../server/DynamicBrokerReconfigurationTest.scala | 34 -- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index feca5fc68b4..cdc8ece103c 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -352,6 +352,7 @@ class SocketServer(val config: KafkaConfig, dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor => acceptor.beginShutdown() acceptor.close() +config.removeReconfigurable(acceptor) } } } diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 940580d155b..7c6d5284d71 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -211,7 +211,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging // Use COWArrayList to prevent concurrent modification exception when an item is added by one thread to these // collections, while another thread is iterating over them. 
- private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]() + private[server] val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]() private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]() private val lock = new ReentrantReadWriteLock private var currentConfig: KafkaConfig = _ diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 661295e280c..b08c5360d96 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -32,7 +32,7 @@ import kafka.admin.ConfigCommand import kafka.api.{KafkaSasl, SaslSetup} import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager} import kafka.log.UnifiedLog -import kafka.network.{Processor, RequestChannel} +import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel} import kafka.utils._ import kafka.utils.Implicits._ import kafka.utils.TestUtils.TestControllerRequestCompletionHandler @@ -77,6 +77,7 @@ import scala.jdk.CollectionConverters._ import scala.collection.Seq object DynamicBrokerReconfigurationTest { + val Plain = "PLAIN" val SecureInternal = "INTERNAL" val SecureExternal = "EXTERNAL" } @@ -125,7 +126,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS) // Ensure that we can support multiple listeners per security protocol and multiple security protocols props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0") - props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER:$controllerListenerSecurityProtocol") + props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"PLAINTEXT:PLAINTEXT, 
$SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER:$controllerListenerSecurityProtocol") props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal) props.put(KafkaConfig.SslClientAuthProp, "requested") props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN") @@ -1167,6 +1168,35 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup assertTrue(partitions.exists(_.leader == null), "Did not find partitions with no leader") } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testReconfigureRemovedListener(quorum: String): Unit = { +val client = adminClients.head +val broker = servers.head +assertEquals(2, broker.config.dynamicConfig.reconfigurables.asScala.count(r => r.isInstanceOf[DataPlaneAcceptor])) +val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER,
[kafka] branch trunk updated (e4af074b4c6 -> 139f7709bd3)
This is an automated email from the ASF dual-hosted git repository. viktor pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from e4af074b4c6 MINOR: doc: fix typo in config-streams (#13450) add 139f7709bd3 Fix log DateTime format unit test (#13441) No new revisions were added by this update. Summary of changes: .../src/test/java/org/apache/kafka/common/utils/UtilsTest.java | 10 -- 1 file changed, 4 insertions(+), 6 deletions(-)