This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a97e55b  KAFKA-8332: Refactor ImplicitLinkedHashSet to avoid losing 
ordering when converting to Scala
a97e55b is described below

commit a97e55b83868ff786e740db55e73116f85456dcb
Author: Bob Barrett <bob.barr...@outlook.com>
AuthorDate: Thu May 9 11:08:22 2019 -0700

    KAFKA-8332: Refactor ImplicitLinkedHashSet to avoid losing ordering when 
converting to Scala
    
    Because of how conversions between Java collections and Scala collections 
work, ImplicitLinkedHashMultiSet objects were being treated as unordered in 
some contexts where they shouldn't be.  This broke JOIN_GROUP handling.
    
    This patch renames ImplicitLinkedHashMultiSet to 
ImplicitLinkedHashMultCollection.  The order of Collection objects will be 
preserved when converting to scala.  Adding Set and List "views" to the 
Collection gives us a more elegant way of accessing that functionality when 
needed.
    
    Reviewers: Colin P. McCabe <cmcc...@apache.org>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |   8 +-
 .../consumer/internals/AbstractCoordinator.java    |   2 +-
 .../consumer/internals/ConsumerCoordinator.java    |   4 +-
 .../requests/ControlledShutdownResponse.java       |   3 +-
 ...hSet.java => ImplicitLinkedHashCollection.java} | 218 ++++++++-
 ...java => ImplicitLinkedHashMultiCollection.java} |  12 +-
 .../internals/AbstractCoordinatorTest.java         |   4 +-
 .../apache/kafka/common/message/MessageTest.java   |   7 +-
 .../kafka/common/requests/RequestResponseTest.java |   9 +-
 .../utils/ImplicitLinkedHashCollectionTest.java    | 536 +++++++++++++++++++++
 ... => ImplicitLinkedHashMultiCollectionTest.java} |  16 +-
 .../common/utils/ImplicitLinkedHashSetTest.java    | 245 ----------
 .../runtime/distributed/WorkerCoordinator.java     |   4 +-
 .../runtime/distributed/WorkerCoordinatorTest.java |   2 +-
 .../src/main/scala/kafka/server/FetchSession.scala |  10 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  14 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  11 +-
 .../server/AbstractCreateTopicsRequestTest.scala   |   6 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  37 ++
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   8 +-
 .../apache/kafka/message/MessageDataGenerator.java |  34 +-
 .../org/apache/kafka/message/MessageGenerator.java |   4 +-
 22 files changed, 853 insertions(+), 341 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index a0958e9..8870519 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -61,7 +61,7 @@ import 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
-import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
 import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.DeleteTopicsRequestData;
 import 
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
@@ -71,7 +71,7 @@ import 
org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
-import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigSet;
+import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection;
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -1280,7 +1280,7 @@ public class KafkaAdminClient extends AdminClient {
     public CreateTopicsResult createTopics(final Collection<NewTopic> 
newTopics,
                                            final CreateTopicsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> topicFutures = new 
HashMap<>(newTopics.size());
-        final CreatableTopicSet topics = new CreatableTopicSet();
+        final CreatableTopicCollection topics = new CreatableTopicCollection();
         for (NewTopic newTopic : newTopics) {
             if (topicNameIsUnrepresentable(newTopic.name())) {
                 KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
@@ -2025,7 +2025,7 @@ public class KafkaAdminClient extends AdminClient {
         IncrementalAlterConfigsRequestData requestData = new 
IncrementalAlterConfigsRequestData();
         requestData.setValidateOnly(validateOnly);
         for (ConfigResource resource : resources) {
-            AlterableConfigSet alterableConfigSet = new AlterableConfigSet();
+            AlterableConfigCollection alterableConfigSet = new 
AlterableConfigCollection();
             for (AlterConfigOp configEntry : configs.get(resource))
                 alterableConfigSet.add(new AlterableConfig().
                         setName(configEntry.configEntry().name()).
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 69d4928..ebe8231 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -188,7 +188,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
      * on the preference).
      * @return Non-empty map of supported protocols and metadata
      */
-    protected abstract JoinGroupRequestData.JoinGroupRequestProtocolSet 
metadata();
+    protected abstract JoinGroupRequestData.JoinGroupRequestProtocolCollection 
metadata();
 
     /**
      * Invoked prior to each group join or rejoin. This is typically used to 
perform any
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 829d9dc..95cff78 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -171,10 +171,10 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
     }
 
     @Override
-    protected JoinGroupRequestData.JoinGroupRequestProtocolSet metadata() {
+    protected JoinGroupRequestData.JoinGroupRequestProtocolCollection 
metadata() {
         log.debug("Joining group with current subscription: {}", 
subscriptions.subscription());
         this.joinedSubscription = subscriptions.subscription();
-        JoinGroupRequestData.JoinGroupRequestProtocolSet protocolSet = new 
JoinGroupRequestData.JoinGroupRequestProtocolSet();
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocolSet = 
new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
 
         for (PartitionAssignor assignor : assignors) {
             Subscription subscription = 
assignor.subscription(joinedSubscription);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
index 5448f03..a49b84a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -19,7 +19,6 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.ControlledShutdownResponseData;
 import 
org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
-import 
org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionSet;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -74,7 +73,7 @@ public class ControlledShutdownResponse extends 
AbstractResponse {
     public static ControlledShutdownResponse prepareResponse(Errors error, 
Set<TopicPartition> tps) {
         ControlledShutdownResponseData data = new 
ControlledShutdownResponseData();
         data.setErrorCode(error.code());
-        ControlledShutdownResponseData.RemainingPartitionSet pSet = new 
RemainingPartitionSet();
+        ControlledShutdownResponseData.RemainingPartitionCollection pSet = new 
ControlledShutdownResponseData.RemainingPartitionCollection();
         tps.forEach(tp -> {
             pSet.add(new RemainingPartition()
                     .setTopicName(tp.topic())
diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashSet.java
 
b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
similarity index 66%
rename from 
clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashSet.java
rename to 
clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
index 75fc9ee..e060629 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashSet.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
@@ -17,9 +17,14 @@
 
 package org.apache.kafka.common.utils;
 
+import java.util.AbstractCollection;
+import java.util.AbstractSequentialList;
 import java.util.AbstractSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 /**
  * A memory-efficient hash set which tracks the order of insertion of elements.
@@ -40,7 +45,7 @@ import java.util.NoSuchElementException;
  *
  * This set does not allow null elements.  It does not have internal 
synchronization.
  */
-public class ImplicitLinkedHashSet<E extends ImplicitLinkedHashSet.Element> 
extends AbstractSet<E> {
+public class ImplicitLinkedHashCollection<E extends 
ImplicitLinkedHashCollection.Element> extends AbstractCollection<E> {
     public interface Element {
         int prev();
         void setPrev(int prev);
@@ -127,35 +132,137 @@ public class ImplicitLinkedHashSet<E extends 
ImplicitLinkedHashSet.Element> exte
         element.setPrev(INVALID_INDEX);
     }
 
-    private class ImplicitLinkedHashSetIterator implements Iterator<E> {
+    private class ImplicitLinkedHashCollectionIterator implements 
ListIterator<E> {
+        private int cursor = 0;
         private Element cur = head;
+        private int lastReturnedSlot = INVALID_INDEX;
 
-        private Element next = indexToElement(head, elements, head.next());
+        ImplicitLinkedHashCollectionIterator(int index) {
+            for (int i = 0; i < index; ++i) {
+                cur = indexToElement(head, elements, cur.next());
+                cursor++;
+            }
+        }
 
         @Override
         public boolean hasNext() {
-            return next != head;
+            return cursor != size;
+        }
+
+        @Override
+        public boolean hasPrevious() {
+            return cursor != 0;
         }
 
         @Override
         public E next() {
-            if (next == head) {
+            if (cursor == size) {
+                throw new NoSuchElementException();
+            }
+            lastReturnedSlot = cur.next();
+            cur = indexToElement(head, elements, cur.next());
+            ++cursor;
+            @SuppressWarnings("unchecked")
+            E returnValue = (E) cur;
+            return returnValue;
+        }
+
+        @Override
+        public E previous() {
+            if (cursor == 0) {
                 throw new NoSuchElementException();
             }
-            cur = next;
-            next = indexToElement(head, elements, cur.next());
             @SuppressWarnings("unchecked")
             E returnValue = (E) cur;
+            cur = indexToElement(head, elements, cur.prev());
+            lastReturnedSlot = cur.next();
+            --cursor;
             return returnValue;
         }
 
         @Override
+        public int nextIndex() {
+            return cursor;
+        }
+
+        @Override
+        public int previousIndex() {
+            return cursor - 1;
+        }
+
+        @Override
         public void remove() {
-            if (cur == head) {
+            if (lastReturnedSlot == INVALID_INDEX) {
                 throw new IllegalStateException();
             }
-            ImplicitLinkedHashSet.this.remove(cur);
-            cur = head;
+
+            if (cur == indexToElement(head, elements, lastReturnedSlot)) {
+                cursor--;
+                cur = indexToElement(head, elements, cur.prev());
+            }
+            
ImplicitLinkedHashCollection.this.removeElementAtSlot(lastReturnedSlot);
+
+            lastReturnedSlot = INVALID_INDEX;
+        }
+
+        @Override
+        public void set(E e) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void add(E e) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private class ImplicitLinkedHashCollectionListView extends 
AbstractSequentialList<E> {
+
+        @Override
+        public ListIterator<E> listIterator(int index) {
+            if (index < 0 || index > size) {
+                throw new IndexOutOfBoundsException();
+            }
+
+            return ImplicitLinkedHashCollection.this.listIterator(index);
+        }
+
+        @Override
+        public int size() {
+            return size;
+        }
+    }
+
+    private class ImplicitLinkedHashCollectionSetView extends AbstractSet<E> {
+
+        @Override
+        public Iterator<E> iterator() {
+            return ImplicitLinkedHashCollection.this.iterator();
+        }
+
+        @Override
+        public int size() {
+            return size;
+        }
+
+        @Override
+        public boolean add(E newElement) {
+            return ImplicitLinkedHashCollection.this.add(newElement);
+        }
+
+        @Override
+        public boolean remove(Object key) {
+            return ImplicitLinkedHashCollection.this.remove(key);
+        }
+
+        @Override
+        public boolean contains(Object key) {
+            return ImplicitLinkedHashCollection.this.contains(key);
+        }
+
+        @Override
+        public void clear() {
+            ImplicitLinkedHashCollection.this.clear();
         }
     }
 
@@ -174,7 +281,11 @@ public class ImplicitLinkedHashSet<E extends 
ImplicitLinkedHashSet.Element> exte
      */
     @Override
     final public Iterator<E> iterator() {
-        return new ImplicitLinkedHashSetIterator();
+        return listIterator(0);
+    }
+
+    private ListIterator<E> listIterator(int index) {
+        return new ImplicitLinkedHashCollectionIterator(index);
     }
 
     final int slot(Element[] curElements, Object e) {
@@ -402,30 +513,30 @@ public class ImplicitLinkedHashSet<E extends 
ImplicitLinkedHashSet.Element> exte
     }
 
     /**
-     * Create a new ImplicitLinkedHashSet.
+     * Create a new ImplicitLinkedHashCollection.
      */
-    public ImplicitLinkedHashSet() {
+    public ImplicitLinkedHashCollection() {
         this(0);
     }
 
     /**
-     * Create a new ImplicitLinkedHashSet.
+     * Create a new ImplicitLinkedHashCollection.
      *
      * @param expectedNumElements   The number of elements we expect to have 
in this set.
      *                              This is used to optimize by setting the 
capacity ahead
      *                              of time rather than growing incrementally.
      */
-    public ImplicitLinkedHashSet(int expectedNumElements) {
+    public ImplicitLinkedHashCollection(int expectedNumElements) {
         clear(expectedNumElements);
     }
 
     /**
-     * Create a new ImplicitLinkedHashSet.
+     * Create a new ImplicitLinkedHashCollection.
      *
      * @param iter                  We will add all the elements accessible 
through this iterator
      *                              to the set.
      */
-    public ImplicitLinkedHashSet(Iterator<E> iter) {
+    public ImplicitLinkedHashCollection(Iterator<E> iter) {
         clear(0);
         while (iter.hasNext()) {
             mustAdd(iter.next());
@@ -457,8 +568,81 @@ public class ImplicitLinkedHashSet<E extends 
ImplicitLinkedHashSet.Element> exte
         }
     }
 
+    /**
+     * Compares the specified object with this collection for equality. Two
+     * {@code ImplicitLinkedHashCollection} objects are equal if they contain 
the
+     * same elements (as determined by the element's {@code equals} method), 
and
+     * those elements were inserted in the same order. Because
+     * {@code ImplicitLinkedHashCollectionListIterator} iterates over the 
elements
+     * in insertion order, it is sufficient to call {@code valuesList.equals}.
+     *
+     * Note that {@link ImplicitLinkedHashMultiCollection} does not override
+     * {@code equals} and uses this method as well. This means that two
+     * {@code ImplicitLinkedHashMultiCollection} objects will be considered 
equal even
+     * if they each contain two elements A and B such that A.equals(B) but A 
!= B and
+     * A and B have switched insertion positions between the two collections. 
This
+     * is an acceptable definition of equality, because the collections are 
still
+     * equal in terms of the order and value of each element.
+     *
+     * @param o object to be compared for equality with this collection
+     * @return true is the specified object is equal to this collection
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (!(o instanceof ImplicitLinkedHashCollection))
+            return false;
+
+        ImplicitLinkedHashCollection<?> ilhs = 
(ImplicitLinkedHashCollection<?>) o;
+        return this.valuesList().equals(ilhs.valuesList());
+    }
+
+    /**
+     * Returns the hash code value for this collection. Because
+     * {@code ImplicitLinkedHashCollection.equals} compares the {@code 
valuesList}
+     * of two {@code ImplicitLinkedHashCollection} objects to determine 
equality,
+     * this method uses the @{code valuesList} to compute the has code value 
as well.
+     *
+     * @return the hash code value for this collection
+     */
+    @Override
+    public int hashCode() {
+        return this.valuesList().hashCode();
+    }
+
     // Visible for testing
     final int numSlots() {
         return elements.length;
     }
+
+    /**
+     * Returns a {@link List} view of the elements contained in the collection,
+     * ordered by order of insertion into the collection. The list is backed 
by the
+     * collection, so changes to the collection are reflected in the list and
+     * vice-versa. The list supports element removal, which removes the 
corresponding
+     * element from the collection, but does not support the {@code add} or
+     * {@code set} operations.
+     *
+     * The list is implemented as a circular linked list, so all index-based
+     * operations, such as {@code List.get}, run in O(n) time.
+     *
+     * @return a list view of the elements contained in this collection
+     */
+    public List<E> valuesList() {
+        return new ImplicitLinkedHashCollectionListView();
+    }
+
+    /**
+     * Returns a {@link Set} view of the elements contained in the collection. 
The
+     * set is backed by the collection, so changes to the collection are 
reflected in
+     * the set, and vice versa. The set supports element removal and addition, 
which
+     * removes from or adds to the collection, respectively.
+     *
+     * @return a set view of the elements contained in this collection
+     */
+    public Set<E> valuesSet() {
+        return new ImplicitLinkedHashCollectionSetView();
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiSet.java
 
b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollection.java
similarity index 91%
rename from 
clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiSet.java
rename to 
clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollection.java
index 2eb53f6..85714d6 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiSet.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollection.java
@@ -24,7 +24,7 @@ import java.util.List;
 
 /**
  * A memory-efficient hash multiset which tracks the order of insertion of 
elements.
- * See org.apache.kafka.common.utils.ImplicitLinkedHashSet for implementation 
details.
+ * See org.apache.kafka.common.utils.ImplicitLinkedHashCollection for 
implementation details.
  *
  * This class is a multi-set because it allows multiple elements to be 
inserted that are
  * equal to each other.
@@ -42,17 +42,17 @@ import java.util.List;
  *
  * This multiset does not allow null elements.  It does not have internal 
synchronization.
  */
-public class ImplicitLinkedHashMultiSet<E extends 
ImplicitLinkedHashSet.Element>
-        extends ImplicitLinkedHashSet<E> {
-    public ImplicitLinkedHashMultiSet() {
+public class ImplicitLinkedHashMultiCollection<E extends 
ImplicitLinkedHashCollection.Element>
+        extends ImplicitLinkedHashCollection<E> {
+    public ImplicitLinkedHashMultiCollection() {
         super(0);
     }
 
-    public ImplicitLinkedHashMultiSet(int expectedNumElements) {
+    public ImplicitLinkedHashMultiCollection(int expectedNumElements) {
         super(expectedNumElements);
     }
 
-    public ImplicitLinkedHashMultiSet(Iterator<E> iter) {
+    public ImplicitLinkedHashMultiCollection(Iterator<E> iter) {
         super(iter);
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 4ce0386..8d8afd2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -818,8 +818,8 @@ public class AbstractCoordinatorTest {
         }
 
         @Override
-        protected JoinGroupRequestData.JoinGroupRequestProtocolSet metadata() {
-            return new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+        protected JoinGroupRequestData.JoinGroupRequestProtocolCollection 
metadata() {
+            return new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
                     Collections.singleton(new 
JoinGroupRequestData.JoinGroupRequestProtocol()
                             .setName("dummy-subprotocol")
                             .setMetadata(EMPTY_DATA.array())).iterator()
diff --git 
a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java 
b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index fdb538e..1780a71 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.common.message;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
-import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Message;
@@ -36,7 +35,7 @@ import java.util.List;
 import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.utils.Utils;
 import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
-import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicSet;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -71,12 +70,12 @@ public final class MessageTest {
             setTransactionalId("blah").
             setProducerId(0xbadcafebadcafeL).
             setProducerEpoch((short) 30000).
-            setTopics(new AddPartitionsToTxnTopicSet(Collections.singletonList(
+            setTopics(new 
AddPartitionsToTxnTopicCollection(Collections.singletonList(
                 new AddPartitionsToTxnTopic().
                     setName("Topic").
                     setPartitions(Collections.singletonList(1))).iterator())));
         testMessageRoundTrips(new CreateTopicsRequestData().
-            setTimeoutMs(1000).setTopics(new CreatableTopicSet()));
+            setTimeoutMs(1000).setTopics(new 
CreateTopicsRequestData.CreatableTopicCollection()));
         testMessageRoundTrips(new DescribeAclsRequestData().
             setResourceType((byte) 42).
             setResourceNameFilter(null).
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index ce7d1d3..43057a6 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -35,7 +35,7 @@ import 
org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ControlledShutdownRequestData;
 import org.apache.kafka.common.message.ControlledShutdownResponseData;
 import 
org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
-import 
org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionSet;
+import 
org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionCollection;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
@@ -67,7 +67,6 @@ import 
org.apache.kafka.common.message.SaslHandshakeRequestData;
 import org.apache.kafka.common.message.SaslHandshakeResponseData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
-import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigSet;
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
@@ -781,7 +780,7 @@ public class RequestResponseTest {
     }
 
     private JoinGroupRequest createJoinGroupRequest(int version) {
-        JoinGroupRequestData.JoinGroupRequestProtocolSet protocols = new 
JoinGroupRequestData.JoinGroupRequestProtocolSet(
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = 
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
                 Collections.singleton(
                 new JoinGroupRequestData.JoinGroupRequestProtocol()
                         .setName("consumer-range")
@@ -1053,7 +1052,7 @@ public class RequestResponseTest {
         RemainingPartition p2 = new RemainingPartition()
                 .setTopicName("test1")
                 .setPartitionIndex(10);
-        RemainingPartitionSet pSet = new RemainingPartitionSet();
+        RemainingPartitionCollection pSet = new RemainingPartitionCollection();
         pSet.add(p1);
         pSet.add(p2);
         ControlledShutdownResponseData data = new 
ControlledShutdownResponseData()
@@ -1522,7 +1521,7 @@ public class RequestResponseTest {
                 .setName("retention.ms")
                 .setConfigOperation((byte) 0)
                 .setValue("100");
-        AlterableConfigSet alterableConfigs = new AlterableConfigSet();
+        IncrementalAlterConfigsRequestData.AlterableConfigCollection 
alterableConfigs = new 
IncrementalAlterConfigsRequestData.AlterableConfigCollection();
         alterableConfigs.add(alterableConfig);
 
         data.resources().add(new AlterConfigsResource()
diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
 
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
new file mode 100644
index 0000000..8c102dd
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Random;
+import java.util.Set;
+
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * A unit test for ImplicitLinkedHashCollection.
+ */
+public class ImplicitLinkedHashCollectionTest {
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
+
+    final static class TestElement implements 
ImplicitLinkedHashCollection.Element {
+        private int prev = ImplicitLinkedHashCollection.INVALID_INDEX;
+        private int next = ImplicitLinkedHashCollection.INVALID_INDEX;
+        private final int val;
+
+        TestElement(int val) {
+            this.val = val;
+        }
+
+        @Override
+        public int prev() {
+            return prev;
+        }
+
+        @Override
+        public void setPrev(int prev) {
+            this.prev = prev;
+        }
+
+        @Override
+        public int next() {
+            return next;
+        }
+
+        @Override
+        public void setNext(int next) {
+            this.next = next;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if ((o == null) || (o.getClass() != TestElement.class)) return 
false;
+            TestElement that = (TestElement) o;
+            return val == that.val;
+        }
+
+        @Override
+        public String toString() {
+            return "TestElement(" + val + ")";
+        }
+
+        @Override
+        public int hashCode() {
+            return val;
+        }
+    }
+
+    @Test
+    public void testNullForbidden() {
+        ImplicitLinkedHashMultiCollection<TestElement> multiColl = new 
ImplicitLinkedHashMultiCollection<>();
+        assertFalse(multiColl.add(null));
+    }
+
+    @Test
+    public void testInsertDelete() {
+        ImplicitLinkedHashCollection<TestElement> coll = new 
ImplicitLinkedHashCollection<>(100);
+        assertTrue(coll.add(new TestElement(1)));
+        TestElement second = new TestElement(2);
+        assertTrue(coll.add(second));
+        assertTrue(coll.add(new TestElement(3)));
+        assertFalse(coll.add(new TestElement(3)));
+        assertEquals(3, coll.size());
+        assertTrue(coll.contains(new TestElement(1)));
+        assertFalse(coll.contains(new TestElement(4)));
+        TestElement secondAgain = coll.find(new TestElement(2));
+        assertTrue(second == secondAgain);
+        assertTrue(coll.remove(new TestElement(1)));
+        assertFalse(coll.remove(new TestElement(1)));
+        assertEquals(2, coll.size());
+        coll.clear();
+        assertEquals(0, coll.size());
+    }
+
+    static void expectTraversal(Iterator<TestElement> iterator, Integer... 
sequence) {
+        int i = 0;
+        while (iterator.hasNext()) {
+            TestElement element = iterator.next();
+            Assert.assertTrue("Iterator yieled " + (i + 1) + " elements, but 
only " +
+                    sequence.length + " were expected.", i < sequence.length);
+            Assert.assertEquals("Iterator value number " + (i + 1) + " was 
incorrect.",
+                    sequence[i].intValue(), element.val);
+            i = i + 1;
+        }
+        Assert.assertTrue("Iterator yieled " + (i + 1) + " elements, but " +
+                sequence.length + " were expected.", i == sequence.length);
+    }
+
+    static void expectTraversal(Iterator<TestElement> iter, Iterator<Integer> 
expectedIter) {
+        int i = 0;
+        while (iter.hasNext()) {
+            TestElement element = iter.next();
+            Assert.assertTrue("Iterator yieled " + (i + 1) + " elements, but 
only " +
+                    i + " were expected.", expectedIter.hasNext());
+            Integer expected = expectedIter.next();
+            Assert.assertEquals("Iterator value number " + (i + 1) + " was 
incorrect.",
+                    expected.intValue(), element.val);
+            i = i + 1;
+        }
+        Assert.assertFalse("Iterator yieled " + i + " elements, but at least " 
+
+                (i + 1) + " were expected.", expectedIter.hasNext());
+    }
+
+    @Test
+    public void testTraversal() {
+        ImplicitLinkedHashCollection<TestElement> coll = new 
ImplicitLinkedHashCollection<>();
+        expectTraversal(coll.iterator());
+        assertTrue(coll.add(new TestElement(2)));
+        expectTraversal(coll.iterator(), 2);
+        assertTrue(coll.add(new TestElement(1)));
+        expectTraversal(coll.iterator(), 2, 1);
+        assertTrue(coll.add(new TestElement(100)));
+        expectTraversal(coll.iterator(), 2, 1, 100);
+        assertTrue(coll.remove(new TestElement(1)));
+        expectTraversal(coll.iterator(), 2, 100);
+        assertTrue(coll.add(new TestElement(1)));
+        expectTraversal(coll.iterator(), 2, 100, 1);
+        Iterator<TestElement> iter = coll.iterator();
+        iter.next();
+        iter.next();
+        iter.remove();
+        iter.next();
+        assertFalse(iter.hasNext());
+        expectTraversal(coll.iterator(), 2, 1);
+        List<TestElement> list = new ArrayList<>();
+        list.add(new TestElement(1));
+        list.add(new TestElement(2));
+        assertTrue(coll.removeAll(list));
+        assertFalse(coll.removeAll(list));
+        expectTraversal(coll.iterator());
+        assertEquals(0, coll.size());
+        assertTrue(coll.isEmpty());
+    }
+
+    @Test
+    public void testSetViewGet() {
+        ImplicitLinkedHashCollection<TestElement> coll = new 
ImplicitLinkedHashCollection<>();
+        coll.add(new TestElement(1));
+        coll.add(new TestElement(2));
+        coll.add(new TestElement(3));
+
+        Set<TestElement> set = coll.valuesSet();
+        assertTrue(set.contains(new TestElement(1)));
+        assertTrue(set.contains(new TestElement(2)));
+        assertTrue(set.contains(new TestElement(3)));
+        assertEquals(3, set.size());
+    }
+
+    @Test
+    public void testSetViewModification() {
+        ImplicitLinkedHashCollection<TestElement> coll = new 
ImplicitLinkedHashCollection<>();
+        coll.add(new TestElement(1));
+        coll.add(new TestElement(2));
+        coll.add(new TestElement(3));
+
+        // Removal from set is reflected in collection
+        Set<TestElement> set = coll.valuesSet();
+        set.remove(new TestElement(1));
+        assertFalse(coll.contains(new TestElement(1)));
+        assertEquals(2, coll.size());
+
+        // Addition to set is reflected in collection
+        set.add(new TestElement(4));
+        assertTrue(coll.contains(new TestElement(4)));
+        assertEquals(3, coll.size());
+
+        // Removal from collection is reflected in set
+        coll.remove(new TestElement(2));
+        assertFalse(set.contains(new TestElement(2)));
+        assertEquals(2, set.size());
+
+        // Addition to collection is reflected in set
+        coll.add(new TestElement(5));
+        assertTrue(set.contains(new TestElement(5)));
+        assertEquals(3, set.size());
+
+        // Ordering in the collection is maintained
+        int val = 3;
+        for (TestElement e : coll) {
+            assertEquals(val, e.val);
+            ++val;
+        }
+    }
+
+    @Test
+    public void testListViewGet() {
+        ImplicitLinkedHashCollection<TestElement> coll = new 
ImplicitLinkedHashCollection<>();
+        coll.add(new TestElement(1));
+        coll.add(new TestElement(2));
+        coll.add(new TestElement(3));
+
+        List<TestElement> list = coll.valuesList();
+        assertEquals(1, list.get(0).val);
+        assertEquals(2, list.get(1).val);
+        assertEquals(3, list.get(2).val);
+        assertEquals(3, list.size());
+    }
+
+    @Test
+    public void testListViewModification() {
+        ImplicitLinkedHashCollection<TestElement> coll = new 
ImplicitLinkedHashCollection<>();
+        coll.add(new TestElement(1));
+        coll.add(new TestElement(2));
+        coll.add(new TestElement(3));
+
+        // Removal from list is reflected in collection
+        List<TestElement> list = coll.valuesList();
+        list.remove(1);
+        assertTrue(coll.contains(new TestElement(1)));
+        assertFalse(coll.contains(new TestElement(2)));
+        assertTrue(coll.contains(new TestElement(3)));
+        assertEquals(2, coll.size());
+
+        // Removal from collection is reflected in list
+        coll.remove(new TestElement(1));
+        assertEquals(3, list.get(0).val);
+        assertEquals(1, list.size());
+
+        // Addition to collection is reflected in list
+        coll.add(new TestElement(4));
+        assertEquals(3, list.get(0).val);
+        assertEquals(4, list.get(1).val);
+        assertEquals(2, list.size());
+    }
+
+    @Test
+    public void testEmptyListIterator() {
+        ImplicitLinkedHashCollection<TestElement> coll = new 
ImplicitLinkedHashCollection<>();
+        ListIterator iter = coll.valuesList().listIterator();
+        assertFalse(iter.hasNext());
+        assertFalse(iter.hasPrevious());
+        assertEquals(0, iter.nextIndex());
+        assertEquals(-1, iter.previousIndex());
+    }
+
+    @Test
+    public void testListIteratorCreation() {
+        ImplicitLinkedHashCollection<TestElement> coll = new 
ImplicitLinkedHashCollection<>();
+        coll.add(new TestElement(1));
+        coll.add(new TestElement(2));
+        coll.add(new TestElement(3));
+
+        // Iterator created at the start of the list should have a next but no 
prev
+        ListIterator<TestElement> iter = coll.valuesList().listIterator();
+        assertTrue(iter.hasNext());
+        assertFalse(iter.hasPrevious());
+        assertEquals(0, iter.nextIndex());
+        assertEquals(-1, iter.previousIndex());
+
+        // Iterator created in the middle of the list should have both a next 
and a prev
+        iter = coll.valuesList().listIterator(2);
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(2, iter.nextIndex());
+        assertEquals(1, iter.previousIndex());
+
+        // Iterator created at the end of the list should have a prev but no 
next
+        iter = coll.valuesList().listIterator(3);
+        assertFalse(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(3, iter.nextIndex());
+        assertEquals(2, iter.previousIndex());
+    }
+
+    @Test
+    public void testListIteratorTraversal() {
+        ImplicitLinkedHashCollection<TestElement> coll = new 
ImplicitLinkedHashCollection<>();
+        coll.add(new TestElement(1));
+        coll.add(new TestElement(2));
+        coll.add(new TestElement(3));
+        ListIterator<TestElement> iter = coll.valuesList().listIterator();
+
+        // Step the iterator forward to the end of the list
+        assertTrue(iter.hasNext());
+        assertFalse(iter.hasPrevious());
+        assertEquals(0, iter.nextIndex());
+        assertEquals(-1, iter.previousIndex());
+
+        assertEquals(1, iter.next().val);
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(1, iter.nextIndex());
+        assertEquals(0, iter.previousIndex());
+
+        assertEquals(2, iter.next().val);
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(2, iter.nextIndex());
+        assertEquals(1, iter.previousIndex());
+
+        assertEquals(3, iter.next().val);
+        assertFalse(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(3, iter.nextIndex());
+        assertEquals(2, iter.previousIndex());
+
+        // Step back to the middle of the list
+        assertEquals(3, iter.previous().val);
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(2, iter.nextIndex());
+        assertEquals(1, iter.previousIndex());
+
+        assertEquals(2, iter.previous().val);
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(1, iter.nextIndex());
+        assertEquals(0, iter.previousIndex());
+
+        // Step forward one and then back one, return value should remain the 
same
+        assertEquals(2, iter.next().val);
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(2, iter.nextIndex());
+        assertEquals(1, iter.previousIndex());
+
+        assertEquals(2, iter.previous().val);
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(1, iter.nextIndex());
+        assertEquals(0, iter.previousIndex());
+
+        // Step back to the front of the list
+        assertEquals(1, iter.previous().val);
+        assertTrue(iter.hasNext());
+        assertFalse(iter.hasPrevious());
+        assertEquals(0, iter.nextIndex());
+        assertEquals(-1, iter.previousIndex());
+    }
+
+    @Test
+    public void testListIteratorRemove() {
+        ImplicitLinkedHashCollection<TestElement> coll = new 
ImplicitLinkedHashCollection<>();
+        coll.add(new TestElement(1));
+        coll.add(new TestElement(2));
+        coll.add(new TestElement(3));
+        coll.add(new TestElement(4));
+        coll.add(new TestElement(5));
+
+        ListIterator<TestElement> iter = coll.valuesList().listIterator();
+        try {
+            iter.remove();
+            fail("Calling remove() without calling next() or previous() should 
raise an exception");
+        } catch (IllegalStateException e) {
+            // expected
+        }
+
+        // Remove after next()
+        iter.next();
+        iter.next();
+        iter.next();
+        iter.remove();
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(2, iter.nextIndex());
+        assertEquals(1, iter.previousIndex());
+
+        try {
+            iter.remove();
+            fail("Calling remove() twice without calling next() or previous() 
in between should raise an exception");
+        } catch (IllegalStateException e) {
+            // expected
+        }
+
+        // Remove after previous()
+        assertEquals(2, iter.previous().val);
+        iter.remove();
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(1, iter.nextIndex());
+        assertEquals(0, iter.previousIndex());
+
+        // Remove the first element of the list
+        assertEquals(1, iter.previous().val);
+        iter.remove();
+        assertTrue(iter.hasNext());
+        assertFalse(iter.hasPrevious());
+        assertEquals(0, iter.nextIndex());
+        assertEquals(-1, iter.previousIndex());
+
+        // Remove the last element of the list
+        assertEquals(4, iter.next().val);
+        assertEquals(5, iter.next().val);
+        iter.remove();
+        assertFalse(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(1, iter.nextIndex());
+        assertEquals(0, iter.previousIndex());
+
+        // Remove the final remaining element of the list
+        assertEquals(4, iter.previous().val);
+        iter.remove();
+        assertFalse(iter.hasNext());
+        assertFalse(iter.hasPrevious());
+        assertEquals(0, iter.nextIndex());
+        assertEquals(-1, iter.previousIndex());
+
+    }
+
+    @Test
+    public void testCollisions() {
+        ImplicitLinkedHashCollection<TestElement> coll = new 
ImplicitLinkedHashCollection<>(5);
+        assertEquals(11, coll.numSlots());
+        assertTrue(coll.add(new TestElement(11)));
+        assertTrue(coll.add(new TestElement(0)));
+        assertTrue(coll.add(new TestElement(22)));
+        assertTrue(coll.add(new TestElement(33)));
+        assertEquals(11, coll.numSlots());
+        expectTraversal(coll.iterator(), 11, 0, 22, 33);
+        assertTrue(coll.remove(new TestElement(22)));
+        expectTraversal(coll.iterator(), 11, 0, 33);
+        assertEquals(3, coll.size());
+        assertFalse(coll.isEmpty());
+    }
+
+    @Test
+    public void testEnlargement() {
+        ImplicitLinkedHashCollection<TestElement> coll = new 
ImplicitLinkedHashCollection<>(5);
+        assertEquals(11, coll.numSlots());
+        for (int i = 0; i < 6; i++) {
+            assertTrue(coll.add(new TestElement(i)));
+        }
+        assertEquals(23, coll.numSlots());
+        assertEquals(6, coll.size());
+        expectTraversal(coll.iterator(), 0, 1, 2, 3, 4, 5);
+        for (int i = 0; i < 6; i++) {
+            assertTrue("Failed to find element " + i, coll.contains(new 
TestElement(i)));
+        }
+        coll.remove(new TestElement(3));
+        assertEquals(23, coll.numSlots());
+        assertEquals(5, coll.size());
+        expectTraversal(coll.iterator(), 0, 1, 2, 4, 5);
+    }
+
+    @Test
+    public void testManyInsertsAndDeletes() {
+        Random random = new Random(123);
+        LinkedHashSet<Integer> existing = new LinkedHashSet<>();
+        ImplicitLinkedHashCollection<TestElement> coll = new 
ImplicitLinkedHashCollection<>();
+        for (int i = 0; i < 100; i++) {
+            addRandomElement(random, existing, coll);
+            addRandomElement(random, existing, coll);
+            addRandomElement(random, existing, coll);
+            removeRandomElement(random, existing, coll);
+            expectTraversal(coll.iterator(), existing.iterator());
+        }
+    }
+
+    @Test
+    public void testEquals() {
+        ImplicitLinkedHashCollection<TestElement> coll1 = new 
ImplicitLinkedHashCollection<>();
+        coll1.add(new TestElement(1));
+        coll1.add(new TestElement(2));
+        coll1.add(new TestElement(3));
+
+        ImplicitLinkedHashCollection<TestElement> coll2 = new 
ImplicitLinkedHashCollection<>();
+        coll2.add(new TestElement(1));
+        coll2.add(new TestElement(2));
+        coll2.add(new TestElement(3));
+
+        ImplicitLinkedHashCollection<TestElement> coll3 = new 
ImplicitLinkedHashCollection<>();
+        coll3.add(new TestElement(1));
+        coll3.add(new TestElement(3));
+        coll3.add(new TestElement(2));
+
+        assertEquals(coll1, coll2);
+        assertNotEquals(coll1, coll3);
+        assertNotEquals(coll2, coll3);
+    }
+
+    private void addRandomElement(Random random, LinkedHashSet<Integer> 
existing,
+                                  ImplicitLinkedHashCollection<TestElement> 
set) {
+        int next;
+        do {
+            next = random.nextInt();
+        } while (existing.contains(next));
+        existing.add(next);
+        set.add(new TestElement(next));
+    }
+
+    private void removeRandomElement(Random random, Collection<Integer> 
existing,
+                                     ImplicitLinkedHashCollection<TestElement> 
coll) {
+        int removeIdx = random.nextInt(existing.size());
+        Iterator<Integer> iter = existing.iterator();
+        Integer element = null;
+        for (int i = 0; i <= removeIdx; i++) {
+            element = iter.next();
+        }
+        existing.remove(new TestElement(element));
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiSetTest.java
 
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
similarity index 88%
rename from 
clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiSetTest.java
rename to 
clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
index 950deb8..8d2b850 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiSetTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.common.utils;
 
-import org.apache.kafka.common.utils.ImplicitLinkedHashSetTest.TestElement;
+import 
org.apache.kafka.common.utils.ImplicitLinkedHashCollectionTest.TestElement;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -32,21 +32,21 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * A unit test for ImplicitLinkedHashMultiSet.
+ * A unit test for ImplicitLinkedHashMultiCollection.
  */
-public class ImplicitLinkedHashMultiSetTest {
+public class ImplicitLinkedHashMultiCollectionTest {
     @Rule
     final public Timeout globalTimeout = Timeout.millis(120000);
 
     @Test
     public void testNullForbidden() {
-        ImplicitLinkedHashMultiSet<TestElement> multiSet = new 
ImplicitLinkedHashMultiSet<>();
+        ImplicitLinkedHashMultiCollection<TestElement> multiSet = new 
ImplicitLinkedHashMultiCollection<>();
         assertFalse(multiSet.add(null));
     }
 
     @Test
     public void testInsertDelete() {
-        ImplicitLinkedHashMultiSet<TestElement> multiSet = new 
ImplicitLinkedHashMultiSet<>(100);
+        ImplicitLinkedHashMultiCollection<TestElement> multiSet = new 
ImplicitLinkedHashMultiCollection<>(100);
         TestElement e1 = new TestElement(1);
         TestElement e2 = new TestElement(1);
         TestElement e3 = new TestElement(2);
@@ -64,7 +64,7 @@ public class ImplicitLinkedHashMultiSetTest {
 
     @Test
     public void testTraversal() {
-        ImplicitLinkedHashMultiSet<TestElement> multiSet = new 
ImplicitLinkedHashMultiSet<>();
+        ImplicitLinkedHashMultiCollection<TestElement> multiSet = new 
ImplicitLinkedHashMultiCollection<>();
         expectExactTraversal(multiSet.iterator());
         TestElement e1 = new TestElement(1);
         TestElement e2 = new TestElement(1);
@@ -96,7 +96,7 @@ public class ImplicitLinkedHashMultiSetTest {
 
     @Test
     public void testEnlargement() {
-        ImplicitLinkedHashMultiSet<TestElement> multiSet = new 
ImplicitLinkedHashMultiSet<>(5);
+        ImplicitLinkedHashMultiCollection<TestElement> multiSet = new 
ImplicitLinkedHashMultiCollection<>(5);
         assertEquals(11, multiSet.numSlots());
         TestElement[] testElements = {
             new TestElement(100),
@@ -126,7 +126,7 @@ public class ImplicitLinkedHashMultiSetTest {
     public void testManyInsertsAndDeletes() {
         Random random = new Random(123);
         LinkedList<TestElement> existing = new LinkedList<>();
-        ImplicitLinkedHashMultiSet<TestElement> multiSet = new 
ImplicitLinkedHashMultiSet<>();
+        ImplicitLinkedHashMultiCollection<TestElement> multiSet = new 
ImplicitLinkedHashMultiCollection<>();
         for (int i = 0; i < 100; i++) {
             for (int j = 0; j < 4; j++) {
                 TestElement testElement = new TestElement(random.nextInt());
diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashSetTest.java
 
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashSetTest.java
deleted file mode 100644
index 156eba2..0000000
--- 
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashSetTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.utils;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertEquals;
-
-/**
- * A unit test for ImplicitLinkedHashSet.
- */
-public class ImplicitLinkedHashSetTest {
-    @Rule
-    final public Timeout globalTimeout = Timeout.millis(120000);
-
-    final static class TestElement implements ImplicitLinkedHashSet.Element {
-        private int prev = ImplicitLinkedHashSet.INVALID_INDEX;
-        private int next = ImplicitLinkedHashSet.INVALID_INDEX;
-        private final int val;
-
-        TestElement(int val) {
-            this.val = val;
-        }
-
-        @Override
-        public int prev() {
-            return prev;
-        }
-
-        @Override
-        public void setPrev(int prev) {
-            this.prev = prev;
-        }
-
-        @Override
-        public int next() {
-            return next;
-        }
-
-        @Override
-        public void setNext(int next) {
-            this.next = next;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if ((o == null) || (o.getClass() != TestElement.class)) return 
false;
-            TestElement that = (TestElement) o;
-            return val == that.val;
-        }
-
-        @Override
-        public String toString() {
-            return "TestElement(" + val + ")";
-        }
-
-        @Override
-        public int hashCode() {
-            return val;
-        }
-    }
-
-    @Test
-    public void testNullForbidden() {
-        ImplicitLinkedHashMultiSet<TestElement> multiSet = new 
ImplicitLinkedHashMultiSet<>();
-        assertFalse(multiSet.add(null));
-    }
-
-    @Test
-    public void testInsertDelete() {
-        ImplicitLinkedHashSet<TestElement> set = new 
ImplicitLinkedHashSet<>(100);
-        assertTrue(set.add(new TestElement(1)));
-        TestElement second = new TestElement(2);
-        assertTrue(set.add(second));
-        assertTrue(set.add(new TestElement(3)));
-        assertFalse(set.add(new TestElement(3)));
-        assertEquals(3, set.size());
-        assertTrue(set.contains(new TestElement(1)));
-        assertFalse(set.contains(new TestElement(4)));
-        TestElement secondAgain = set.find(new TestElement(2));
-        assertTrue(second == secondAgain);
-        assertTrue(set.remove(new TestElement(1)));
-        assertFalse(set.remove(new TestElement(1)));
-        assertEquals(2, set.size());
-        set.clear();
-        assertEquals(0, set.size());
-    }
-
-    static void expectTraversal(Iterator<TestElement> iterator, Integer... 
sequence) {
-        int i = 0;
-        while (iterator.hasNext()) {
-            TestElement element = iterator.next();
-            Assert.assertTrue("Iterator yieled " + (i + 1) + " elements, but 
only " +
-                sequence.length + " were expected.", i < sequence.length);
-            Assert.assertEquals("Iterator value number " + (i + 1) + " was 
incorrect.",
-                sequence[i].intValue(), element.val);
-            i = i + 1;
-        }
-        Assert.assertTrue("Iterator yieled " + (i + 1) + " elements, but " +
-            sequence.length + " were expected.", i == sequence.length);
-    }
-
-    static void expectTraversal(Iterator<TestElement> iter, Iterator<Integer> 
expectedIter) {
-        int i = 0;
-        while (iter.hasNext()) {
-            TestElement element = iter.next();
-            Assert.assertTrue("Iterator yieled " + (i + 1) + " elements, but 
only " +
-                i + " were expected.", expectedIter.hasNext());
-            Integer expected = expectedIter.next();
-            Assert.assertEquals("Iterator value number " + (i + 1) + " was 
incorrect.",
-                expected.intValue(), element.val);
-            i = i + 1;
-        }
-        Assert.assertFalse("Iterator yieled " + i + " elements, but at least " 
+
-            (i + 1) + " were expected.", expectedIter.hasNext());
-    }
-
-    @Test
-    public void testTraversal() {
-        ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>();
-        expectTraversal(set.iterator());
-        assertTrue(set.add(new TestElement(2)));
-        expectTraversal(set.iterator(), 2);
-        assertTrue(set.add(new TestElement(1)));
-        expectTraversal(set.iterator(), 2, 1);
-        assertTrue(set.add(new TestElement(100)));
-        expectTraversal(set.iterator(), 2, 1, 100);
-        assertTrue(set.remove(new TestElement(1)));
-        expectTraversal(set.iterator(), 2, 100);
-        assertTrue(set.add(new TestElement(1)));
-        expectTraversal(set.iterator(), 2, 100, 1);
-        Iterator<TestElement> iter = set.iterator();
-        iter.next();
-        iter.next();
-        iter.remove();
-        iter.next();
-        assertFalse(iter.hasNext());
-        expectTraversal(set.iterator(), 2, 1);
-        List<TestElement> list = new ArrayList<>();
-        list.add(new TestElement(1));
-        list.add(new TestElement(2));
-        assertTrue(set.removeAll(list));
-        assertFalse(set.removeAll(list));
-        expectTraversal(set.iterator());
-        assertEquals(0, set.size());
-        assertTrue(set.isEmpty());
-    }
-
-    @Test
-    public void testCollisions() {
-        ImplicitLinkedHashSet<TestElement> set = new 
ImplicitLinkedHashSet<>(5);
-        assertEquals(11, set.numSlots());
-        assertTrue(set.add(new TestElement(11)));
-        assertTrue(set.add(new TestElement(0)));
-        assertTrue(set.add(new TestElement(22)));
-        assertTrue(set.add(new TestElement(33)));
-        assertEquals(11, set.numSlots());
-        expectTraversal(set.iterator(), 11, 0, 22, 33);
-        assertTrue(set.remove(new TestElement(22)));
-        expectTraversal(set.iterator(), 11, 0, 33);
-        assertEquals(3, set.size());
-        assertFalse(set.isEmpty());
-    }
-
-    @Test
-    public void testEnlargement() {
-        ImplicitLinkedHashSet<TestElement> set = new 
ImplicitLinkedHashSet<>(5);
-        assertEquals(11, set.numSlots());
-        for (int i = 0; i < 6; i++) {
-            assertTrue(set.add(new TestElement(i)));
-        }
-        assertEquals(23, set.numSlots());
-        assertEquals(6, set.size());
-        expectTraversal(set.iterator(), 0, 1, 2, 3, 4, 5);
-        for (int i = 0; i < 6; i++) {
-            assertTrue("Failed to find element " + i, set.contains(new 
TestElement(i)));
-        }
-        set.remove(new TestElement(3));
-        assertEquals(23, set.numSlots());
-        assertEquals(5, set.size());
-        expectTraversal(set.iterator(), 0, 1, 2, 4, 5);
-    }
-
-    @Test
-    public void testManyInsertsAndDeletes() {
-        Random random = new Random(123);
-        LinkedHashSet<Integer> existing = new LinkedHashSet<>();
-        ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>();
-        for (int i = 0; i < 100; i++) {
-            addRandomElement(random, existing, set);
-            addRandomElement(random, existing, set);
-            addRandomElement(random, existing, set);
-            removeRandomElement(random, existing, set);
-            expectTraversal(set.iterator(), existing.iterator());
-        }
-    }
-
-    private void addRandomElement(Random random, LinkedHashSet<Integer> 
existing,
-                                  ImplicitLinkedHashSet<TestElement> set) {
-        int next;
-        do {
-            next = random.nextInt();
-        } while (existing.contains(next));
-        existing.add(next);
-        set.add(new TestElement(next));
-    }
-
-    private void removeRandomElement(Random random, Collection<Integer> 
existing,
-                             ImplicitLinkedHashSet<TestElement> set) {
-        int removeIdx = random.nextInt(existing.size());
-        Iterator<Integer> iter = existing.iterator();
-        Integer element = null;
-        for (int i = 0; i <= removeIdx; i++) {
-            element = iter.next();
-        }
-        existing.remove(new TestElement(element));
-    }
-}
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 968855a..34b2376 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -146,11 +146,11 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
     }
 
     @Override
-    public JoinGroupRequestData.JoinGroupRequestProtocolSet metadata() {
+    public JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
         configSnapshot = configStorage.snapshot();
         ConnectProtocol.WorkerState workerState = new 
ConnectProtocol.WorkerState(restUrl, configSnapshot.offset());
         ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
-        return new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+        return new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
                 Collections.singleton(new 
JoinGroupRequestData.JoinGroupRequestProtocol()
                         .setName(DEFAULT_SUBPROTOCOL)
                         .setMetadata(metadata.array()))
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index cda5f61..3684025 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -190,7 +190,7 @@ public class WorkerCoordinatorTest {
 
         PowerMock.replayAll();
 
-        JoinGroupRequestData.JoinGroupRequestProtocolSet serialized = 
coordinator.metadata();
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection serialized = 
coordinator.metadata();
         assertEquals(1, serialized.size());
 
         Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> 
protocolIterator = serialized.iterator();
diff --git a/core/src/main/scala/kafka/server/FetchSession.scala 
b/core/src/main/scala/kafka/server/FetchSession.scala
index 16ee872..fe97346 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.Records
 import org.apache.kafka.common.requests.FetchMetadata.{FINAL_EPOCH, 
INITIAL_EPOCH, INVALID_SESSION_ID}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, 
FetchMetadata => JFetchMetadata}
-import org.apache.kafka.common.utils.{ImplicitLinkedHashSet, Time, Utils}
+import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, Time, 
Utils}
 
 import scala.math.Ordered.orderingToOrdered
 import scala.collection.{mutable, _}
@@ -38,7 +38,7 @@ import scala.collection.JavaConverters._
 object FetchSession {
   type REQ_MAP = util.Map[TopicPartition, FetchRequest.PartitionData]
   type RESP_MAP = util.LinkedHashMap[TopicPartition, 
FetchResponse.PartitionData[Records]]
-  type CACHE_MAP = ImplicitLinkedHashSet[CachedPartition]
+  type CACHE_MAP = ImplicitLinkedHashCollection[CachedPartition]
   type RESP_MAP_ITER = util.Iterator[util.Map.Entry[TopicPartition, 
FetchResponse.PartitionData[Records]]]
 
   val NUM_INCREMENTAL_FETCH_SESSISONS = "NumIncrementalFetchSessions"
@@ -79,10 +79,10 @@ class CachedPartition(val topic: String,
                       var highWatermark: Long,
                       var fetcherLogStartOffset: Long,
                       var localLogStartOffset: Long)
-    extends ImplicitLinkedHashSet.Element {
+    extends ImplicitLinkedHashCollection.Element {
 
-  var cachedNext: Int = ImplicitLinkedHashSet.INVALID_INDEX
-  var cachedPrev: Int = ImplicitLinkedHashSet.INVALID_INDEX
+  var cachedNext: Int = ImplicitLinkedHashCollection.INVALID_INDEX
+  var cachedPrev: Int = ImplicitLinkedHashCollection.INVALID_INDEX
 
   override def next = cachedNext
   override def setNext(next: Int) = this.cachedNext = next
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 34ed7d7..302718d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -47,9 +47,9 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, 
TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
-import 
org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, 
CreatableTopicResultSet}
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, 
CreatableTopicResultCollection}
 import org.apache.kafka.common.message._
-import 
org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, 
DeletableTopicResultSet}
+import 
org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, 
DeletableTopicResultCollection}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -1355,7 +1355,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val requireKnownMemberId = joinGroupRequest.version >= 4 && 
groupInstanceId.isEmpty
 
       // let the coordinator handle join-group
-      val protocols = joinGroupRequest.data().protocols().asScala.map(protocol 
=>
+      val protocols = 
joinGroupRequest.data().protocols().valuesList.asScala.map(protocol =>
         (protocol.name, protocol.metadata)).toList
       groupCoordinator.handleJoinGroup(
         joinGroupRequest.data().groupId,
@@ -1496,7 +1496,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleCreateTopicsRequest(request: RequestChannel.Request) {
-    def sendResponseCallback(results: CreatableTopicResultSet): Unit = {
+    def sendResponseCallback(results: CreatableTopicResultCollection): Unit = {
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
         val responseData = new CreateTopicsResponseData().
           setThrottleTimeMs(requestThrottleMs).
@@ -1510,7 +1510,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val createTopicsRequest = request.body[CreateTopicsRequest]
-    val results = new 
CreatableTopicResultSet(createTopicsRequest.data().topics().size())
+    val results = new 
CreatableTopicResultCollection(createTopicsRequest.data().topics().size())
     if (!controller.isActive) {
       createTopicsRequest.data.topics.asScala.foreach { case topic =>
         results.add(new CreatableTopicResult().setName(topic.name()).
@@ -1594,7 +1594,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleDeleteTopicsRequest(request: RequestChannel.Request) {
-    def sendResponseCallback(results: DeletableTopicResultSet): Unit = {
+    def sendResponseCallback(results: DeletableTopicResultCollection): Unit = {
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
         val responseData = new DeleteTopicsResponseData()
           .setThrottleTimeMs(requestThrottleMs)
@@ -1607,7 +1607,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val deleteTopicRequest = request.body[DeleteTopicsRequest]
-    val results = new 
DeletableTopicResultSet(deleteTopicRequest.data.topicNames.size)
+    val results = new 
DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
     val toDelete = mutable.Set[String]()
     if (!controller.isActive) {
       deleteTopicRequest.data.topicNames.asScala.foreach { case topic =>
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ee1c02b..ac8153c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -34,8 +34,9 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
 import org.apache.kafka.common.message._
-import 
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, 
CreatableTopicSet}
-import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource,
 AlterableConfig, AlterableConfigSet}
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, 
CreatableTopicCollection}
+import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource,
 AlterableConfig, AlterableConfigCollection}
+import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
Records, RecordBatch, SimpleRecord}
@@ -320,7 +321,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createJoinGroupRequest = {
-    val protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+    val protocolSet = new JoinGroupRequestProtocolCollection(
       Collections.singletonList(new 
JoinGroupRequestData.JoinGroupRequestProtocol()
         .setName("consumer-range")
         .setMetadata("test".getBytes())
@@ -395,7 +396,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def createTopicsRequest =
     new CreateTopicsRequest.Builder(new CreateTopicsRequestData().setTopics(
-      new CreatableTopicSet(Collections.singleton(new CreatableTopic().
+      new CreatableTopicCollection(Collections.singleton(new CreatableTopic().
         setName(createTopic).setNumPartitions(1).
           setReplicationFactor(1.toShort)).iterator))).build()
 
@@ -422,7 +423,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val alterableConfig = new AlterableConfig
     alterableConfig.setName(LogConfig.MaxMessageBytesProp).
       setValue("1000000").setConfigOperation(AlterConfigOp.OpType.SET.id())
-    val alterableConfigSet = new AlterableConfigSet
+    val alterableConfigSet = new AlterableConfigCollection
     alterableConfigSet.add(alterableConfig)
     data.resources().add(new AlterConfigsResource().
       
setResourceName(tp.topic).setResourceType(ConfigResource.Type.TOPIC.id()).
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index a54e5fc..514e7ae 100644
--- 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -41,7 +41,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest 
{
                 validateOnly: Boolean = false) = {
     val req = new CreateTopicsRequestData()
     req.setTimeoutMs(timeout)
-    req.setTopics(new CreatableTopicSet(topics.asJava.iterator()))
+    req.setTopics(new CreatableTopicCollection(topics.asJava.iterator()))
     req.setValidateOnly(validateOnly)
     new CreateTopicsRequest.Builder(req).build()
   }
@@ -68,7 +68,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest 
{
       topic.setReplicationFactor(1.toShort)
     }
     if (config != null) {
-      val effectiveConfigs = new CreateableTopicConfigSet()
+      val effectiveConfigs = new CreateableTopicConfigCollection()
       config.foreach {
         case (name, value) => {
           effectiveConfigs.add(new 
CreateableTopicConfig().setName(name).setValue(value))
@@ -77,7 +77,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest 
{
       topic.setConfigs(effectiveConfigs)
     }
     if (assignment != null) {
-      val effectiveAssignments = new CreatableReplicaAssignmentSet()
+      val effectiveAssignments = new CreatableReplicaAssignmentCollection()
       assignment.foreach {
         case (partitionIndex, brokerIdList) => {
           val effectiveAssignment = new CreatableReplicaAssignment()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 54316f3..b83dd92 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -48,6 +48,8 @@ import org.apache.kafka.common.requests.{FetchMetadata => 
JFetchMetadata, _}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.easymock.{Capture, EasyMock, IAnswer}
 import EasyMock._
+import org.apache.kafka.common.message.JoinGroupRequestData
+import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
 import org.apache.kafka.common.message.OffsetCommitRequestData
 import org.junit.Assert.{assertEquals, assertNull, assertTrue}
 import org.junit.{After, Test}
@@ -508,6 +510,41 @@ class KafkaApisTest {
     assertNull(partitionData.abortedTransactions)
   }
 
+  @Test
+  def testJoinGroupProtocolsOrder: Unit = {
+    val protocols = List(
+      new 
JoinGroupRequestProtocol().setName("first").setMetadata("first".getBytes()),
+      new 
JoinGroupRequestProtocol().setName("second").setMetadata("second".getBytes())
+    )
+
+    EasyMock.expect(groupCoordinator.handleJoinGroup(
+      anyString,
+      anyString,
+      anyObject(classOf[Option[String]]),
+      anyBoolean,
+      anyString,
+      anyString,
+      anyInt,
+      anyInt,
+      anyString,
+      EasyMock.eq(protocols.map(protocol => (protocol.name, 
protocol.metadata))),
+      anyObject()
+    ))
+
+    createKafkaApis().handleJoinGroupRequest(
+      buildRequest(
+        new JoinGroupRequest.Builder(
+          new JoinGroupRequestData()
+            .setGroupId("test")
+            .setMemberId("test")
+            .setProtocolType("consumer")
+            .setProtocols(new 
JoinGroupRequestData.JoinGroupRequestProtocolCollection(protocols.iterator.asJava))
+        )
+      )._2)
+
+    EasyMock.replay(groupCoordinator)
+  }
+
   /**
    * Return pair of listener names in the metadataCache: PLAINTEXT and 
LISTENER2 respectively.
    */
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index b4c1268..52ac700 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -27,7 +27,9 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.message._
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
-import 
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, 
CreatableTopicSet}
+import org.apache.kafka.common.message.ControlledShutdownRequestData
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, 
CreatableTopicCollection}
+import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
@@ -288,7 +290,7 @@ class RequestQuotaTest extends BaseRequestTest {
               .setGroupInstanceId(null)
               .setProtocolType("consumer")
               .setProtocols(
-                new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+                new JoinGroupRequestProtocolCollection(
                   Collections.singletonList(new 
JoinGroupRequestData.JoinGroupRequestProtocol()
                     .setName("consumer-range")
                     .setMetadata("test".getBytes())).iterator()
@@ -324,7 +326,7 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.CREATE_TOPICS => {
           new CreateTopicsRequest.Builder(
             new CreateTopicsRequestData().setTopics(
-              new CreatableTopicSet(Collections.singleton(
+              new CreatableTopicCollection(Collections.singleton(
                 new CreatableTopic().setName("topic-2").setNumPartitions(1).
                   setReplicationFactor(1.toShort)).iterator())))
         }
diff --git 
a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java 
b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index c8e70bb..d6cd5f3 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -120,8 +120,8 @@ public final class MessageDataGenerator {
             headerGenerator.addImport(MessageGenerator.MESSAGE_CLASS);
         }
         if (isSetElement) {
-            
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_SET_CLASS);
-            implementedInterfaces.add("ImplicitLinkedHashMultiSet.Element");
+            
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
+            
implementedInterfaces.add("ImplicitLinkedHashMultiCollection.Element");
         }
         Set<String> classModifiers = new HashSet<>();
         classModifiers.add("public");
@@ -152,9 +152,9 @@ public final class MessageDataGenerator {
 
     private void generateHashSet(String className, StructSpec struct) {
         buffer.printf("%n");
-        
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_SET_CLASS);
-        buffer.printf("public static class %s extends 
ImplicitLinkedHashMultiSet<%s> {%n",
-            hashSetType(className), className);
+        
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
+        buffer.printf("public static class %s extends 
ImplicitLinkedHashMultiCollection<%s> {%n",
+            collectionType(className), className);
         buffer.incrementIndent();
         generateHashSetZeroArgConstructor(className);
         generateHashSetSizeArgConstructor(className);
@@ -166,7 +166,7 @@ public final class MessageDataGenerator {
     }
 
     private void generateHashSetZeroArgConstructor(String className) {
-        buffer.printf("public %s() {%n", hashSetType(className));
+        buffer.printf("public %s() {%n", collectionType(className));
         buffer.incrementIndent();
         buffer.printf("super();%n");
         buffer.decrementIndent();
@@ -175,7 +175,7 @@ public final class MessageDataGenerator {
     }
 
     private void generateHashSetSizeArgConstructor(String className) {
-        buffer.printf("public %s(int expectedNumElements) {%n", 
hashSetType(className));
+        buffer.printf("public %s(int expectedNumElements) {%n", 
collectionType(className));
         buffer.incrementIndent();
         buffer.printf("super(expectedNumElements);%n");
         buffer.decrementIndent();
@@ -185,7 +185,7 @@ public final class MessageDataGenerator {
 
     private void generateHashSetIteratorConstructor(String className) {
         headerGenerator.addImport(MessageGenerator.ITERATOR_CLASS);
-        buffer.printf("public %s(Iterator<%s> iterator) {%n", 
hashSetType(className), className);
+        buffer.printf("public %s(Iterator<%s> iterator) {%n", 
collectionType(className), className);
         buffer.incrementIndent();
         buffer.printf("super(iterator);%n");
         buffer.decrementIndent();
@@ -199,7 +199,7 @@ public final class MessageDataGenerator {
             commaSeparatedHashSetFieldAndTypes(struct));
         buffer.incrementIndent();
         generateKeyElement(className, struct);
-        
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_SET_CLASS);
+        
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
         buffer.printf("return find(key);%n");
         buffer.decrementIndent();
         buffer.printf("}%n");
@@ -212,7 +212,7 @@ public final class MessageDataGenerator {
             commaSeparatedHashSetFieldAndTypes(struct));
         buffer.incrementIndent();
         generateKeyElement(className, struct);
-        
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_SET_CLASS);
+        
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
         buffer.printf("return findAll(key);%n");
         buffer.decrementIndent();
         buffer.printf("}%n");
@@ -283,8 +283,8 @@ public final class MessageDataGenerator {
         }
     }
 
-    private static String hashSetType(String baseType) {
-        return baseType + "Set";
+    private static String collectionType(String baseType) {
+        return baseType + "Collection";
     }
 
     private String fieldAbstractJavaType(FieldSpec field) {
@@ -307,8 +307,8 @@ public final class MessageDataGenerator {
         } else if (field.type().isArray()) {
             FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
             if (field.toStruct().hasKeys()) {
-                
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_SET_CLASS);
-                return hashSetType(arrayType.elementType().toString());
+                
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
+                return collectionType(arrayType.elementType().toString());
             } else {
                 headerGenerator.addImport(MessageGenerator.LIST_CLASS);
                 return "List<" + getBoxedJavaType(arrayType.elementType()) + 
">";
@@ -322,8 +322,8 @@ public final class MessageDataGenerator {
         if (field.type().isArray()) {
             FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
             if (field.toStruct().hasKeys()) {
-                
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_SET_CLASS);
-                return hashSetType(arrayType.elementType().toString());
+                
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
+                return collectionType(arrayType.elementType().toString());
             } else {
                 headerGenerator.addImport(MessageGenerator.ARRAYLIST_CLASS);
                 return "ArrayList<" + 
getBoxedJavaType(arrayType.elementType()) + ">";
@@ -1233,7 +1233,7 @@ public final class MessageDataGenerator {
             }
             FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
             if (field.toStruct().hasKeys()) {
-                return "new " + 
hashSetType(arrayType.elementType().toString()) + "(0)";
+                return "new " + 
collectionType(arrayType.elementType().toString()) + "(0)";
             } else {
                 headerGenerator.addImport(MessageGenerator.ARRAYLIST_CLASS);
                 return "new ArrayList<" + 
getBoxedJavaType(arrayType.elementType()) + ">()";
diff --git 
a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java 
b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
index 18295c6..e227ded 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
@@ -59,8 +59,8 @@ public final class MessageGenerator {
 
     static final String ARRAYLIST_CLASS = "java.util.ArrayList";
 
-    static final String IMPLICIT_LINKED_HASH_MULTI_SET_CLASS =
-        "org.apache.kafka.common.utils.ImplicitLinkedHashMultiSet";
+    static final String IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS =
+        "org.apache.kafka.common.utils.ImplicitLinkedHashMultiCollection";
 
     static final String UNSUPPORTED_VERSION_EXCEPTION_CLASS =
         "org.apache.kafka.common.errors.UnsupportedVersionException";

Reply via email to