This is an automated email from the ASF dual-hosted git repository. rndgstn pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new e27926f92b1 KAFKA-14735: Improve KRaft metadata image change performance at high … (#13280) e27926f92b1 is described below commit e27926f92b1f6b34ed6731f33c712a5d0d594275 Author: Ron Dagostino <rndg...@gmail.com> AuthorDate: Mon Apr 17 17:52:28 2023 -0400 KAFKA-14735: Improve KRaft metadata image change performance at high … (#13280) topic counts. Introduces the use of persistent data structures in the KRaft metadata image to avoid copying the entire TopicsImage upon every change. Performance that was O(<number of topics in the cluster>) is now O(<number of topics changing>), which has dramatic time and GC improvements for the most common topic-related metadata events. We abstract away the chosen underlying persistent collection library via ImmutableMap<> and ImmutableSet<> interfaces and static factory methods. Reviewers: Luke Chen <show...@gmail.com>, Colin P. McCabe <cmcc...@apache.org>, Ismael Juma <ism...@juma.me.uk>, Purshotam Chauhan <pchau...@confluent.io> --- LICENSE-binary | 3 +- build.gradle | 10 + checkstyle/import-control-jmh-benchmarks.xml | 1 + checkstyle/import-control-metadata.xml | 176 ++++++++++++ checkstyle/import-control-server-common.xml | 82 ++++++ checkstyle/import-control.xml | 133 --------- .../metadata/BrokerMetadataPublisherTest.scala | 6 +- gradle/dependencies.gradle | 2 + .../metadata/KRaftMetadataRequestBenchmark.java | 235 ++++++++++++++++ .../TopicsImageSingleRecordChangeBenchmark.java | 90 ++++++ .../metadata/TopicsImageSnapshotLoadBenchmark.java | 112 ++++++++ .../metadata/TopicsImageZonalOutageBenchmark.java | 99 +++++++ licenses/pcollections-MIT | 24 ++ .../java/org/apache/kafka/image/TopicsDelta.java | 40 ++- .../java/org/apache/kafka/image/TopicsImage.java | 32 ++- .../metrics/ControllerMetricsTestUtils.java | 8 +- .../org/apache/kafka/image/TopicsImageTest.java | 13 +- .../kafka/server/immutable/ImmutableMap.java | 64 +++++ .../kafka/server/immutable/ImmutableSet.java | 60 ++++ .../pcollections/PCollectionsImmutableMap.java | 223 +++++++++++++++ .../pcollections/PCollectionsImmutableSet.java | 188 +++++++++++++ .../kafka/server/immutable/DelegationChecker.java | 146 ++++++++++ .../pcollections/PCollectionsImmutableMapTest.java | 310 +++++++++++++++++++++ .../pcollections/PCollectionsImmutableSetTest.java | 274 ++++++++++++++++++ 24 files changed, 2148 insertions(+), 183 deletions(-) diff --git a/LICENSE-binary b/LICENSE-binary index 842962e61ad..131d64b8633 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -313,7 +313,8 @@ argparse4j-0.7.0, see: licenses/argparse-MIT jopt-simple-5.0.4, see: licenses/jopt-simple-MIT slf4j-api-1.7.36, see: licenses/slf4j-MIT slf4j-reload4j-1.7.36, see: licenses/slf4j-MIT -classgraph-4.8.138, see: license/classgraph-MIT +classgraph-4.8.138, see: licenses/classgraph-MIT +pcollections-4.0.1, see: licenses/pcollections-MIT --------------------------------------- BSD 2-Clause diff --git a/build.gradle b/build.gradle index a4ea31ceed5..773aa7a7769 100644 --- a/build.gradle +++ b/build.gradle @@ -1222,6 +1222,10 @@ project(':metadata') { javadoc { enabled = false } + + checkstyle { + configProperties = checkstyleConfigProperties("import-control-metadata.xml") + } } project(':group-coordinator') { @@ -1554,11 +1558,13 @@ project(':server-common') { implementation libs.slf4jApi implementation libs.metrics implementation libs.joptSimple + implementation libs.pcollections testImplementation project(':clients') testImplementation project(':clients').sourceSets.test.output testImplementation libs.junitJupiter testImplementation libs.mockitoCore + testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc. testImplementation libs.hamcrest testRuntimeOnly libs.slf4jlog4j @@ -1605,6 +1611,10 @@ project(':server-common') { clean.doFirst { delete "$buildDir/kafka/" } + + checkstyle { + configProperties = checkstyleConfigProperties("import-control-server-common.xml") + } } project(':storage:api') { diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml index d6e966498d8..4cbd34f89cb 100644 --- a/checkstyle/import-control-jmh-benchmarks.xml +++ b/checkstyle/import-control-jmh-benchmarks.xml @@ -51,6 +51,7 @@ <allow pkg="org.apache.kafka.storage"/> <allow pkg="org.apache.kafka.clients"/> <allow pkg="org.apache.kafka.coordinator.group"/> + <allow pkg="org.apache.kafka.image"/> <allow pkg="org.apache.kafka.metadata"/> <allow pkg="org.apache.kafka.timeline" /> diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml new file mode 100644 index 00000000000..ade866d6a07 --- /dev/null +++ b/checkstyle/import-control-metadata.xml @@ -0,0 +1,176 @@ +<!DOCTYPE import-control PUBLIC + "-//Puppy Crawl//DTD Import Control 1.1//EN" + "http://www.puppycrawl.com/dtds/import_control_1_1.dtd"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<import-control pkg="org.apache.kafka"> + + <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE --> + + <!-- common library dependencies --> + <allow pkg="java" /> + <allow pkg="javax.management" /> + <allow pkg="org.slf4j" /> + <allow pkg="org.junit" /> + <allow pkg="org.opentest4j" /> + <allow pkg="org.hamcrest" /> + <allow pkg="org.mockito" /> + <allow pkg="org.easymock" /> + <allow pkg="org.powermock" /> + <allow pkg="java.security" /> + <allow pkg="javax.net.ssl" /> + <allow pkg="javax.security" /> + <allow pkg="org.ietf.jgss" /> + <allow pkg="net.jqwik.api" /> + + <!-- no one depends on the server --> + <disallow pkg="kafka" /> + + <!-- anyone can use public classes --> + <allow pkg="org.apache.kafka.common" exact-match="true" /> + <allow pkg="org.apache.kafka.common.security" /> + <allow pkg="org.apache.kafka.common.serialization" /> + <allow pkg="org.apache.kafka.common.utils" /> + <allow pkg="org.apache.kafka.common.errors" exact-match="true" /> + <allow pkg="org.apache.kafka.common.memory" /> + + <!-- persistent collection factories/non-library-specific wrappers --> + <allow pkg="org.apache.kafka.server.immutable" exact-match="true" /> + + <subpackage name="common"> + <subpackage name="metadata"> + <allow pkg="com.fasterxml.jackson" /> + <allow pkg="org.apache.kafka.common.protocol" /> + <allow pkg="org.apache.kafka.common.protocol.types" /> + <allow pkg="org.apache.kafka.common.message" /> + <allow pkg="org.apache.kafka.common.metadata" /> + </subpackage> + </subpackage> + + <subpackage name="controller"> + <allow pkg="com.yammer.metrics"/> + <allow pkg="org.apache.kafka.clients" /> + <allow pkg="org.apache.kafka.clients.admin" /> + <allow pkg="org.apache.kafka.common.acl" /> + <allow pkg="org.apache.kafka.common.annotation" /> + <allow pkg="org.apache.kafka.common.config" /> + <allow pkg="org.apache.kafka.common.feature" /> + <allow pkg="org.apache.kafka.common.internals" /> + <allow pkg="org.apache.kafka.common.message" /> + <allow pkg="org.apache.kafka.common.metadata" /> + <allow pkg="org.apache.kafka.common.metrics" /> + <allow pkg="org.apache.kafka.common.network" /> + <allow pkg="org.apache.kafka.common.protocol" /> + <allow pkg="org.apache.kafka.common.quota" /> + <allow pkg="org.apache.kafka.common.requests" /> + <allow pkg="org.apache.kafka.common.resource" /> + <allow pkg="org.apache.kafka.controller" /> + <allow pkg="org.apache.kafka.image" /> + <allow pkg="org.apache.kafka.image.writer" /> + <allow pkg="org.apache.kafka.metadata" /> + <allow pkg="org.apache.kafka.metadata.authorizer" /> + <allow pkg="org.apache.kafka.metadata.migration" /> + <allow pkg="org.apache.kafka.metalog" /> + <allow pkg="org.apache.kafka.queue" /> + <allow pkg="org.apache.kafka.raft" /> + <allow pkg="org.apache.kafka.server.authorizer" /> + <allow pkg="org.apache.kafka.server.common" /> + <allow pkg="org.apache.kafka.server.config" /> + <allow pkg="org.apache.kafka.server.fault" /> + <allow pkg="org.apache.kafka.server.metrics" /> + <allow pkg="org.apache.kafka.server.policy"/> + <allow pkg="org.apache.kafka.server.util"/> + <allow pkg="org.apache.kafka.snapshot" /> + <allow pkg="org.apache.kafka.test" /> + <allow pkg="org.apache.kafka.timeline" /> + </subpackage> + + <subpackage name="image"> + <allow pkg="org.apache.kafka.common.config" /> + <allow pkg="org.apache.kafka.common.message" /> + <allow pkg="org.apache.kafka.common.metadata" /> + <allow pkg="org.apache.kafka.common.protocol" /> + <allow pkg="org.apache.kafka.common.quota" /> + <allow pkg="org.apache.kafka.common.record" /> + <allow pkg="org.apache.kafka.common.requests" /> + <allow pkg="org.apache.kafka.common.resource" /> + <allow pkg="org.apache.kafka.image" /> + <allow pkg="org.apache.kafka.image.writer" /> + <allow pkg="org.apache.kafka.metadata" /> + <allow pkg="org.apache.kafka.queue" /> + <allow pkg="org.apache.kafka.clients.admin" /> + <allow pkg="org.apache.kafka.raft" /> + <allow pkg="org.apache.kafka.server.common" /> + <allow pkg="org.apache.kafka.server.fault" /> + <allow pkg="org.apache.kafka.server.util" /> + <allow pkg="org.apache.kafka.snapshot" /> + <allow pkg="org.apache.kafka.test" /> + </subpackage> + + <subpackage name="metadata"> + <allow pkg="org.apache.kafka.clients" /> + <allow pkg="org.apache.kafka.common.acl" /> + <allow pkg="org.apache.kafka.common.annotation" /> + <allow pkg="org.apache.kafka.common.config" /> + <allow pkg="org.apache.kafka.common.message" /> + <allow pkg="org.apache.kafka.common.metadata" /> + <allow pkg="org.apache.kafka.common.protocol" /> + <allow pkg="org.apache.kafka.common.record" /> + <allow pkg="org.apache.kafka.common.resource" /> + <allow pkg="org.apache.kafka.common.requests" /> + <allow pkg="org.apache.kafka.image" /> + <allow pkg="org.apache.kafka.metadata" /> + <allow pkg="org.apache.kafka.metalog" /> + <allow pkg="org.apache.kafka.queue" /> + <allow pkg="org.apache.kafka.raft" /> + <allow pkg="org.apache.kafka.server.authorizer" /> + <allow pkg="org.apache.kafka.server.common" /> + <allow pkg="org.apache.kafka.server.fault" /> + <allow pkg="org.apache.kafka.server.config" /> + <allow pkg="org.apache.kafka.server.util"/> + <allow pkg="org.apache.kafka.test" /> + <subpackage name="authorizer"> + <allow pkg="org.apache.kafka.common.acl" /> + <allow pkg="org.apache.kafka.common.requests" /> + <allow pkg="org.apache.kafka.common.resource" /> + <allow pkg="org.apache.kafka.controller" /> + <allow pkg="org.apache.kafka.metadata" /> + <allow pkg="org.apache.kafka.common.internals" /> + </subpackage> + <subpackage name="bootstrap"> + <allow pkg="org.apache.kafka.snapshot" /> + </subpackage> + <subpackage name="fault"> + <allow pkg="org.apache.kafka.server.fault" /> + </subpackage> + </subpackage> + + <subpackage name="metalog"> + <allow pkg="org.apache.kafka.common.metadata" /> + <allow pkg="org.apache.kafka.common.protocol" /> + <allow pkg="org.apache.kafka.common.record" /> + <allow pkg="org.apache.kafka.metadata" /> + <allow pkg="org.apache.kafka.metalog" /> + <allow pkg="org.apache.kafka.raft" /> + <allow pkg="org.apache.kafka.snapshot" /> + <allow pkg="org.apache.kafka.queue" /> + <allow pkg="org.apache.kafka.server.common" /> + <allow pkg="org.apache.kafka.test" /> + </subpackage> + +</import-control> diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml new file mode 100644 index 00000000000..d310d81a832 --- /dev/null +++ b/checkstyle/import-control-server-common.xml @@ -0,0 +1,82 @@ +<!DOCTYPE import-control PUBLIC + "-//Puppy Crawl//DTD Import Control 1.1//EN" + "http://www.puppycrawl.com/dtds/import_control_1_1.dtd"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<import-control pkg="org.apache.kafka"> + + <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE --> + + <!-- common library dependencies --> + <allow pkg="java" /> + <allow pkg="javax.management" /> + <allow pkg="org.slf4j" /> + <allow pkg="org.junit" /> + <allow pkg="org.opentest4j" /> + <allow pkg="org.hamcrest" /> + <allow pkg="org.mockito" /> + <allow pkg="org.easymock" /> + <allow pkg="org.powermock" /> + <allow pkg="java.security" /> + <allow pkg="javax.net.ssl" /> + <allow pkg="javax.security" /> + <allow pkg="org.ietf.jgss" /> + <allow pkg="net.jqwik.api" /> + + <!-- no one depends on the server --> + <disallow pkg="kafka" /> + + <!-- anyone can use public classes --> + <allow pkg="org.apache.kafka.common" exact-match="true" /> + <allow pkg="org.apache.kafka.common.security" /> + <allow pkg="org.apache.kafka.common.serialization" /> + <allow pkg="org.apache.kafka.common.utils" /> + <allow pkg="org.apache.kafka.common.errors" exact-match="true" /> + <allow pkg="org.apache.kafka.common.memory" /> + + <!-- persistent collection factories/non-library-specific wrappers --> + <allow pkg="org.apache.kafka.server.immutable" exact-match="true" /> + + <subpackage name="queue"> + <allow pkg="org.apache.kafka.test" /> + </subpackage> + + <subpackage name="server"> + <allow pkg="org.apache.kafka.common" /> + <allow pkg="joptsimple" /> + + <subpackage name="common"> + <allow pkg="org.apache.kafka.server.common" /> + </subpackage> + + <subpackage name="immutable"> + <allow pkg="org.apache.kafka.server.util"/> + <!-- only the factory package can use persistent collection library-specific wrapper implementations --> + <!-- the library-specific wrapper implementation for PCollections --> + <allow pkg="org.apache.kafka.server.immutable.pcollections" /> + <subpackage name="pcollections"> + <allow pkg="org.pcollections" /> + </subpackage> + </subpackage> + + <subpackage name="metrics"> + <allow pkg="com.yammer.metrics" /> + </subpackage> + </subpackage> + +</import-control> diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 72948e541d5..c3318807f20 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -88,14 +88,6 @@ <allow pkg="org.apache.kafka.common.record" /> </subpackage> - <subpackage name="metadata"> - <allow pkg="com.fasterxml.jackson" /> - <allow pkg="org.apache.kafka.common.protocol" /> - <allow pkg="org.apache.kafka.common.protocol.types" /> - <allow pkg="org.apache.kafka.common.message" /> - <allow pkg="org.apache.kafka.common.metadata" /> - </subpackage> - <subpackage name="metrics"> <allow pkg="org.apache.kafka.common.metrics" /> </subpackage> @@ -206,122 +198,6 @@ </subpackage> </subpackage> - <subpackage name="controller"> - <allow pkg="com.yammer.metrics"/> - <allow pkg="org.apache.kafka.clients" /> - <allow pkg="org.apache.kafka.clients.admin" /> - <allow pkg="org.apache.kafka.common.acl" /> - <allow pkg="org.apache.kafka.common.annotation" /> - <allow pkg="org.apache.kafka.common.config" /> - <allow pkg="org.apache.kafka.common.feature" /> - <allow pkg="org.apache.kafka.common.internals" /> - <allow pkg="org.apache.kafka.common.message" /> - <allow pkg="org.apache.kafka.common.metadata" /> - <allow pkg="org.apache.kafka.common.metrics" /> - <allow pkg="org.apache.kafka.common.network" /> - <allow pkg="org.apache.kafka.common.protocol" /> - <allow pkg="org.apache.kafka.common.quota" /> - <allow pkg="org.apache.kafka.common.record" /> - <allow pkg="org.apache.kafka.common.requests" /> - <allow pkg="org.apache.kafka.common.resource" /> - <allow pkg="org.apache.kafka.controller" /> - <allow pkg="org.apache.kafka.image" /> - <allow pkg="org.apache.kafka.image.writer" /> - <allow pkg="org.apache.kafka.metadata" /> - <allow pkg="org.apache.kafka.metadata.authorizer" /> - <allow pkg="org.apache.kafka.metadata.migration" /> - <allow pkg="org.apache.kafka.metalog" /> - <allow pkg="org.apache.kafka.queue" /> - <allow pkg="org.apache.kafka.raft" /> - <allow pkg="org.apache.kafka.server.authorizer" /> - <allow pkg="org.apache.kafka.server.common" /> - <allow pkg="org.apache.kafka.server.config" /> - <allow pkg="org.apache.kafka.server.fault" /> - <allow pkg="org.apache.kafka.server.metrics" /> - <allow pkg="org.apache.kafka.server.policy"/> - <allow pkg="org.apache.kafka.server.util"/> - <allow pkg="org.apache.kafka.snapshot" /> - <allow pkg="org.apache.kafka.test" /> - <allow pkg="org.apache.kafka.timeline" /> - </subpackage> - - <subpackage name="image"> - <allow pkg="org.apache.kafka.common.config" /> - <allow pkg="org.apache.kafka.common.message" /> - <allow pkg="org.apache.kafka.common.metadata" /> - <allow pkg="org.apache.kafka.common.protocol" /> - <allow pkg="org.apache.kafka.common.quota" /> - <allow pkg="org.apache.kafka.common.record" /> - <allow pkg="org.apache.kafka.common.requests" /> - <allow pkg="org.apache.kafka.common.resource" /> - <allow pkg="org.apache.kafka.image" /> - <allow pkg="org.apache.kafka.image.writer" /> - <allow pkg="org.apache.kafka.metadata" /> - <allow pkg="org.apache.kafka.queue" /> - <allow pkg="org.apache.kafka.clients.admin" /> - <allow pkg="org.apache.kafka.raft" /> - <allow pkg="org.apache.kafka.server.common" /> - <allow pkg="org.apache.kafka.server.fault" /> - <allow pkg="org.apache.kafka.server.util" /> - <allow pkg="org.apache.kafka.snapshot" /> - <allow pkg="org.apache.kafka.test" /> - </subpackage> - - <subpackage name="metadata"> - <allow pkg="org.apache.kafka.clients" /> - <allow pkg="org.apache.kafka.common.acl" /> - <allow pkg="org.apache.kafka.common.annotation" /> - <allow pkg="org.apache.kafka.common.config" /> - <allow pkg="org.apache.kafka.common.message" /> - <allow pkg="org.apache.kafka.common.metadata" /> - <allow pkg="org.apache.kafka.common.protocol" /> - <allow pkg="org.apache.kafka.common.record" /> - <allow pkg="org.apache.kafka.common.resource" /> - <allow pkg="org.apache.kafka.common.requests" /> - <allow pkg="org.apache.kafka.image" /> - <allow pkg="org.apache.kafka.metadata" /> - <allow pkg="org.apache.kafka.metalog" /> - <allow pkg="org.apache.kafka.queue" /> - <allow pkg="org.apache.kafka.raft" /> - <allow pkg="org.apache.kafka.server.authorizer" /> - <allow pkg="org.apache.kafka.server.common" /> - <allow pkg="org.apache.kafka.server.fault" /> - <allow pkg="org.apache.kafka.server.config" /> - <allow pkg="org.apache.kafka.server.util"/> - <allow pkg="org.apache.kafka.test" /> - <subpackage name="authorizer"> - <allow pkg="org.apache.kafka.common.acl" /> - <allow pkg="org.apache.kafka.common.requests" /> - <allow pkg="org.apache.kafka.common.resource" /> - <allow pkg="org.apache.kafka.controller" /> - <allow pkg="org.apache.kafka.metadata" /> - <allow pkg="org.apache.kafka.common.internals" /> - </subpackage> - <subpackage name="bootstrap"> - <allow pkg="org.apache.kafka.snapshot" /> - </subpackage> - <subpackage name="fault"> - <allow pkg="org.apache.kafka.server.fault" /> - </subpackage> - </subpackage> - - <subpackage name="metalog"> - <allow pkg="org.apache.kafka.common.metadata" /> - <allow pkg="org.apache.kafka.common.protocol" /> - <allow pkg="org.apache.kafka.common.record" /> - <allow pkg="org.apache.kafka.metadata" /> - <allow pkg="org.apache.kafka.metalog" /> - <allow pkg="org.apache.kafka.raft" /> - <allow pkg="org.apache.kafka.snapshot" /> - <allow pkg="org.apache.kafka.queue" /> - <allow pkg="org.apache.kafka.server.common" /> - <allow pkg="org.apache.kafka.test" /> - </subpackage> - - <subpackage name="queue"> - <allow pkg="org.apache.kafka.test" /> - </subpackage> - <subpackage name="clients"> <allow pkg="org.apache.kafka.common" /> <allow pkg="org.apache.kafka.clients" exact-match="true"/> @@ -358,19 +234,10 @@ <subpackage name="server"> <allow pkg="org.apache.kafka.common" /> - <allow pkg="joptsimple" /> <!-- This is required to make AlterConfigPolicyTest work. --> <allow pkg="org.apache.kafka.server.policy" /> - <subpackage name="common"> - <allow pkg="org.apache.kafka.server.common" /> - </subpackage> - - <subpackage name="metrics"> - <allow pkg="com.yammer.metrics" /> - </subpackage> - <subpackage name="log"> <allow pkg="com.fasterxml.jackson" /> <allow pkg="kafka.api" /> diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index ffe9b3f4076..d7d332ea7fb 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -177,9 +177,9 @@ class BrokerMetadataPublisherTest { private def topicsImage( topics: Seq[TopicImage] ): TopicsImage = { - val idsMap = topics.map(t => t.id -> t).toMap - val namesMap = topics.map(t => t.name -> t).toMap - new TopicsImage(idsMap.asJava, namesMap.asJava) + var retval = TopicsImage.EMPTY + topics.foreach { t => retval = retval.including(t) } + retval } private def newMockDynamicConfigPublisher( diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 40d408949db..23636469f71 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -108,6 +108,7 @@ versions += [ metrics: "2.2.0", mockito: "4.9.0", netty: "4.1.86.Final", + pcollections: "4.0.1", powermock: "2.0.9", reflections: "0.9.12", reload4j: "1.2.19", @@ -198,6 +199,7 @@ libs += [ mockitoJunitJupiter: "org.mockito:mockito-junit-jupiter:$versions.mockito", nettyHandler: "io.netty:netty-handler:$versions.netty", nettyTransportNativeEpoll: "io.netty:netty-transport-native-epoll:$versions.netty", + pcollections: "org.pcollections:pcollections:$versions.pcollections", powermockJunit4: "org.powermock:powermock-module-junit4:$versions.powermock", powermockEasymock: "org.powermock:powermock-api-easymock:$versions.powermock", reflections: "org.reflections:reflections:$versions.reflections", diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java new file mode 100644 index 00000000000..8e997385fc3 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.metadata; + +import kafka.coordinator.transaction.TransactionCoordinator; +import kafka.network.RequestChannel; +import kafka.network.RequestConvertToJson; +import kafka.server.AutoTopicCreationManager; +import kafka.server.BrokerTopicStats; +import kafka.server.ClientQuotaManager; +import kafka.server.ClientRequestQuotaManager; +import kafka.server.ControllerMutationQuotaManager; +import kafka.server.FetchManager; +import kafka.server.ForwardingManager; +import kafka.server.KafkaApis; +import kafka.server.KafkaConfig; +import kafka.server.KafkaConfig$; +import kafka.server.MetadataCache; +import kafka.server.QuotaFactory; +import kafka.server.RaftSupport; +import kafka.server.ReplicaManager; +import kafka.server.ReplicationQuotaManager; +import kafka.server.SimpleApiVersionManager; +import kafka.server.builders.KafkaApisBuilder; +import kafka.server.metadata.KRaftMetadataCache; +import kafka.server.metadata.MockConfigRepository; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataEndpoint; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.GroupCoordinator; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.mockito.Mockito; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import scala.Option; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 15) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) + +public class KRaftMetadataRequestBenchmark { + @Param({"500", "1000", "5000"}) + private int topicCount; + @Param({"10", "20", "50"}) + private int partitionCount; + + private RequestChannel requestChannel = Mockito.mock(RequestChannel.class, Mockito.withSettings().stubOnly()); + private RequestChannel.Metrics requestChannelMetrics = Mockito.mock(RequestChannel.Metrics.class); + private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); + private GroupCoordinator groupCoordinator = Mockito.mock(GroupCoordinator.class); + private TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class); + private AutoTopicCreationManager autoTopicCreationManager = Mockito.mock(AutoTopicCreationManager.class); + private Metrics metrics = new Metrics(); + private int brokerId = 1; + private ForwardingManager forwardingManager = Mockito.mock(ForwardingManager.class); + private KRaftMetadataCache metadataCache = MetadataCache.kRaftMetadataCache(brokerId); + private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class); + private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class); + private ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class); + private ReplicationQuotaManager replicaQuotaManager = Mockito.mock(ReplicationQuotaManager.class); + private QuotaFactory.QuotaManagers quotaManagers = new QuotaFactory.QuotaManagers(clientQuotaManager, + clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager, + replicaQuotaManager, replicaQuotaManager, Option.empty()); + private FetchManager fetchManager = Mockito.mock(FetchManager.class); + private BrokerTopicStats brokerTopicStats = new BrokerTopicStats(); + private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user"); + private KafkaApis kafkaApis; + private RequestChannel.Request allTopicMetadataRequest; + + @Setup(Level.Trial) + public void setup() { + initializeMetadataCache(); + kafkaApis = createKafkaApis(); + allTopicMetadataRequest = buildAllTopicMetadataRequest(); + } + + private void initializeMetadataCache() { + MetadataDelta buildupMetadataDelta = new MetadataDelta(MetadataImage.EMPTY); + IntStream.range(0, 5).forEach(brokerId -> { + RegisterBrokerRecord.BrokerEndpointCollection endpoints = new RegisterBrokerRecord.BrokerEndpointCollection(); + endpoints(brokerId).forEach(endpoint -> + endpoints.add(new RegisterBrokerRecord.BrokerEndpoint(). + setHost(endpoint.host()). + setPort(endpoint.port()). + setName(endpoint.listener()). + setSecurityProtocol(endpoint.securityProtocol()))); + buildupMetadataDelta.replay(new RegisterBrokerRecord(). + setBrokerId(brokerId). + setBrokerEpoch(100L). + setFenced(false). + setRack(null). + setEndPoints(endpoints). + setIncarnationId(Uuid.fromString(Uuid.randomUuid().toString()))); + }); + IntStream.range(0, topicCount).forEach(topicNum -> { + Uuid topicId = Uuid.randomUuid(); + buildupMetadataDelta.replay(new TopicRecord().setName("topic-" + topicNum).setTopicId(topicId)); + IntStream.range(0, partitionCount).forEach(partitionId -> + buildupMetadataDelta.replay(new PartitionRecord(). + setPartitionId(partitionId). + setTopicId(topicId). + setReplicas(Arrays.asList(0, 1, 3)). + setIsr(Arrays.asList(0, 1, 3)). + setRemovingReplicas(Collections.emptyList()). + setAddingReplicas(Collections.emptyList()). + setLeader(partitionCount % 5). + setLeaderEpoch(0))); + }); + metadataCache.setImage(buildupMetadataDelta.apply(MetadataProvenance.EMPTY)); + } + + private List<UpdateMetadataEndpoint> endpoints(final int brokerId) { + return Collections.singletonList( + new UpdateMetadataEndpoint() + .setHost("host_" + brokerId) + .setPort(9092) + .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) + .setListener(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value())); + } + + private KafkaApis createKafkaApis() { + Properties kafkaProps = new Properties(); + kafkaProps.put(KafkaConfig$.MODULE$.NodeIdProp(), brokerId + ""); + kafkaProps.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker"); + kafkaProps.put(KafkaConfig$.MODULE$.QuorumVotersProp(), "9000@foo:8092"); + kafkaProps.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), "CONTROLLER"); + KafkaConfig config = new KafkaConfig(kafkaProps); + return new KafkaApisBuilder(). + setRequestChannel(requestChannel). + setMetadataSupport(new RaftSupport(forwardingManager, metadataCache)). + setReplicaManager(replicaManager). + setGroupCoordinator(groupCoordinator). + setTxnCoordinator(transactionCoordinator). + setAutoTopicCreationManager(autoTopicCreationManager). + setBrokerId(brokerId). + setConfig(config). + setConfigRepository(new MockConfigRepository()). + setMetadataCache(metadataCache). + setMetrics(metrics). + setAuthorizer(Optional.empty()). + setQuotas(quotaManagers). + setFetchManager(fetchManager). + setBrokerTopicStats(brokerTopicStats). + setClusterId("clusterId"). + setTime(Time.SYSTEM). + setTokenManager(null). + setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false)). + build(); + } + + @TearDown(Level.Trial) + public void tearDown() { + kafkaApis.close(); + metrics.close(); + } + + private RequestChannel.Request buildAllTopicMetadataRequest() { + MetadataRequest metadataRequest = MetadataRequest.Builder.allTopics().build(); + RequestHeader header = new RequestHeader(metadataRequest.apiKey(), metadataRequest.version(), "", 0); + ByteBuffer bodyBuffer = metadataRequest.serialize(); + + RequestContext context = new RequestContext(header, "1", null, principal, + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false); + return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, bodyBuffer, requestChannelMetrics, Option.empty()); + } + + @Benchmark + public void testMetadataRequestForAllTopics() { + kafkaApis.handleTopicMetadataRequest(allTopicMetadataRequest); + } + + @Benchmark + public String testRequestToJson() { + return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), allTopicMetadataRequest.requestLog(), allTopicMetadataRequest.isForwarded()).toString(); + } + + @Benchmark + public void testTopicIdInfo() { + metadataCache.topicIdInfo(); + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSingleRecordChangeBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSingleRecordChangeBenchmark.java new file mode 100644 index 00000000000..8f88a7a1e6f --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSingleRecordChangeBenchmark.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.metadata; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class TopicsImageSingleRecordChangeBenchmark { + @Param({"12500", "25000", "50000", "100000"}) + private int totalTopicCount; + @Param({"10"}) + private int partitionsPerTopic; + @Param({"3"}) + private int replicationFactor; + @Param({"10000"}) + private int numReplicasPerBroker; + + private TopicsDelta topicsDelta; + + + @Setup(Level.Trial) + public void setup() { + // build an image containing all the specified topics and partitions + TopicsDelta buildupTopicsDelta = TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker); + TopicsImage builtupTopicsImage = buildupTopicsDelta.apply(); + // build a delta to apply within the benchmark code + // that adds a single topic-partition + topicsDelta = new TopicsDelta(builtupTopicsImage); + Uuid newTopicUuid = Uuid.randomUuid(); + TopicRecord newTopicRecord = new TopicRecord().setName("newtopic").setTopicId(newTopicUuid); + topicsDelta.replay(newTopicRecord); + ArrayList<Integer> replicas = TopicsImageSnapshotLoadBenchmark.getReplicas(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker, 0); + ArrayList<Integer> isr = new ArrayList<>(replicas); + PartitionRecord newPartitionRecord = new PartitionRecord(). + setPartitionId(0). + setTopicId(newTopicUuid). + setReplicas(replicas). + setIsr(isr). + setRemovingReplicas(Collections.emptyList()). + setAddingReplicas(Collections.emptyList()). + setLeader(0); + topicsDelta.replay(newPartitionRecord); + System.out.print("(Adding a single topic to metadata having " + totalTopicCount + " total topics) "); + } + + @Benchmark + public void testTopicsDeltaSingleTopicAdd() { + topicsDelta.apply(); + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSnapshotLoadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSnapshotLoadBenchmark.java new file mode 100644 index 00000000000..10961d9d025 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSnapshotLoadBenchmark.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.metadata; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class TopicsImageSnapshotLoadBenchmark { + @Param({"12500", "25000", "50000", "100000"}) + private int totalTopicCount; + @Param({"10"}) + private int partitionsPerTopic; + @Param({"3"}) + private int replicationFactor; + @Param({"10000"}) + private int numReplicasPerBroker; + + private TopicsDelta topicsDelta; + + + @Setup(Level.Trial) + public void setup() { + // build a delta to apply within the benchmark code + // that consists of all the topics and partitions that would get loaded in a snapshot + topicsDelta = getInitialTopicsDelta(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker); + System.out.print("(Loading a snapshot containing " + totalTopicCount + " total topics) "); + } + + static TopicsDelta getInitialTopicsDelta(int totalTopicCount, int partitionsPerTopic, int replicationFactor, int numReplicasPerBroker) { + int numBrokers = getNumBrokers(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker); + TopicsDelta buildupTopicsDelta = new TopicsDelta(TopicsImage.EMPTY); + final AtomicInteger currentLeader = new AtomicInteger(0); + IntStream.range(0, totalTopicCount).forEach(topicNumber -> { + Uuid topicId = Uuid.randomUuid(); + buildupTopicsDelta.replay(new TopicRecord().setName("topic" + topicNumber).setTopicId(topicId)); + IntStream.range(0, partitionsPerTopic).forEach(partitionNumber -> { + ArrayList<Integer> replicas = getReplicas(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker, currentLeader.get()); + ArrayList<Integer> isr = new ArrayList<>(replicas); + buildupTopicsDelta.replay(new PartitionRecord(). + setPartitionId(partitionNumber). + setTopicId(topicId). + setReplicas(replicas). + setIsr(isr). + setRemovingReplicas(Collections.emptyList()). + setAddingReplicas(Collections.emptyList()). + setLeader(currentLeader.get())); + currentLeader.set((1 + currentLeader.get()) % numBrokers); + }); + }); + return buildupTopicsDelta; + } + + static ArrayList<Integer> getReplicas(int totalTopicCount, int partitionsPerTopic, int replicationFactor, int numReplicasPerBroker, int currentLeader) { + ArrayList<Integer> replicas = new ArrayList<>(); + int numBrokers = getNumBrokers(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker); + IntStream.range(0, replicationFactor).forEach(replicaNumber -> + replicas.add((replicaNumber + currentLeader) % numBrokers)); + return replicas; + } + + static int getNumBrokers(int totalTopicCount, int partitionsPerTopic, int replicationFactor, int numReplicasPerBroker) { + int numBrokers = totalTopicCount * partitionsPerTopic * replicationFactor / numReplicasPerBroker; + return numBrokers - numBrokers % 3; + } + + @Benchmark + public void testTopicsDeltaSnapshotLoad() { + topicsDelta.apply(); + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageZonalOutageBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageZonalOutageBenchmark.java new file mode 100644 index 00000000000..5890763397a --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageZonalOutageBenchmark.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.metadata; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class TopicsImageZonalOutageBenchmark { + @Param({"12500", "25000", "50000", "100000"}) + private int totalTopicCount; + @Param({"10"}) + private int partitionsPerTopic; + @Param({"3"}) + private int replicationFactor; + @Param({"10000"}) + private int numReplicasPerBroker; + + private TopicsDelta topicsDelta; + + + @Setup(Level.Trial) + public void setup() { + // build an image containing all of the specified topics and partitions + TopicsDelta buildupTopicsDelta = TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker); + TopicsImage builtupTopicsImage = buildupTopicsDelta.apply(); + // build a delta to apply within the benchmark code + // that perturbs all the topic-partitions for broker 0 + // (as might happen in a zonal outage, one broker at a time, ultimately across 1/3 of the brokers in the cluster). + // It turns out that + topicsDelta = new TopicsDelta(builtupTopicsImage); + Set<Uuid> perturbedTopics = new HashSet<>(); + builtupTopicsImage.topicsById().forEach((topicId, topicImage) -> + topicImage.partitions().forEach((partitionNumber, partitionRegistration) -> { + List<Integer> newIsr = Arrays.stream(partitionRegistration.isr).boxed().filter(n -> n != 0).collect(Collectors.toList()); + if (newIsr.size() < replicationFactor) { + perturbedTopics.add(topicId); + topicsDelta.replay(new PartitionRecord(). + setPartitionId(partitionNumber). + setTopicId(topicId). + setReplicas(Arrays.stream(partitionRegistration.replicas).boxed().collect(Collectors.toList())). + setIsr(newIsr). + setRemovingReplicas(Collections.emptyList()). + setAddingReplicas(Collections.emptyList()). + setLeader(newIsr.get(0))); + } + }) + ); + int numBrokers = TopicsImageSnapshotLoadBenchmark.getNumBrokers(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker); + System.out.print("(Perturbing 1 of " + numBrokers + " brokers, or " + perturbedTopics.size() + " topics within metadata having " + totalTopicCount + " total topics) "); + } + + @Benchmark + public void testTopicsDeltaZonalOutage() { + topicsDelta.apply(); + } +} diff --git a/licenses/pcollections-MIT b/licenses/pcollections-MIT new file mode 100644 index 00000000000..50519c5e432 --- /dev/null +++ b/licenses/pcollections-MIT @@ -0,0 +1,24 @@ +MIT License + +Copyright 2008-2011, 2014-2020, 2022 Harold Cooper, gil cattaneo, Gleb Frank, +Günther Grill, Ilya Gorbunov, Jirka Kremser, Jochen Theodorou, Johnny Lim, +Liam Miller, Mark Perry, Matei Dragu, Mike Klein, Oleg Osipenko, Ran Ari-Gur, +Shantanu Kumar, and Valeriy Vyrva. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java index d3c5888fa7a..3927e19191d 100644 --- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java @@ -24,13 +24,13 @@ import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.server.immutable.ImmutableMap; import org.apache.kafka.server.common.MetadataVersion; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; @@ -126,29 +126,27 @@ public final class TopicsDelta { } public TopicsImage apply() { - Map<Uuid, TopicImage> newTopicsById = new HashMap<>(image.topicsById().size()); - Map<String, TopicImage> newTopicsByName = new HashMap<>(image.topicsByName().size()); - for (Entry<Uuid, TopicImage> entry : image.topicsById().entrySet()) { - Uuid id = entry.getKey(); - TopicImage prevTopicImage = entry.getValue(); - TopicDelta delta = changedTopics.get(id); - if (delta == null) { - if (!deletedTopicIds.contains(id)) { - newTopicsById.put(id, prevTopicImage); - newTopicsByName.put(prevTopicImage.name(), prevTopicImage); - } + ImmutableMap<Uuid, TopicImage> newTopicsById = image.topicsById(); + ImmutableMap<String, TopicImage> newTopicsByName = image.topicsByName(); + // apply all the deletes + for (Uuid topicId: deletedTopicIds) { + // it was deleted, so we have to remove it from the maps + TopicImage originalTopicToBeDeleted = image.topicsById().get(topicId); + if (originalTopicToBeDeleted == null) { + throw new IllegalStateException("Missing topic id " + topicId); } else { - TopicImage newTopicImage = delta.apply(); - newTopicsById.put(id, newTopicImage); - newTopicsByName.put(delta.name(), newTopicImage); + newTopicsById = newTopicsById.removed(topicId); + newTopicsByName = newTopicsByName.removed(originalTopicToBeDeleted.name()); } } - for (Entry<Uuid, TopicDelta> entry : changedTopics.entrySet()) { - if (!newTopicsById.containsKey(entry.getKey())) { - TopicImage newTopicImage = entry.getValue().apply(); - newTopicsById.put(newTopicImage.id(), newTopicImage); - newTopicsByName.put(newTopicImage.name(), newTopicImage); - } + // apply all the updates/additions + for (Map.Entry<Uuid, TopicDelta> entry: changedTopics.entrySet()) { + Uuid topicId = entry.getKey(); + TopicImage newTopicToBeAddedOrUpdated = entry.getValue().apply(); + // put new information into the maps + String topicName = newTopicToBeAddedOrUpdated.name(); + newTopicsById = newTopicsById.updated(topicId, newTopicToBeAddedOrUpdated); + newTopicsByName = newTopicsByName.updated(topicName, newTopicToBeAddedOrUpdated); } return new TopicsImage(newTopicsById, newTopicsByName); } diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java index 5f7db112f0a..569264b1c4c 100644 --- a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java @@ -21,41 +21,45 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.image.writer.ImageWriter; import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.server.immutable.ImmutableMap; import org.apache.kafka.server.util.TranslatedValueMapView; -import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; - /** * Represents the topics in the metadata image. * * This class is thread-safe. */ public final class TopicsImage { - public static final TopicsImage EMPTY = - new TopicsImage(Collections.emptyMap(), Collections.emptyMap()); + public static final TopicsImage EMPTY = new TopicsImage(ImmutableMap.empty(), ImmutableMap.empty()); + + private final ImmutableMap<Uuid, TopicImage> topicsById; + private final ImmutableMap<String, TopicImage> topicsByName; - private final Map<Uuid, TopicImage> topicsById; - private final Map<String, TopicImage> topicsByName; + public TopicsImage(ImmutableMap<Uuid, TopicImage> topicsById, + ImmutableMap<String, TopicImage> topicsByName) { + this.topicsById = topicsById; + this.topicsByName = topicsByName; + } - public TopicsImage(Map<Uuid, TopicImage> topicsById, - Map<String, TopicImage> topicsByName) { - this.topicsById = Collections.unmodifiableMap(topicsById); - this.topicsByName = Collections.unmodifiableMap(topicsByName); + public TopicsImage including(TopicImage topic) { + return new TopicsImage( + this.topicsById.updated(topic.id(), topic), + this.topicsByName.updated(topic.name(), topic)); } public boolean isEmpty() { return topicsById.isEmpty() && topicsByName.isEmpty(); } - public Map<Uuid, TopicImage> topicsById() { + public ImmutableMap<Uuid, TopicImage> topicsById() { return topicsById; } - public Map<String, TopicImage> topicsByName() { + public ImmutableMap<String, TopicImage> topicsByName() { return topicsByName; } @@ -74,8 +78,8 @@ public final class TopicsImage { } public void write(ImageWriter writer, ImageWriterOptions options) { - for (TopicImage topicImage : topicsById.values()) { - topicImage.write(writer, options); + for (Map.Entry<Uuid, TopicImage> entry : topicsById.entrySet()) { + entry.getValue().write(writer, options); } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java index 7781bbdce9f..8be8548433a 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java @@ -99,12 +99,10 @@ public class ControllerMetricsTestUtils { public static TopicsImage fakeTopicsImage( TopicImage... topics ) { - Map<Uuid, TopicImage> topicsById = new HashMap<>(); - Map<String, TopicImage> topicsByName = new HashMap<>(); + TopicsImage image = TopicsImage.EMPTY; for (TopicImage topic : topics) { - topicsById.put(topic.id(), topic); - topicsByName.put(topic.name(), topic); + image = image.including(topic); } - return new TopicsImage(topicsById, topicsByName); + return image; } } diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java index 8268ec86091..11af3488bf5 100644 --- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState; import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.server.immutable.ImmutableMap; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -74,18 +75,18 @@ public class TopicsImageTest { return new TopicImage(name, id, partitionMap); } - private static Map<Uuid, TopicImage> newTopicsByIdMap(Collection<TopicImage> topics) { - Map<Uuid, TopicImage> map = new HashMap<>(); + private static ImmutableMap<Uuid, TopicImage> newTopicsByIdMap(Collection<TopicImage> topics) { + ImmutableMap<Uuid, TopicImage> map = TopicsImage.EMPTY.topicsById(); for (TopicImage topic : topics) { - map.put(topic.id(), topic); + map = map.updated(topic.id(), topic); } return map; } - private static Map<String, TopicImage> newTopicsByNameMap(Collection<TopicImage> topics) { - Map<String, TopicImage> map = new HashMap<>(); + private static ImmutableMap<String, TopicImage> newTopicsByNameMap(Collection<TopicImage> topics) { + ImmutableMap<String, TopicImage> map = TopicsImage.EMPTY.topicsByName(); for (TopicImage topic : topics) { - map.put(topic.name(), topic); + map = map.updated(topic.name(), topic); } return map; } diff --git a/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java b/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java new file mode 100644 index 00000000000..ec9790b3ff3 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable; + +import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableMap; + +import java.util.Map; + +/** + * A persistent Hash-based Map wrapper. + * java.util.Map methods that mutate in-place will throw UnsupportedOperationException + * + * @param <K> the key type + * @param <V> the value type + */ +public interface ImmutableMap<K, V> extends Map<K, V> { + /** + * @return a wrapped hash-based persistent map that is empty + * @param <K> the key type + * @param <V> the value type + */ + static <K, V> ImmutableMap<K, V> empty() { + return PCollectionsImmutableMap.empty(); + } + + /** + * @param key the key + * @param value the value + * @return a wrapped hash-based persistent map that has a single mapping + * @param <K> the key type + * @param <V> the value type + */ + static <K, V> ImmutableMap<K, V> singleton(K key, V value) { + return PCollectionsImmutableMap.singleton(key, value); + } + + /** + * @param key the key + * @param value the value + * @return a wrapped persistent map that differs from this one in that the given mapping is added (if necessary) + */ + ImmutableMap<K, V> updated(K key, V value); + + /** + * @param key the key + * @return a wrapped persistent map that differs from this one in that the given mapping is removed (if necessary) + */ + ImmutableMap<K, V> removed(K key); +} diff --git a/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableSet.java b/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableSet.java new file mode 100644 index 00000000000..6f2cfd015a9 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableSet.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable; + +import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableSet; + +import java.util.Set; + +/** + * A persistent Hash-based Set wrapper + * java.util.Set methods that mutate in-place will throw UnsupportedOperationException + * + * @param <E> the element type + */ +public interface ImmutableSet<E> extends Set<E> { + + /** + * @return a wrapped hash-based persistent set that is empty + * @param <E> the element type + */ + static <E> ImmutableSet<E> empty() { + return PCollectionsImmutableSet.empty(); + } + + /** + * @param e the element + * @return a wrapped hash-based persistent set that has a single element + * @param <E> the element type + */ + static <E> ImmutableSet<E> singleton(E e) { + return PCollectionsImmutableSet.singleton(e); + } + + /** + * @param e the element + * @return a wrapped persistent set that differs from this one in that the given element is added (if necessary) + */ + ImmutableSet<E> added(E e); + + /** + * @param e the element + * @return a wrapped persistent set that differs from this one in that the given element is added (if necessary) + */ + ImmutableSet<E> removed(E e); +} diff --git a/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMap.java b/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMap.java new file mode 100644 index 00000000000..d808f0db9c3 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMap.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable.pcollections; + +import org.apache.kafka.server.immutable.ImmutableMap; +import org.pcollections.HashPMap; +import org.pcollections.HashTreePMap; + +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Function; + +@SuppressWarnings("deprecation") +public class PCollectionsImmutableMap<K, V> implements ImmutableMap<K, V> { + + private final HashPMap<K, V> underlying; + + /** + * @return a wrapped hash-based persistent map that is empty + * @param <K> the key type + * @param <V> the value type + */ + public static <K, V> PCollectionsImmutableMap<K, V> empty() { + return new PCollectionsImmutableMap<>(HashTreePMap.empty()); + } + + /** + * @param key the key + * @param value the value + * @return a wrapped hash-based persistent map that has a single mapping + * @param <K> the key type + * @param <V> the value type + */ + public static <K, V> PCollectionsImmutableMap<K, V> singleton(K key, V value) { + return new PCollectionsImmutableMap<>(HashTreePMap.singleton(key, value)); + } + + public PCollectionsImmutableMap(HashPMap<K, V> map) { + this.underlying = Objects.requireNonNull(map); + } + + @Override + public ImmutableMap<K, V> updated(K key, V value) { + return new PCollectionsImmutableMap<>(underlying().plus(key, value)); + } + + @Override + public ImmutableMap<K, V> removed(K key) { + return new PCollectionsImmutableMap<>(underlying().minus(key)); + } + + @Override + public int size() { + return underlying().size(); + } + + @Override + public boolean isEmpty() { + return underlying().isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return underlying().containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return underlying().containsValue(value); + } + + @Override + public V get(Object key) { + return underlying().get(key); + } + + @Override + public V put(K key, V value) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().put(key, value); + } + + @Override + public V remove(Object key) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().remove(key); + } + + @Override + public void putAll(Map<? extends K, ? extends V> m) { + // will throw UnsupportedOperationException; delegate anyway for testability + underlying().putAll(m); + } + + @Override + public void clear() { + // will throw UnsupportedOperationException; delegate anyway for testability + underlying().clear(); + } + + @Override + public Set<K> keySet() { + return underlying().keySet(); + } + + @Override + public Collection<V> values() { + return underlying().values(); + } + + @Override + public Set<Entry<K, V>> entrySet() { + return underlying().entrySet(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PCollectionsImmutableMap<?, ?> that = (PCollectionsImmutableMap<?, ?>) o; + return underlying().equals(that.underlying()); + } + + @Override + public int hashCode() { + return underlying().hashCode(); + } + + @Override + public V getOrDefault(Object key, V defaultValue) { + return underlying().getOrDefault(key, defaultValue); + } + + @Override + public void forEach(BiConsumer<? super K, ? super V> action) { + underlying().forEach(action); + } + + @Override + public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) { + // will throw UnsupportedOperationException; delegate anyway for testability + underlying().replaceAll(function); + } + + @Override + public V putIfAbsent(K key, V value) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().putIfAbsent(key, value); + } + + @Override + public boolean remove(Object key, Object value) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().remove(key, value); + } + + @Override + public boolean replace(K key, V oldValue, V newValue) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().replace(key, oldValue, newValue); + } + + @Override + public V replace(K key, V value) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().replace(key, value); + } + + @Override + public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().computeIfAbsent(key, mappingFunction); + } + + @Override + public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().computeIfPresent(key, remappingFunction); + } + + @Override + public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().compute(key, remappingFunction); + } + + @Override + public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().merge(key, value, remappingFunction); + } + + @Override + public String toString() { + return "PCollectionsImmutableMap{" + + "underlying=" + underlying() + + '}'; + } + + // package-private for testing + HashPMap<K, V> underlying() { + return underlying; + } +} diff --git a/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSet.java b/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSet.java new file mode 100644 index 00000000000..8a50326ef1f --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSet.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable.pcollections; + +import org.apache.kafka.server.immutable.ImmutableSet; +import org.pcollections.HashTreePSet; +import org.pcollections.MapPSet; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Spliterator; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.stream.Stream; + +@SuppressWarnings("deprecation") +public class PCollectionsImmutableSet<E> implements ImmutableSet<E> { + private final MapPSet<E> underlying; + + /** + * @return a wrapped hash-based persistent set that is empty + * @param <E> the element type + */ + public static <E> PCollectionsImmutableSet<E> empty() { + return new PCollectionsImmutableSet<>(HashTreePSet.empty()); + } + + /** + * @param e the element + * @return a wrapped hash-based persistent set that has a single element + * @param <E> the element type + */ + public static <E> PCollectionsImmutableSet<E> singleton(E e) { + return new PCollectionsImmutableSet<>(HashTreePSet.singleton(e)); + } + + public PCollectionsImmutableSet(MapPSet<E> set) { + this.underlying = Objects.requireNonNull(set); + } + + @Override + public ImmutableSet<E> added(E e) { + return new PCollectionsImmutableSet<>(underlying().plus(e)); + } + + @Override + public ImmutableSet<E> removed(E e) { + return new PCollectionsImmutableSet<>(underlying().minus(e)); + } + + @Override + public int size() { + return underlying().size(); + } + + @Override + public boolean isEmpty() { + return underlying().isEmpty(); + } + + @Override + public boolean contains(Object o) { + return underlying().contains(o); + } + + @Override + public Iterator<E> iterator() { + return underlying.iterator(); + } + + @Override + public void forEach(Consumer<? super E> action) { + underlying().forEach(action); + } + + @Override + public Object[] toArray() { + return underlying().toArray(); + } + + @Override + public <T> T[] toArray(T[] a) { + return underlying().toArray(a); + } + + @Override + public boolean add(E e) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().add(e); + } + + @Override + public boolean remove(Object o) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().remove(o); + } + + @Override + public boolean containsAll(Collection<?> c) { + return underlying.containsAll(c); + } + + @Override + public boolean addAll(Collection<? extends E> c) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().addAll(c); + } + + @Override + public boolean retainAll(Collection<?> c) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().retainAll(c); + } + + @Override + public boolean removeAll(Collection<?> c) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().removeAll(c); + } + + @Override + public boolean removeIf(Predicate<? super E> filter) { + // will throw UnsupportedOperationException; delegate anyway for testability + return underlying().removeIf(filter); + } + + @Override + public void clear() { + // will throw UnsupportedOperationException; delegate anyway for testability + underlying().clear(); + } + + @Override + public Spliterator<E> spliterator() { + return underlying().spliterator(); + } + + @Override + public Stream<E> stream() { + return underlying().stream(); + } + + @Override + public Stream<E> parallelStream() { + return underlying().parallelStream(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PCollectionsImmutableSet<?> that = (PCollectionsImmutableSet<?>) o; + return Objects.equals(underlying(), that.underlying()); + } + + @Override + public int hashCode() { + return underlying().hashCode(); + } + + @Override + public String toString() { + return "PCollectionsImmutableSet{" + + "underlying=" + underlying() + + '}'; + } + + // package-private for testing + MapPSet<E> underlying() { + return this.underlying; + } +} diff --git a/server-common/src/test/java/org/apache/kafka/server/immutable/DelegationChecker.java b/server-common/src/test/java/org/apache/kafka/server/immutable/DelegationChecker.java new file mode 100644 index 00000000000..e657e9b11c4 --- /dev/null +++ b/server-common/src/test/java/org/apache/kafka/server/immutable/DelegationChecker.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable; + +import org.mockito.Mockito; + +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; + +/** + * Facilitate testing of wrapper class delegation. + * + * We require the following things to test delegation: + * + * 1. A mock object to which the wrapper is expected to delegate method invocations + * 2. A way to define how the mock is expected to behave when its method is invoked + * 3. A way to define how to invoke the method on the wrapper + * 4. A way to test that the method on the mock is invoked correctly when the wrapper method is invoked + * 5. A way to test that any return value from the wrapper method is correct + + * @param <D> delegate type + * @param <W> wrapper type + * @param <T> delegating method return type, if any + */ +public abstract class DelegationChecker<D, W, T> { + private final D mock; + private final W wrapper; + private Consumer<D> mockConsumer; + private Function<D, T> mockConfigurationFunction; + private T mockFunctionReturnValue; + private Consumer<W> wrapperConsumer; + private Function<W, T> wrapperFunctionApplier; + private Function<T, ?> mockFunctionReturnValueTransformation; + private boolean expectWrapperToWrapMockFunctionReturnValue; + private boolean persistentCollectionMethodInvokedCorrectly = false; + + /** + * @param mock mock for the underlying delegate + * @param wrapperCreator how to create a wrapper for the mock + */ + protected DelegationChecker(D mock, Function<D, W> wrapperCreator) { + this.mock = Objects.requireNonNull(mock); + this.wrapper = Objects.requireNonNull(wrapperCreator).apply(mock); + } + + /** + * @param wrapper the wrapper + * @return the underlying delegate for the given wrapper + */ + public abstract D unwrap(W wrapper); + + public DelegationChecker<D, W, T> defineMockConfigurationForVoidMethodInvocation(Consumer<D> mockConsumer) { + this.mockConsumer = Objects.requireNonNull(mockConsumer); + return this; + } + + public DelegationChecker<D, W, T> defineMockConfigurationForFunctionInvocation(Function<D, T> mockConfigurationFunction, T mockFunctionReturnValue) { + this.mockConfigurationFunction = Objects.requireNonNull(mockConfigurationFunction); + this.mockFunctionReturnValue = mockFunctionReturnValue; + return this; + } + + public DelegationChecker<D, W, T> defineWrapperVoidMethodInvocation(Consumer<W> wrapperConsumer) { + this.wrapperConsumer = Objects.requireNonNull(wrapperConsumer); + return this; + } + + public <R> DelegationChecker<D, W, T> defineWrapperFunctionInvocationAndMockReturnValueTransformation( + Function<W, T> wrapperFunctionApplier, + Function<T, R> expectedFunctionReturnValueTransformation) { + this.wrapperFunctionApplier = Objects.requireNonNull(wrapperFunctionApplier); + this.mockFunctionReturnValueTransformation = Objects.requireNonNull(expectedFunctionReturnValueTransformation); + return this; + } + + public DelegationChecker<D, W, T> expectWrapperToWrapMockFunctionReturnValue() { + this.expectWrapperToWrapMockFunctionReturnValue = true; + return this; + } + + public void doVoidMethodDelegationCheck() { + if (mockConsumer == null || wrapperConsumer == null || + mockConfigurationFunction != null || wrapperFunctionApplier != null || + mockFunctionReturnValue != null || mockFunctionReturnValueTransformation != null) { + throwExceptionForIllegalTestSetup(); + } + // configure the mock to behave as desired + mockConsumer.accept(Mockito.doAnswer(invocation -> { + persistentCollectionMethodInvokedCorrectly = true; + return null; + }).when(mock)); + // invoke the wrapper, which should invoke the mock as desired + wrapperConsumer.accept(wrapper); + // assert that the expected delegation to the mock actually occurred + assertTrue(persistentCollectionMethodInvokedCorrectly); + } + + @SuppressWarnings("unchecked") + public void doFunctionDelegationCheck() { + if (mockConfigurationFunction == null || wrapperFunctionApplier == null || + mockFunctionReturnValueTransformation == null || + mockConsumer != null || wrapperConsumer != null) { + throwExceptionForIllegalTestSetup(); + } + // configure the mock to behave as desired + when(mockConfigurationFunction.apply(mock)).thenAnswer(invocation -> { + persistentCollectionMethodInvokedCorrectly = true; + return mockFunctionReturnValue; + }); + // invoke the wrapper, which should invoke the mock as desired + T wrapperReturnValue = wrapperFunctionApplier.apply(wrapper); + // assert that the expected delegation to the mock actually occurred, including any return value transformation + assertTrue(persistentCollectionMethodInvokedCorrectly); + Object transformedMockFunctionReturnValue = mockFunctionReturnValueTransformation.apply(mockFunctionReturnValue); + if (this.expectWrapperToWrapMockFunctionReturnValue) { + assertEquals(transformedMockFunctionReturnValue, unwrap((W) wrapperReturnValue)); + } else { + assertEquals(transformedMockFunctionReturnValue, wrapperReturnValue); + } + } + + private static void throwExceptionForIllegalTestSetup() { + throw new IllegalStateException( + "test setup error: must define both mock and wrapper consumers or both mock and wrapper functions"); + } +} diff --git a/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java b/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java new file mode 100644 index 00000000000..ab32b32be72 --- /dev/null +++ b/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable.pcollections; + +import org.apache.kafka.server.immutable.DelegationChecker; +import org.apache.kafka.server.immutable.ImmutableMap; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.pcollections.HashPMap; +import org.pcollections.HashTreePMap; + +import java.util.Collections; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static java.util.function.Function.identity; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; + +@SuppressWarnings({"unchecked", "deprecation"}) +public class PCollectionsImmutableMapTest { + private static final HashPMap<Object, Object> SINGLETON_MAP = HashTreePMap.singleton(new Object(), new Object()); + + private static final class PCollectionsHashMapWrapperDelegationChecker<R> extends DelegationChecker<HashPMap<Object, Object>, PCollectionsImmutableMap<Object, Object>, R> { + public PCollectionsHashMapWrapperDelegationChecker() { + super(mock(HashPMap.class), PCollectionsImmutableMap::new); + } + + public HashPMap<Object, Object> unwrap(PCollectionsImmutableMap<Object, Object> wrapper) { + return wrapper.underlying(); + } + } + + @Test + public void testEmptyMap() { + Assertions.assertEquals(HashTreePMap.empty(), ((PCollectionsImmutableMap<?, ?>) ImmutableMap.empty()).underlying()); + } + + @Test + public void testSingletonMap() { + Assertions.assertEquals(HashTreePMap.singleton(1, 2), ((PCollectionsImmutableMap<?, ?>) ImmutableMap.singleton(1, 2)).underlying()); + } + + @Test + public void testUnderlying() { + assertSame(SINGLETON_MAP, new PCollectionsImmutableMap<>(SINGLETON_MAP).underlying()); + } + + @Test + public void testDelegationOfAfterAdding() { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.plus(eq(this), eq(this)), SINGLETON_MAP) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.updated(this, this), identity()) + .expectWrapperToWrapMockFunctionReturnValue() + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfAfterRemoving() { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.minus(eq(this)), SINGLETON_MAP) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removed(this), identity()) + .expectWrapperToWrapMockFunctionReturnValue() + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2}) + public void testDelegationOfSize(int mockFunctionReturnValue) { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(HashPMap::size, mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::size, identity()) + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDelegationOfIsEmpty(boolean mockFunctionReturnValue) { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(HashPMap::isEmpty, mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::isEmpty, identity()) + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDelegationOfContainsKey(boolean mockFunctionReturnValue) { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.containsKey(eq(this)), mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.containsKey(this), identity()) + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDelegationOfContainsValue(boolean mockFunctionReturnValue) { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.containsValue(eq(this)), mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.containsValue(this), identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfGet() { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.get(eq(this)), new Object()) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.get(this), identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfPut() { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.put(eq(this), eq(this)), this) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.put(this, this), identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfRemoveByKey() { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this)), this) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this), identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfPutAll() { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForVoidMethodInvocation(mock -> mock.putAll(eq(Collections.emptyMap()))) + .defineWrapperVoidMethodInvocation(wrapper -> wrapper.putAll(Collections.emptyMap())) + .doVoidMethodDelegationCheck(); + } + + @Test + public void testDelegationOfClear() { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForVoidMethodInvocation(HashPMap::clear) + .defineWrapperVoidMethodInvocation(PCollectionsImmutableMap::clear) + .doVoidMethodDelegationCheck(); + } + + + @Test + public void testDelegationOfKeySet() { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(HashPMap::keySet, Collections.emptySet()) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::keySet, identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfValues() { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(HashPMap::values, Collections.emptySet()) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::values, identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfEntrySet() { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(HashPMap::entrySet, Collections.emptySet()) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::entrySet, identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testEquals() { + final HashPMap<Object, Object> mock = mock(HashPMap.class); + assertEquals(new PCollectionsImmutableMap<>(mock), new PCollectionsImmutableMap<>(mock)); + final HashPMap<Object, Object> someOtherMock = mock(HashPMap.class); + assertNotEquals(new PCollectionsImmutableMap<>(mock), new PCollectionsImmutableMap<>(someOtherMock)); + } + + @Test + public void testHashCode() { + final HashPMap<Object, Object> mock = mock(HashPMap.class); + assertEquals(mock.hashCode(), new PCollectionsImmutableMap<>(mock).hashCode()); + final HashPMap<Object, Object> someOtherMock = mock(HashPMap.class); + assertNotEquals(mock.hashCode(), new PCollectionsImmutableMap<>(someOtherMock).hashCode()); + } + + @Test + public void testDelegationOfGetOrDefault() { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.getOrDefault(eq(this), eq(this)), this) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.getOrDefault(this, this), identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfForEach() { + final BiConsumer<Object, Object> mockBiConsumer = mock(BiConsumer.class); + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForVoidMethodInvocation(mock -> mock.forEach(eq(mockBiConsumer))) + .defineWrapperVoidMethodInvocation(wrapper -> wrapper.forEach(mockBiConsumer)) + .doVoidMethodDelegationCheck(); + } + + @Test + public void testDelegationOfReplaceAll() { + final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class); + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForVoidMethodInvocation(mock -> mock.replaceAll(eq(mockBiFunction))) + .defineWrapperVoidMethodInvocation(wrapper -> wrapper.replaceAll(mockBiFunction)) + .doVoidMethodDelegationCheck(); + } + + @Test + public void testDelegationOfPutIfAbsent() { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.putIfAbsent(eq(this), eq(this)), this) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.putIfAbsent(this, this), identity()) + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDelegationOfRemoveByKeyAndValue(boolean mockFunctionReturnValue) { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this), eq(this)), mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this, this), identity()) + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDelegationOfReplaceWhenMappedToSpecificValue(boolean mockFunctionReturnValue) { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.replace(eq(this), eq(this), eq(this)), mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.replace(this, this, this), identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfReplaceWhenMappedToAnyValue() { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.replace(eq(this), eq(this)), this) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.replace(this, this), identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfComputeIfAbsent() { + final Function<Object, Object> mockFunction = mock(Function.class); + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForVoidMethodInvocation(mock -> mock.computeIfAbsent(eq(this), eq(mockFunction))) + .defineWrapperVoidMethodInvocation(wrapper -> wrapper.computeIfAbsent(this, mockFunction)) + .doVoidMethodDelegationCheck(); + } + + @Test + public void testDelegationOfComputeIfPresent() { + final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class); + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForVoidMethodInvocation(mock -> mock.computeIfPresent(eq(this), eq(mockBiFunction))) + .defineWrapperVoidMethodInvocation(wrapper -> wrapper.computeIfPresent(this, mockBiFunction)) + .doVoidMethodDelegationCheck(); + } + + @Test + public void testDelegationOfCompute() { + final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class); + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForVoidMethodInvocation(mock -> mock.compute(eq(this), eq(mockBiFunction))) + .defineWrapperVoidMethodInvocation(wrapper -> wrapper.compute(this, mockBiFunction)) + .doVoidMethodDelegationCheck(); + } + + @Test + public void testDelegationOfMerge() { + final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class); + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForVoidMethodInvocation(mock -> mock.merge(eq(this), eq(this), eq(mockBiFunction))) + .defineWrapperVoidMethodInvocation(wrapper -> wrapper.merge(this, this, mockBiFunction)) + .doVoidMethodDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(strings = {"a", "b"}) + public void testDelegationOfToString(String mockFunctionReturnValue) { + new PCollectionsHashMapWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(HashPMap::toString, mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::toString, + text -> "PCollectionsImmutableMap{underlying=" + text + "}") + .doFunctionDelegationCheck(); + } +} diff --git a/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSetTest.java b/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSetTest.java new file mode 100644 index 00000000000..457488404f9 --- /dev/null +++ b/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSetTest.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable.pcollections; + +import org.apache.kafka.server.immutable.DelegationChecker; +import org.apache.kafka.server.immutable.ImmutableSet; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.pcollections.HashTreePSet; +import org.pcollections.MapPSet; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Spliterator; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import static java.util.function.Function.identity; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; + +@SuppressWarnings({"unchecked", "deprecation"}) +public class PCollectionsImmutableSetTest { + + private static final MapPSet<Object> SINGLETON_SET = HashTreePSet.singleton(new Object()); + + private static final class PCollectionsHashSetWrapperDelegationChecker<R> extends DelegationChecker<MapPSet<Object>, PCollectionsImmutableSet<Object>, R> { + public PCollectionsHashSetWrapperDelegationChecker() { + super(mock(MapPSet.class), PCollectionsImmutableSet::new); + } + + public MapPSet<Object> unwrap(PCollectionsImmutableSet<Object> wrapper) { + return wrapper.underlying(); + } + } + + @Test + public void testEmptySet() { + Assertions.assertEquals(HashTreePSet.empty(), ((PCollectionsImmutableSet<?>) ImmutableSet.empty()).underlying()); + } + + @Test + public void testSingletonSet() { + Assertions.assertEquals(HashTreePSet.singleton(1), ((PCollectionsImmutableSet<?>) ImmutableSet.singleton(1)).underlying()); + } + + @Test + public void testUnderlying() { + assertSame(SINGLETON_SET, new PCollectionsImmutableSet<>(SINGLETON_SET).underlying()); + } + + @Test + public void testDelegationOfAfterAdding() { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.plus(eq(this)), SINGLETON_SET) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.added(this), identity()) + .expectWrapperToWrapMockFunctionReturnValue() + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfAfterRemoving() { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.minus(eq(this)), SINGLETON_SET) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removed(this), identity()) + .expectWrapperToWrapMockFunctionReturnValue() + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2}) + public void testDelegationOfSize(int mockFunctionReturnValue) { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(MapPSet::size, mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::size, identity()) + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDelegationOfIsEmpty(boolean mockFunctionReturnValue) { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(MapPSet::isEmpty, mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::isEmpty, identity()) + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDelegationOfContains(boolean mockFunctionReturnValue) { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.contains(eq(this)), mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.contains(this), identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfIterator() { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(MapPSet::iterator, mock(Iterator.class)) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::iterator, identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfForEach() { + final Consumer<Object> mockConsumer = mock(Consumer.class); + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForVoidMethodInvocation(mock -> mock.forEach(eq(mockConsumer))) + .defineWrapperVoidMethodInvocation(wrapper -> wrapper.forEach(mockConsumer)) + .doVoidMethodDelegationCheck(); + } + + @Test + public void testDelegationOfToArray() { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(MapPSet::toArray, new Object[0]) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::toArray, identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfToArrayIntoGivenDestination() { + Object[] destinationArray = new Object[0]; + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.toArray(eq(destinationArray)), new Object[0]) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.toArray(destinationArray), identity()) + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDelegationOfAdd(boolean mockFunctionReturnValue) { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.add(eq(this)), mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.add(this), identity()) + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDelegationOfRemove(boolean mockFunctionReturnValue) { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this)), mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this), identity()) + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDelegationOfContainsAll(boolean mockFunctionReturnValue) { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.containsAll(eq(Collections.emptyList())), mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.containsAll(Collections.emptyList()), identity()) + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDelegationOfAddAll(boolean mockFunctionReturnValue) { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.addAll(eq(Collections.emptyList())), mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.addAll(Collections.emptyList()), identity()) + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDelegationOfRetainAll(boolean mockFunctionReturnValue) { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.retainAll(eq(Collections.emptyList())), mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.retainAll(Collections.emptyList()), identity()) + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDelegationOfRemoveAll(boolean mockFunctionReturnValue) { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.removeAll(eq(Collections.emptyList())), mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removeAll(Collections.emptyList()), identity()) + .doFunctionDelegationCheck(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDelegationOfRemoveIf(boolean mockFunctionReturnValue) { + final Predicate<Object> mockPredicate = mock(Predicate.class); + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(mock -> mock.removeIf(eq(mockPredicate)), mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removeIf(mockPredicate), identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfClear() { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForVoidMethodInvocation(MapPSet::clear) + .defineWrapperVoidMethodInvocation(PCollectionsImmutableSet::clear) + .doVoidMethodDelegationCheck(); + } + + @Test + public void testDelegationOfSpliterator() { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(MapPSet::spliterator, mock(Spliterator.class)) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::spliterator, identity()) + .doFunctionDelegationCheck(); + } + + + @Test + public void testDelegationOfStream() { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(MapPSet::stream, mock(Stream.class)) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::stream, identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testDelegationOfParallelStream() { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(MapPSet::parallelStream, mock(Stream.class)) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::parallelStream, identity()) + .doFunctionDelegationCheck(); + } + + @Test + public void testEquals() { + final MapPSet<Object> mock = mock(MapPSet.class); + assertEquals(new PCollectionsImmutableSet<>(mock), new PCollectionsImmutableSet<>(mock)); + final MapPSet<Object> someOtherMock = mock(MapPSet.class); + assertNotEquals(new PCollectionsImmutableSet<>(mock), new PCollectionsImmutableSet<>(someOtherMock)); + } + + @Test + public void testHashCode() { + final MapPSet<Object> mock = mock(MapPSet.class); + assertEquals(mock.hashCode(), new PCollectionsImmutableSet<>(mock).hashCode()); + final MapPSet<Object> someOtherMock = mock(MapPSet.class); + assertNotEquals(mock.hashCode(), new PCollectionsImmutableSet<>(someOtherMock).hashCode()); + } + + @ParameterizedTest + @ValueSource(strings = {"a", "b"}) + public void testDelegationOfToString(String mockFunctionReturnValue) { + new PCollectionsHashSetWrapperDelegationChecker<>() + .defineMockConfigurationForFunctionInvocation(MapPSet::toString, mockFunctionReturnValue) + .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::toString, + text -> "PCollectionsImmutableSet{underlying=" + text + "}") + .doFunctionDelegationCheck(); + } +}