This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6fbac3c   KAFKA-8179: PartitionAssignorAdapter (#7110)
6fbac3c is described below

commit 6fbac3cfa88bd3eba36b2d3e254d7ce5e11a550b
Author: A. Sophie Blee-Goldman <sop...@confluent.io>
AuthorDate: Wed Jul 31 13:53:38 2019 -0700

     KAFKA-8179: PartitionAssignorAdapter (#7110)
    
    Follow-up to the new PartitionAssignor interface that was merged in #7108.
    
    Adds a PartitionAssignorAdapter class to maintain backwards compatibility
    
    Reviewers: Boyang Chen <boy...@confluent.io>, Jason Gustafson 
<ja...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |   4 +-
 .../kafka/clients/consumer/ConsumerConfig.java     |   2 +-
 .../consumer/ConsumerPartitionAssignor.java        |  10 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |   7 +-
 .../internals/AbstractPartitionAssignor.java       |   4 +-
 .../consumer/internals/ConsumerCoordinator.java    |   3 +-
 .../consumer/internals/PartitionAssignor.java      |   3 +
 .../internals/PartitionAssignorAdapter.java        | 136 ++++++++++++++++
 .../internals/PartitionAssignorAdapterTest.java    | 173 +++++++++++++++++++++
 docs/upgrade.html                                  |   2 +
 .../internals/StreamsPartitionAssignor.java        |   4 +-
 11 files changed, 333 insertions(+), 15 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f2ee21e..227a03b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -29,7 +29,7 @@ import 
org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import 
org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
 import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
-import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.Cluster;
@@ -2724,7 +2724,7 @@ public class KafkaAdminClient extends AdminClient {
                     for (DescribedGroupMember groupMember : members) {
                         Set<TopicPartition> partitions = 
Collections.emptySet();
                         if (groupMember.memberAssignment().length > 0) {
-                            final ConsumerPartitionAssignor.Assignment 
assignment = ConsumerProtocol.
+                            final Assignment assignment = ConsumerProtocol.
                                 
deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
                             partitions = new 
HashSet<>(assignment.partitions());
                         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 8a18bd5..2e4507a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -102,7 +102,7 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>partition.assignment.strategy</code>
      */
     public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = 
"partition.assignment.strategy";
-    private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class 
name or class type of the assignor implementing the partition assignment 
strategy that the client will use to distribute partition ownership amongst 
consumer instances when group management is used. A custom assignor that 
implements ConsumerPartitionAssignor can be plugged in";
+    private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "A list of 
class names or class types, ordered by preference, of supported assignors 
responsible for the partition assignment strategy that the client will use to 
distribute partition ownership amongst consumer instances when group management 
is used. Implementing the 
<code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor</code> 
interface allows you to plug in a custom assignment strategy.";
 
     /**
      * <code>auto.offset.reset</code>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index 72d5d6e..07e153e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -44,7 +44,9 @@ public interface ConsumerPartitionAssignor {
      * Return serialized data that will be included in the {@link 
Subscription} sent to the leader
      * and can be leveraged in {@link #assign(Cluster, GroupSubscription)} 
((e.g. local host/rack information)
      *
-     * @return optional join subscription user data
+     * @param topics Topics subscribed to through {@link 
org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(java.util.Collection)}
+     *               and variants
+     * @return nullable subscription user data
      */
     default ByteBuffer subscriptionUserData(Set<String> topics) {
         return null;
@@ -53,11 +55,11 @@ public interface ConsumerPartitionAssignor {
     /**
      * Perform the group assignment given the member subscriptions and current 
cluster metadata.
      * @param metadata Current topic/broker metadata known by consumer
-     * @param subscriptions Subscriptions from all members including metadata 
provided through {@link #subscriptionUserData(Set)}
-     * @return A map from the members to their respective assignment. This 
should have one entry
+     * @param groupSubscription Subscriptions from all members including 
metadata provided through {@link #subscriptionUserData(Set)}
+     * @return A map from the members to their respective assignments. This 
should have one entry
      *         for each member in the input subscription map.
      */
-    GroupAssignment assign(Cluster metadata, GroupSubscription subscriptions);
+    GroupAssignment assign(Cluster metadata, GroupSubscription 
groupSubscription);
 
     /**
      * Callback which is invoked when a group member receives its assignment 
from the leader.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 30944b3..b33b1f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import static 
org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter.getAssignorInstances;
+
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
@@ -765,9 +767,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     retryBackoffMs,
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                     heartbeatIntervalMs); //Will avoid blocking an extended 
period of time to prevent heartbeat thread starvation
-            this.assignors = config.getConfiguredInstances(
-                    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
-                    ConsumerPartitionAssignor.class);
+
+            this.assignors = 
getAssignorInstances(config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
 config.originals());
 
             // no coordinator will be constructed for the default (null) group 
id
             this.coordinator = groupId == null ? null :
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
index 3b966b0..ed0282b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
@@ -48,8 +48,8 @@ public abstract class AbstractPartitionAssignor implements 
ConsumerPartitionAssi
                                                              Map<String, 
Subscription> subscriptions);
 
     @Override
-    public GroupAssignment assign(Cluster metadata, GroupSubscription 
groupSubscriptions) {
-        Map<String, Subscription> subscriptions = 
groupSubscriptions.groupSubscription();
+    public GroupAssignment assign(Cluster metadata, GroupSubscription 
groupSubscription) {
+        Map<String, Subscription> subscriptions = 
groupSubscription.groupSubscription();
         Set<String> allSubscribedTopics = new HashSet<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : 
subscriptions.entrySet())
             allSubscribedTopics.addAll(subscriptionEntry.getValue().topics());
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index a28119d..aae16b9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -204,8 +204,9 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         this.joinedSubscription = subscriptions.subscription();
         JoinGroupRequestData.JoinGroupRequestProtocolCollection protocolSet = 
new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
 
+        List<String> topics = new ArrayList<>(joinedSubscription);
         for (ConsumerPartitionAssignor assignor : assignors) {
-            Subscription subscription = new Subscription(new 
ArrayList<>(joinedSubscription),
+            Subscription subscription = new Subscription(topics,
                                                          
assignor.subscriptionUserData(joinedSubscription),
                                                          
subscriptions.assignedPartitionsList());
             ByteBuffer metadata = 
ConsumerProtocol.serializeSubscription(subscription);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
index b3f2ada..1ecb15c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
@@ -36,6 +36,9 @@ import java.util.Set;
  * assignment decisions. For this, you can override {@link #subscription(Set)} 
and provide custom
  * userData in the returned Subscription. For example, to have a rack-aware 
assignor, an implementation
  * can use this user data to forward the rackId belonging to each member.
+ *
+ * This interface has been deprecated in 2.4, custom assignors should now 
implement
+ * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor}
  */
 @Deprecated
 public interface PartitionAssignor {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapter.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapter.java
new file mode 100644
index 0000000..d1d5e20
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This adapter class is used to ensure backwards compatibility for those who 
have implemented the {@link PartitionAssignor}
+ * interface, which has been deprecated in favor of the new {@link 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor}
+ */
+@SuppressWarnings("deprecation")
+public class PartitionAssignorAdapter implements ConsumerPartitionAssignor {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PartitionAssignorAdapter.class);
+    private final PartitionAssignor oldAssignor;
+
+    PartitionAssignorAdapter(PartitionAssignor oldAssignor) {
+        this.oldAssignor = oldAssignor;
+    }
+
+    @Override
+    public ByteBuffer subscriptionUserData(Set<String> topics) {
+        return oldAssignor.subscription(topics).userData();
+    }
+
+    @Override
+    public GroupAssignment assign(Cluster metadata, GroupSubscription 
groupSubscription) {
+        return toNewGroupAssignment(oldAssignor.assign(metadata, 
toOldGroupSubscription(groupSubscription)));
+    }
+
+    @Override
+    public void onAssignment(Assignment assignment, ConsumerGroupMetadata 
metadata) {
+        oldAssignor.onAssignment(toOldAssignment(assignment), 
metadata.generationId());
+    }
+
+    @Override
+    public String name() {
+        return oldAssignor.name();
+    }
+
+    private static PartitionAssignor.Assignment toOldAssignment(Assignment 
newAssignment) {
+        return new PartitionAssignor.Assignment(newAssignment.partitions(), 
newAssignment.userData());
+    }
+
+    private static Map<String, PartitionAssignor.Subscription> 
toOldGroupSubscription(GroupSubscription newSubscriptions) {
+        Map<String, PartitionAssignor.Subscription> oldSubscriptions = new 
HashMap<>();
+        for (Map.Entry<String, Subscription> entry : 
newSubscriptions.groupSubscription().entrySet()) {
+            String member = entry.getKey();
+            Subscription newSubscription = entry.getValue();
+            oldSubscriptions.put(member, new PartitionAssignor.Subscription(
+                newSubscription.topics(), newSubscription.userData()));
+        }
+        return oldSubscriptions;
+    }
+
+    private static GroupAssignment toNewGroupAssignment(Map<String, 
PartitionAssignor.Assignment> oldAssignments) {
+        Map<String, Assignment> newAssignments = new HashMap<>();
+        for (Map.Entry<String, PartitionAssignor.Assignment> entry : 
oldAssignments.entrySet()) {
+            String member = entry.getKey();
+            PartitionAssignor.Assignment oldAssignment = entry.getValue();
+            newAssignments.put(member, new 
Assignment(oldAssignment.partitions(), oldAssignment.userData()));
+        }
+        return new GroupAssignment(newAssignments);
+    }
+
+    /**
+     * Get a list of configured instances of {@link 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor}
+     * based on the class names/types specified by {@link 
org.apache.kafka.clients.consumer.ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}
+     * where any instances of the old {@link PartitionAssignor} interface are 
wrapped in an adapter to the new
+     * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor} 
interface
+     */
+    public static List<ConsumerPartitionAssignor> 
getAssignorInstances(List<String> assignorClasses, Map<String, Object> configs) 
{
+        List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
+
+        if (assignorClasses == null)
+            return assignors;
+
+        for (Object klass : assignorClasses) {
+            // first try to get the class if passed in as a string
+            if (klass instanceof String) {
+                try {
+                    klass = Class.forName((String) klass, true, 
Utils.getContextOrKafkaClassLoader());
+                } catch (ClassNotFoundException classNotFound) {
+                    throw new KafkaException(klass + " ClassNotFoundException 
exception occurred", classNotFound);
+                }
+            }
+
+            if (klass instanceof Class<?>) {
+                Object assignor = Utils.newInstance((Class<?>) klass);
+                if (assignor instanceof Configurable)
+                    ((Configurable) assignor).configure(configs);
+
+                if (assignor instanceof ConsumerPartitionAssignor) {
+                    assignors.add((ConsumerPartitionAssignor) assignor);
+                } else if (assignor instanceof PartitionAssignor) {
+                    assignors.add(new 
PartitionAssignorAdapter((PartitionAssignor) assignor));
+                    LOG.warn("The PartitionAssignor interface has been 
deprecated, "
+                        + "please implement the ConsumerPartitionAssignor 
interface instead.");
+                } else {
+                    throw new KafkaException(klass + " is not an instance of " 
+ PartitionAssignor.class.getName()
+                        + " or an instance of " + 
ConsumerPartitionAssignor.class.getName());
+                }
+            } else {
+                throw new KafkaException("List contains element of type " + 
klass.getClass().getName() + ", expected String or Class");
+            }
+        }
+        return assignors;
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapterTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapterTest.java
new file mode 100644
index 0000000..52950cb
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapterTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import static 
org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter.getAssignorInstances;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.StickyAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.Test;
+
+public class PartitionAssignorAdapterTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateNewAssignors() {
+        classNames = Arrays.asList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = 
getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(StickyAssignor.class.isInstance(assignors.get(0)));
+    }
+
+    @Test
+    public void shouldAdaptOldAssignors() {
+        classNames = Arrays.asList(OldPartitionAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = 
getAssignorInstances(classNames, Collections.emptyMap());
+        
assertTrue(PartitionAssignorAdapter.class.isInstance(assignors.get(0)));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnNonAssignor() {
+        classNames = Arrays.asList(String.class.getName());
+        assertThrows(KafkaException.class, () -> 
getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+        classNames = Arrays.asList("Non-existent assignor");
+        assertThrows(KafkaException.class, () -> 
getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldInstantiateFromListOfOldAndNewClassTypes() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+
+        classTypes = Arrays.asList(StickyAssignor.class, 
OldPartitionAssignor.class);
+
+        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
classTypes);
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
+            props, new StringDeserializer(), new StringDeserializer());
+
+        consumer.close();
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnListWithNonAssignorClassType() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+
+        classTypes = Arrays.asList(StickyAssignor.class, 
OldPartitionAssignor.class, String.class);
+
+        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
classTypes);
+        assertThrows(KafkaException.class, () -> new KafkaConsumer<>(
+            props, new StringDeserializer(), new StringDeserializer()));
+    }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testOnAssignment() {
+        OldPartitionAssignor oldAssignor = new OldPartitionAssignor();
+        ConsumerPartitionAssignor adaptedAssignor = new 
PartitionAssignorAdapter(oldAssignor);
+
+        TopicPartition tp1 = new TopicPartition("tp1", 1);
+        TopicPartition tp2 = new TopicPartition("tp2", 2);
+        List<TopicPartition> partitions = Arrays.asList(tp1, tp2);
+
+        adaptedAssignor.onAssignment(new Assignment(partitions), new 
ConsumerGroupMetadata("", 1, "", Optional.empty()));
+
+        assertEquals(oldAssignor.partitions, partitions);
+    }
+
+    @Test
+    public void testAssign() {
+        ConsumerPartitionAssignor adaptedAssignor = new 
PartitionAssignorAdapter(new OldPartitionAssignor());
+
+        Map<String, Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("C1", new Subscription(Arrays.asList("topic1")));
+        subscriptions.put("C2", new Subscription(Arrays.asList("topic1", 
"topic2")));
+        subscriptions.put("C3", new Subscription(Arrays.asList("topic2", 
"topic3")));
+        GroupSubscription groupSubscription = new 
GroupSubscription(subscriptions);
+
+        Map<String, Assignment> assignments = adaptedAssignor.assign(null, 
groupSubscription).groupAssignment();
+
+        assertEquals(assignments.get("C1").partitions(), Arrays.asList(new 
TopicPartition("topic1", 1)));
+        assertEquals(assignments.get("C2").partitions(), Arrays.asList(new 
TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
+        assertEquals(assignments.get("C3").partitions(), Arrays.asList(new 
TopicPartition("topic2", 1), new TopicPartition("topic3", 1)));
+    }
+
+    /*
+     * Dummy assignor just gives each consumer partition 1 of each topic it's 
subscribed to
+     */
+    @SuppressWarnings("deprecation")
+    public static class OldPartitionAssignor implements PartitionAssignor {
+
+        List<TopicPartition> partitions = null;
+
+        @Override
+        public Subscription subscription(Set<String> topics) {
+            return new Subscription(new ArrayList<>(topics), null);
+        }
+
+        @Override
+        public Map<String, Assignment> assign(Cluster metadata, Map<String, 
Subscription> subscriptions) {
+            Map<String, Assignment> assignments = new HashMap<>();
+            for (Map.Entry<String, Subscription> entry : 
subscriptions.entrySet()) {
+                List<TopicPartition> partitions = new ArrayList<>();
+                for (String topic : entry.getValue().topics()) {
+                    partitions.add(new TopicPartition(topic, 1));
+                }
+                assignments.put(entry.getKey(), new Assignment(partitions, 
null));
+            }
+            return assignments;
+        }
+
+        @Override
+        public void onAssignment(Assignment assignment) {
+            partitions = assignment.partitions();
+        }
+
+        @Override
+        public String name() {
+            return "old-assignor";
+        }
+    }
+
+}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 9d0e738..0ae5904 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -29,6 +29,8 @@
         avoid potential misuse. The constructor 
<code>TopicAuthorizationException(String)</code> which was previously used for 
a single
         unauthorized topic was changed similarly.
     </li>
+    <li>The internal <code>PartitionAssignor</code> interface has been 
deprecated and replaced with a new <code>ConsumerPartitionAssignor</code> in 
the public API. Users
+        implementing a custom PartitionAssignor should migrate to the new 
interface as soon as possible.</li>
 </ul>
 
 <h4><a id="upgrade_2_3_0" href="#upgrade_2_3_0">Upgrading from 0.8.x, 0.9.x, 
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x 
to 2.3.0</a></h4>
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index fa5f511..36196a6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -373,8 +373,8 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
      * 3. within each client, tasks are assigned to consumer clients in 
round-robin manner.
      */
     @Override
-    public GroupAssignment assign(final Cluster metadata, final 
GroupSubscription groupSubscriptions) {
-        final Map<String, Subscription> subscriptions = 
groupSubscriptions.groupSubscription();
+    public GroupAssignment assign(final Cluster metadata, final 
GroupSubscription groupSubscription) {
+        final Map<String, Subscription> subscriptions = 
groupSubscription.groupSubscription();
         // construct the client metadata from the decoded subscription info
         final Map<UUID, ClientMetadata> clientMetadataMap = new HashMap<>();
         final Set<String> futureConsumers = new HashSet<>();

Reply via email to