[ 
https://issues.apache.org/jira/browse/KAFKA-7498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16650131#comment-16650131
 ] 

ASF GitHub Bot commented on KAFKA-7498:
---------------------------------------

rajinisivaram closed pull request #5784: KAFKA-7498: Remove references from 
`common.requests` to `clients`
URL: https://github.com/apache/kafka/pull/5784
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7810a3e8673..91d23f6424b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -131,7 +131,6 @@
     </subpackage>
 
     <subpackage name="requests">
-      <allow pkg="org.apache.kafka.clients.admin" />
       <allow pkg="org.apache.kafka.common.acl" />
       <allow pkg="org.apache.kafka.common.protocol" />
       <allow pkg="org.apache.kafka.common.network" />
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 86e14476625..b4132840e4d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -81,6 +81,7 @@
 import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
 import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
 import org.apache.kafka.common.requests.CreatePartitionsRequest;
+import 
org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
@@ -142,6 +143,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 
@@ -2083,7 +2085,8 @@ public CreatePartitionsResult 
createPartitions(Map<String, NewPartitions> newPar
         for (String topic : newPartitions.keySet()) {
             futures.put(topic, new KafkaFutureImpl<>());
         }
-        final Map<String, NewPartitions> requestMap = new 
HashMap<>(newPartitions);
+        final Map<String, PartitionDetails> requestMap = 
newPartitions.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
partitionDetails(e.getValue())));
 
         final long now = time.milliseconds();
         runnable.call(new Call("createPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
@@ -2482,6 +2485,10 @@ private boolean 
handleFindCoordinatorError(FindCoordinatorResponse response, Kaf
         return false;
     }
 
+    private PartitionDetails partitionDetails(NewPartitions newPartitions) {
+        return new PartitionDetails(newPartitions.totalCount(), 
newPartitions.assignments());
+    }
+
     private final static class ListConsumerGroupsResults {
         private final List<Throwable> errors;
         private final HashMap<String, ConsumerGroupListing> listings;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
index 795a66a9ea6..7872cf9f8d0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
@@ -72,17 +71,47 @@
     // It is an error for duplicate topics to be present in the request,
     // so track duplicates here to allow KafkaApis to report per-topic errors.
     private final Set<String> duplicates;
-    private final Map<String, NewPartitions> newPartitions;
+    private final Map<String, PartitionDetails> newPartitions;
     private final int timeout;
     private final boolean validateOnly;
 
+    public static class PartitionDetails {
+
+        private final int totalCount;
+
+        private final List<List<Integer>> newAssignments;
+
+        public PartitionDetails(int totalCount) {
+            this(totalCount, null);
+        }
+
+        public PartitionDetails(int totalCount, List<List<Integer>> 
newAssignments) {
+            this.totalCount = totalCount;
+            this.newAssignments = newAssignments;
+        }
+
+        public int totalCount() {
+            return totalCount;
+        }
+
+        public List<List<Integer>> newAssignments() {
+            return newAssignments;
+        }
+
+        @Override
+        public String toString() {
+            return "(totalCount=" + totalCount() + ", newAssignments=" + 
newAssignments() + ")";
+        }
+
+    }
+
     public static class Builder extends 
AbstractRequest.Builder<CreatePartitionsRequest> {
 
-        private final Map<String, NewPartitions> newPartitions;
+        private final Map<String, PartitionDetails> newPartitions;
         private final int timeout;
         private final boolean validateOnly;
 
-        public Builder(Map<String, NewPartitions> newPartitions, int timeout, 
boolean validateOnly) {
+        public Builder(Map<String, PartitionDetails> newPartitions, int 
timeout, boolean validateOnly) {
             super(ApiKeys.CREATE_PARTITIONS);
             this.newPartitions = newPartitions;
             this.timeout = timeout;
@@ -106,7 +135,7 @@ public String toString() {
         }
     }
 
-    CreatePartitionsRequest(Map<String, NewPartitions> newPartitions, int 
timeout, boolean validateOnly, short apiVersion) {
+    CreatePartitionsRequest(Map<String, PartitionDetails> newPartitions, int 
timeout, boolean validateOnly, short apiVersion) {
         super(ApiKeys.CREATE_PARTITIONS, apiVersion);
         this.newPartitions = newPartitions;
         this.duplicates = Collections.emptySet();
@@ -117,7 +146,7 @@ public String toString() {
     public CreatePartitionsRequest(Struct struct, short apiVersion) {
         super(ApiKeys.CREATE_PARTITIONS, apiVersion);
         Object[] topicCountArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
-        Map<String, NewPartitions> counts = new 
HashMap<>(topicCountArray.length);
+        Map<String, PartitionDetails> counts = new 
HashMap<>(topicCountArray.length);
         Set<String> dupes = new HashSet<>();
         for (Object topicPartitionCountObj : topicCountArray) {
             Struct topicPartitionCountStruct = (Struct) topicPartitionCountObj;
@@ -125,7 +154,7 @@ public CreatePartitionsRequest(Struct struct, short 
apiVersion) {
             Struct partitionCountStruct = 
topicPartitionCountStruct.getStruct(NEW_PARTITIONS_KEY_NAME);
             int count = partitionCountStruct.getInt(COUNT_KEY_NAME);
             Object[] assignmentsArray = 
partitionCountStruct.getArray(ASSIGNMENT_KEY_NAME);
-            NewPartitions newPartition;
+            PartitionDetails newPartition;
             if (assignmentsArray != null) {
                 List<List<Integer>> assignments = new 
ArrayList<>(assignmentsArray.length);
                 for (Object replicas : assignmentsArray) {
@@ -136,11 +165,11 @@ public CreatePartitionsRequest(Struct struct, short 
apiVersion) {
                         replicasList.add((Integer) broker);
                     }
                 }
-                newPartition = NewPartitions.increaseTo(count, assignments);
+                newPartition = new PartitionDetails(count, assignments);
             } else {
-                newPartition = NewPartitions.increaseTo(count);
+                newPartition = new PartitionDetails(count);
             }
-            NewPartitions dupe = counts.put(topic, newPartition);
+            PartitionDetails dupe = counts.put(topic, newPartition);
             if (dupe != null) {
                 dupes.add(topic);
             }
@@ -155,7 +184,7 @@ public CreatePartitionsRequest(Struct struct, short 
apiVersion) {
         return duplicates;
     }
 
-    public Map<String, NewPartitions> newPartitions() {
+    public Map<String, PartitionDetails> newPartitions() {
         return newPartitions;
     }
 
@@ -171,17 +200,17 @@ public boolean validateOnly() {
     protected Struct toStruct() {
         Struct struct = new 
Struct(ApiKeys.CREATE_PARTITIONS.requestSchema(version()));
         List<Struct> topicPartitionsList = new ArrayList<>();
-        for (Map.Entry<String, NewPartitions> topicPartitionCount : 
this.newPartitions.entrySet()) {
+        for (Map.Entry<String, PartitionDetails> topicPartitionCount : 
this.newPartitions.entrySet()) {
             Struct topicPartitionCountStruct = 
struct.instance(TOPIC_PARTITIONS_KEY_NAME);
             topicPartitionCountStruct.set(TOPIC_NAME, 
topicPartitionCount.getKey());
-            NewPartitions count = topicPartitionCount.getValue();
+            PartitionDetails partitionDetails = topicPartitionCount.getValue();
             Struct partitionCountStruct = 
topicPartitionCountStruct.instance(NEW_PARTITIONS_KEY_NAME);
-            partitionCountStruct.set(COUNT_KEY_NAME, count.totalCount());
+            partitionCountStruct.set(COUNT_KEY_NAME, 
partitionDetails.totalCount());
             Object[][] assignments = null;
-            if (count.assignments() != null) {
-                assignments = new Object[count.assignments().size()][];
+            if (partitionDetails.newAssignments() != null) {
+                assignments = new 
Object[partitionDetails.newAssignments().size()][];
                 int i = 0;
-                for (List<Integer> partitionAssignment : count.assignments()) {
+                for (List<Integer> partitionAssignment : 
partitionDetails.newAssignments()) {
                     assignments[i] = partitionAssignment.toArray(new 
Object[0]);
                     i++;
                 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e34348a6da3..b7f39ef9d67 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.acl.AccessControlEntry;
@@ -44,6 +43,7 @@
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import 
org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
 import org.apache.kafka.common.resource.PatternType;
@@ -1214,16 +1214,16 @@ private AlterConfigsResponse 
createAlterConfigsResponse() {
     }
 
     private CreatePartitionsRequest createCreatePartitionsRequest() {
-        Map<String, NewPartitions> assignments = new HashMap<>();
-        assignments.put("my_topic", NewPartitions.increaseTo(3));
-        assignments.put("my_other_topic", NewPartitions.increaseTo(3));
+        Map<String, PartitionDetails> assignments = new HashMap<>();
+        assignments.put("my_topic", new PartitionDetails(3));
+        assignments.put("my_other_topic", new PartitionDetails(3));
         return new CreatePartitionsRequest(assignments, 0, false, (short) 0);
     }
 
     private CreatePartitionsRequest 
createCreatePartitionsRequestWithAssignments() {
-        Map<String, NewPartitions> assignments = new HashMap<>();
-        assignments.put("my_topic", NewPartitions.increaseTo(3, 
asList(asList(2))));
-        assignments.put("my_other_topic", NewPartitions.increaseTo(3, 
asList(asList(2, 3), asList(3, 1))));
+        Map<String, PartitionDetails> assignments = new HashMap<>();
+        assignments.put("my_topic", new PartitionDetails(3, 
asList(asList(2))));
+        assignments.put("my_other_topic", new PartitionDetails(3, 
asList(asList(2, 3), asList(3, 1))));
         return new CreatePartitionsRequest(assignments, 0, false, (short) 0);
     }
 
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala 
b/core/src/main/scala/kafka/server/AdminManager.scala
index 2b481702c45..4ec083a5f14 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -24,13 +24,13 @@ import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
-import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, ConfigResource}
 import org.apache.kafka.common.errors.{ApiException, 
InvalidPartitionsException, InvalidReplicaAssignmentException, 
InvalidRequestException, ReassignmentInProgressException, 
UnknownTopicOrPartitionException, InvalidConfigurationException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
+import 
org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails
 import org.apache.kafka.common.requests.CreateTopicsRequest._
 import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, 
DescribeConfigsResponse}
@@ -204,7 +204,7 @@ class AdminManager(val config: KafkaConfig,
   }
 
   def createPartitions(timeout: Int,
-                       newPartitions: Map[String, NewPartitions],
+                       newPartitions: Map[String, PartitionDetails],
                        validateOnly: Boolean,
                        listenerName: ListenerName,
                        callback: Map[String, ApiError] => Unit): Unit = {
@@ -237,7 +237,7 @@ class AdminManager(val config: KafkaConfig,
           throw new InvalidPartitionsException(s"Topic already has 
$oldNumPartitions partitions.")
         }
 
-        val reassignment = 
Option(newPartition.assignments).map(_.asScala.map(_.asScala.map(_.toInt))).map 
{ assignments =>
+        val reassignment = 
Option(newPartition.newAssignments).map(_.asScala.map(_.asScala.map(_.toInt))).map
 { assignments =>
           val unknownBrokers = assignments.flatten.toSet -- allBrokerIds
           if (unknownBrokers.nonEmpty)
             throw new InvalidReplicaAssignmentException(
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index ecbbdb6f03f..6a99db31a14 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -31,8 +31,7 @@ import kafka.common.OffsetAndMetadata
 import kafka.controller.KafkaController
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinator}
-import kafka.log.{Log, LogManager, TimestampOffset}
-import kafka.message.{CompressionCodec, NoCompressionCodec, 
ZStdCompressionCodec}
+import kafka.message.ZStdCompressionCodec
 import kafka.network.RequestChannel
 import kafka.security.SecurityUtils
 import kafka.security.auth.{Resource, _}
@@ -49,6 +48,7 @@ import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{BaseRecords, ControlRecordType, 
EndTransactionMarker, LazyDownConversionRecords, MemoryRecords, 
MultiRecordsSend, RecordBatch, RecordConversionStats, Records}
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
+import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, 
AclFilterResponse}
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -64,7 +64,6 @@ import scala.collection.JavaConverters._
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
-import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
 
 /**
  * Logic to handle the various Kafka requests
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ae6273623ae..a2f10498666 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -324,7 +324,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def createPartitionsRequest = {
     new CreatePartitionsRequest.Builder(
-      Map(topic -> NewPartitions.increaseTo(10)).asJava, 10000, true
+      Map(topic -> new CreatePartitionsRequest.PartitionDetails(10)).asJava, 
10000, true
     ).build()
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 3604385f6ad..f2d3b4a1cad 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -22,7 +22,6 @@ import kafka.log.LogConfig
 import kafka.network.RequestChannel.Session
 import kafka.security.auth._
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.acl.{AccessControlEntry, 
AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, 
AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourcePatternFilter, ResourceType => AdminResourceType}
@@ -342,7 +341,7 @@ class RequestQuotaTest extends BaseRequestTest {
 
         case ApiKeys.CREATE_PARTITIONS =>
           new CreatePartitionsRequest.Builder(
-            Collections.singletonMap("topic-2", NewPartitions.increaseTo(1)), 
0, false
+            Collections.singletonMap("topic-2", new 
CreatePartitionsRequest.PartitionDetails(1)), 0, false
           )
 
         case ApiKeys.CREATE_DELEGATION_TOKEN =>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> common.requests.CreatePartitionsRequest uses clients.admin.NewPartitions
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-7498
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7498
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 2.1.0
>
>
> `org.apache.kafka.common.requests.CreatePartitionsRequest` currently uses 
> `org.apache.kafka.clients.admin.NewPartitions`. We shouldn't have references 
> from `common` to `clients`. Since `org.apache.kafka.clients.admin` is a 
> public package, we cannot use a common class for Admin API and requests. So 
> we should do something similar to CreateTopicsRequest for which we have 
> `org.apache.kafka.clients.admin.NewTopic` class used for the admin API and an 
> equivalent 
> `org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails` class 
> that doesn't refer to `clients.admin`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to