[ https://issues.apache.org/jira/browse/KAFKA-7498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16650131#comment-16650131 ]
ASF GitHub Bot commented on KAFKA-7498: --------------------------------------- rajinisivaram closed pull request #5784: KAFKA-7498: Remove references from `common.requests` to `clients` URL: https://github.com/apache/kafka/pull/5784 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 7810a3e8673..91d23f6424b 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -131,7 +131,6 @@ </subpackage> <subpackage name="requests"> - <allow pkg="org.apache.kafka.clients.admin" /> <allow pkg="org.apache.kafka.common.acl" /> <allow pkg="org.apache.kafka.common.protocol" /> <allow pkg="org.apache.kafka.common.network" /> diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 86e14476625..b4132840e4d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -81,6 +81,7 @@ import org.apache.kafka.common.requests.CreateDelegationTokenRequest; import org.apache.kafka.common.requests.CreateDelegationTokenResponse; import org.apache.kafka.common.requests.CreatePartitionsRequest; +import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails; import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.CreateTopicsResponse; @@ -142,6 +143,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; +import java.util.stream.Collectors; import static org.apache.kafka.common.utils.Utils.closeQuietly; @@ -2083,7 +2085,8 @@ public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPar for (String topic : newPartitions.keySet()) { futures.put(topic, new KafkaFutureImpl<>()); } - final Map<String, NewPartitions> requestMap = new HashMap<>(newPartitions); + final Map<String, PartitionDetails> requestMap = newPartitions.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> partitionDetails(e.getValue()))); final long now = time.milliseconds(); runnable.call(new Call("createPartitions", calcDeadlineMs(now, options.timeoutMs()), @@ -2482,6 +2485,10 @@ private boolean handleFindCoordinatorError(FindCoordinatorResponse response, Kaf return false; } + private PartitionDetails partitionDetails(NewPartitions newPartitions) { + return new PartitionDetails(newPartitions.totalCount(), newPartitions.assignments()); + } + private final static class ListConsumerGroupsResults { private final List<Throwable> errors; private final HashMap<String, ConsumerGroupListing> listings; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java index 795a66a9ea6..7872cf9f8d0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java @@ -17,7 +17,6 @@ package org.apache.kafka.common.requests; -import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.types.ArrayOf; import org.apache.kafka.common.protocol.types.Field; @@ -72,17 +71,47 @@ // It is an error for duplicate topics to be present in the request, // so track duplicates here to allow KafkaApis to report per-topic errors. private final Set<String> duplicates; - private final Map<String, NewPartitions> newPartitions; + private final Map<String, PartitionDetails> newPartitions; private final int timeout; private final boolean validateOnly; + public static class PartitionDetails { + + private final int totalCount; + + private final List<List<Integer>> newAssignments; + + public PartitionDetails(int totalCount) { + this(totalCount, null); + } + + public PartitionDetails(int totalCount, List<List<Integer>> newAssignments) { + this.totalCount = totalCount; + this.newAssignments = newAssignments; + } + + public int totalCount() { + return totalCount; + } + + public List<List<Integer>> newAssignments() { + return newAssignments; + } + + @Override + public String toString() { + return "(totalCount=" + totalCount() + ", newAssignments=" + newAssignments() + ")"; + } + + } + public static class Builder extends AbstractRequest.Builder<CreatePartitionsRequest> { - private final Map<String, NewPartitions> newPartitions; + private final Map<String, PartitionDetails> newPartitions; private final int timeout; private final boolean validateOnly; - public Builder(Map<String, NewPartitions> newPartitions, int timeout, boolean validateOnly) { + public Builder(Map<String, PartitionDetails> newPartitions, int timeout, boolean validateOnly) { super(ApiKeys.CREATE_PARTITIONS); this.newPartitions = newPartitions; this.timeout = timeout; @@ -106,7 +135,7 @@ public String toString() { } } - CreatePartitionsRequest(Map<String, NewPartitions> newPartitions, int timeout, boolean validateOnly, short apiVersion) { + CreatePartitionsRequest(Map<String, PartitionDetails> newPartitions, int timeout, boolean validateOnly, short apiVersion) { super(ApiKeys.CREATE_PARTITIONS, apiVersion); this.newPartitions = newPartitions; this.duplicates = Collections.emptySet(); @@ -117,7 +146,7 @@ public String toString() { public CreatePartitionsRequest(Struct struct, short apiVersion) { super(ApiKeys.CREATE_PARTITIONS, apiVersion); Object[] topicCountArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME); - Map<String, NewPartitions> counts = new HashMap<>(topicCountArray.length); + Map<String, PartitionDetails> counts = new HashMap<>(topicCountArray.length); Set<String> dupes = new HashSet<>(); for (Object topicPartitionCountObj : topicCountArray) { Struct topicPartitionCountStruct = (Struct) topicPartitionCountObj; @@ -125,7 +154,7 @@ public CreatePartitionsRequest(Struct struct, short apiVersion) { Struct partitionCountStruct = topicPartitionCountStruct.getStruct(NEW_PARTITIONS_KEY_NAME); int count = partitionCountStruct.getInt(COUNT_KEY_NAME); Object[] assignmentsArray = partitionCountStruct.getArray(ASSIGNMENT_KEY_NAME); - NewPartitions newPartition; + PartitionDetails newPartition; if (assignmentsArray != null) { List<List<Integer>> assignments = new ArrayList<>(assignmentsArray.length); for (Object replicas : assignmentsArray) { @@ -136,11 +165,11 @@ public CreatePartitionsRequest(Struct struct, short apiVersion) { replicasList.add((Integer) broker); } } - newPartition = NewPartitions.increaseTo(count, assignments); + newPartition = new PartitionDetails(count, assignments); } else { - newPartition = NewPartitions.increaseTo(count); + newPartition = new PartitionDetails(count); } - NewPartitions dupe = counts.put(topic, newPartition); + PartitionDetails dupe = counts.put(topic, newPartition); if (dupe != null) { dupes.add(topic); } @@ -155,7 +184,7 @@ public CreatePartitionsRequest(Struct struct, short apiVersion) { return duplicates; } - public Map<String, NewPartitions> newPartitions() { + public Map<String, PartitionDetails> newPartitions() { return newPartitions; } @@ -171,17 +200,17 @@ public boolean validateOnly() { protected Struct toStruct() { Struct struct = new Struct(ApiKeys.CREATE_PARTITIONS.requestSchema(version())); List<Struct> topicPartitionsList = new ArrayList<>(); - for (Map.Entry<String, NewPartitions> topicPartitionCount : this.newPartitions.entrySet()) { + for (Map.Entry<String, PartitionDetails> topicPartitionCount : this.newPartitions.entrySet()) { Struct topicPartitionCountStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME); topicPartitionCountStruct.set(TOPIC_NAME, topicPartitionCount.getKey()); - NewPartitions count = topicPartitionCount.getValue(); + PartitionDetails partitionDetails = topicPartitionCount.getValue(); Struct partitionCountStruct = topicPartitionCountStruct.instance(NEW_PARTITIONS_KEY_NAME); - partitionCountStruct.set(COUNT_KEY_NAME, count.totalCount()); + partitionCountStruct.set(COUNT_KEY_NAME, partitionDetails.totalCount()); Object[][] assignments = null; - if (count.assignments() != null) { - assignments = new Object[count.assignments().size()][]; + if (partitionDetails.newAssignments() != null) { + assignments = new Object[partitionDetails.newAssignments().size()][]; int i = 0; - for (List<Integer> partitionAssignment : count.assignments()) { + for (List<Integer> partitionAssignment : partitionDetails.newAssignments()) { assignments[i] = partitionAssignment.toArray(new Object[0]); i++; } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index e34348a6da3..b7f39ef9d67 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.acl.AccessControlEntry; @@ -44,6 +43,7 @@ import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation; import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; +import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails; import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; import org.apache.kafka.common.resource.PatternType; @@ -1214,16 +1214,16 @@ private AlterConfigsResponse createAlterConfigsResponse() { } private CreatePartitionsRequest createCreatePartitionsRequest() { - Map<String, NewPartitions> assignments = new HashMap<>(); - assignments.put("my_topic", NewPartitions.increaseTo(3)); - assignments.put("my_other_topic", NewPartitions.increaseTo(3)); + Map<String, PartitionDetails> assignments = new HashMap<>(); + assignments.put("my_topic", new PartitionDetails(3)); + assignments.put("my_other_topic", new PartitionDetails(3)); return new CreatePartitionsRequest(assignments, 0, false, (short) 0); } private CreatePartitionsRequest createCreatePartitionsRequestWithAssignments() { - Map<String, NewPartitions> assignments = new HashMap<>(); - assignments.put("my_topic", NewPartitions.increaseTo(3, asList(asList(2)))); - assignments.put("my_other_topic", NewPartitions.increaseTo(3, asList(asList(2, 3), asList(3, 1)))); + Map<String, PartitionDetails> assignments = new HashMap<>(); + assignments.put("my_topic", new PartitionDetails(3, asList(asList(2)))); + assignments.put("my_other_topic", new PartitionDetails(3, asList(asList(2, 3), asList(3, 1)))); return new CreatePartitionsRequest(assignments, 0, false, (short) 0); } diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index 2b481702c45..4ec083a5f14 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -24,13 +24,13 @@ import kafka.log.LogConfig import kafka.metrics.KafkaMetricsGroup import kafka.utils._ import kafka.zk.{AdminZkClient, KafkaZkClient} -import org.apache.kafka.clients.admin.NewPartitions import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource} import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException, InvalidConfigurationException} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails import org.apache.kafka.common.requests.CreateTopicsRequest._ import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse} @@ -204,7 +204,7 @@ class AdminManager(val config: KafkaConfig, } def createPartitions(timeout: Int, - newPartitions: Map[String, NewPartitions], + newPartitions: Map[String, PartitionDetails], validateOnly: Boolean, listenerName: ListenerName, callback: Map[String, ApiError] => Unit): Unit = { @@ -237,7 +237,7 @@ class AdminManager(val config: KafkaConfig, throw new InvalidPartitionsException(s"Topic already has $oldNumPartitions partitions.") } - val reassignment = Option(newPartition.assignments).map(_.asScala.map(_.asScala.map(_.toInt))).map { assignments => + val reassignment = Option(newPartition.newAssignments).map(_.asScala.map(_.asScala.map(_.toInt))).map { assignments => val unknownBrokers = assignments.flatten.toSet -- allBrokerIds if (unknownBrokers.nonEmpty) throw new InvalidReplicaAssignmentException( diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ecbbdb6f03f..6a99db31a14 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -31,8 +31,7 @@ import kafka.common.OffsetAndMetadata import kafka.controller.KafkaController import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult} import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} -import kafka.log.{Log, LogManager, TimestampOffset} -import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec} +import kafka.message.ZStdCompressionCodec import kafka.network.RequestChannel import kafka.security.SecurityUtils import kafka.security.auth.{Resource, _} @@ -49,6 +48,7 @@ import org.apache.kafka.common.network.{ListenerName, Send} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{BaseRecords, ControlRecordType, EndTransactionMarker, LazyDownConversionRecords, MemoryRecords, MultiRecordsSend, RecordBatch, RecordConversionStats, Records} import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse +import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse} import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse @@ -64,7 +64,6 @@ import scala.collection.JavaConverters._ import scala.collection._ import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} -import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails /** * Logic to handle the various Kafka requests diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index ae6273623ae..a2f10498666 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -324,7 +324,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def createPartitionsRequest = { new CreatePartitionsRequest.Builder( - Map(topic -> NewPartitions.increaseTo(10)).asJava, 10000, true + Map(topic -> new CreatePartitionsRequest.PartitionDetails(10)).asJava, 10000, true ).build() } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 3604385f6ad..f2d3b4a1cad 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -22,7 +22,6 @@ import kafka.log.LogConfig import kafka.network.RequestChannel.Session import kafka.security.auth._ import kafka.utils.TestUtils -import org.apache.kafka.clients.admin.NewPartitions import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType} @@ -342,7 +341,7 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.CREATE_PARTITIONS => new CreatePartitionsRequest.Builder( - Collections.singletonMap("topic-2", NewPartitions.increaseTo(1)), 0, false + Collections.singletonMap("topic-2", new CreatePartitionsRequest.PartitionDetails(1)), 0, false ) case ApiKeys.CREATE_DELEGATION_TOKEN => ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > common.requests.CreatePartitionsRequest uses clients.admin.NewPartitions > ------------------------------------------------------------------------ > > Key: KAFKA-7498 > URL: https://issues.apache.org/jira/browse/KAFKA-7498 > Project: Kafka > Issue Type: Bug > Components: clients > Reporter: Rajini Sivaram > Assignee: Rajini Sivaram > Priority: Major > Fix For: 2.1.0 > > > `org.apache.kafka.common.requests.CreatePartitionsRequest` currently uses > `org.apache.kafka.clients.admin.NewPartitions`. We shouldn't have references > from `common` to `clients`. Since `org.apache.kafka.clients.admin` is a > public package, we cannot use a common class for Admin API and requests. So > we should do something similar to CreateTopicsRequest for which we have > `org.apache.kafka.clients.admin.NewTopic` class used for the admin API and an > equivalent > `org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails` class > that doesn't refer to `clients.admin`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)