ekaterinadimitrova2 commented on code in PR #2655:
URL: https://github.com/apache/cassandra/pull/2655#discussion_r1329215188


##########
src/java/org/apache/cassandra/cql3/Term.java:
##########
@@ -188,35 +208,55 @@ public boolean isTerminal()
         /**
          * @return the serialized value of this terminal.
          */
-        public abstract ByteBuffer get(ProtocolVersion version) throws 
InvalidRequestException;
+        public abstract ByteBuffer get();
 
-        public ByteBuffer bindAndGet(QueryOptions options) throws 
InvalidRequestException
+        /**
+         * Returns the serialized values of this Term elements, if this term 
represents a Collection, Tuple or UDT.
+         * If this term does not represent a multi-elements type it will 
return a list containing the serialized value of this terminal
+         * @return a list containing serialized values of this Term elements
+         */
+        public List<ByteBuffer> getElements()
         {
-            return get(options.getProtocolVersion());
+            ByteBuffer value = get();
+            return value == null ? Collections.emptyList() : 
Collections.singletonList(value);
         }
-    }
 
-    public abstract class MultiItemTerminal extends Terminal
-    {
-        public abstract List<ByteBuffer> getElements();
+        public ByteBuffer bindAndGet(QueryOptions options)
+        {
+            return get();

Review Comment:
   The `@Override` annotation is missing in this file.



##########
src/java/org/apache/cassandra/db/marshal/TupleType.java:
##########
@@ -276,62 +279,69 @@ public <V> V fromComparableBytes(ValueAccessor<V> 
accessor, ByteSource.Peekable
         assert terminator == ByteSource.TERMINATOR : String.format("Expected 
TERMINATOR (0x%2x) after %d components",
                                                                    
ByteSource.TERMINATOR,
                                                                    
types.size());
-        return buildValue(accessor, componentBuffers);
+        return pack(accessor, Arrays.asList(componentBuffers));
     }
 
-    /**
-     * Split a tuple value into its component values.
-     */
-    public <V> V[] split(ValueAccessor<V> accessor, V value)
+    @Override
+    public List<ByteBuffer> unpack(ByteBuffer value)
     {
-        return split(accessor, value, size(), this);
+        return unpack(value, ByteBufferAccessor.instance);
     }
 
-    /**
-     * Split a tuple value into its component values.
-     */
-    public static <V> V[] split(ValueAccessor<V> accessor, V value, int 
numberOfElements, TupleType type)
+    public <V> List<V> unpack(V value, ValueAccessor<V> accessor)
     {
-        V[] components = accessor.createArray(numberOfElements);
+        int numberOfElements = size();
+        List<V> components = new ArrayList<>(numberOfElements);
         int length = accessor.size(value);
         int position = 0;
         for (int i = 0; i < numberOfElements; i++)
         {
             if (position == length)
-                return Arrays.copyOfRange(components, 0, i);
+            {
+                return components;
+            }
 
             if (position + 4 > length)
-                throw new MarshalException(String.format("Not enough bytes to 
read %dth component", i));
+                throw new MarshalException(String.format("Not enough bytes to 
read %dth %s", i, componentOrFieldName(i)));
 
             int size = accessor.getInt(value, position);
             position += 4;
 
             // size < 0 means null value
             if (size >= 0)
             {
-                if (position + size > length)
-                    throw new MarshalException(String.format("Not enough bytes 
to read %dth component", i));
+                if (length - position < size)
+                    throw new MarshalException(String.format("Not enough bytes 
to read %dth %s", i, componentOrFieldName(i)));
 
-                components[i] = accessor.slice(value, position, size);
+                components.add(accessor.slice(value, position, size));
                 position += size;
             }
             else
-                components[i] = null;
+                components.add(null);
         }
 
         // error out if we got more values in the tuple/UDT than we expected
         if (position < length)
         {
-            throw new MarshalException(String.format("Expected %s %s for %s 
column, but got more",
-                                                     numberOfElements, 
numberOfElements == 1 ? "value" : "values",
-                                                     type.asCQL3Type()));
+            // Invalid remaining data after end of tuple value
+            // Invalid remaining data after end of UDT value

Review Comment:
   ```suggestion
   ```



##########
src/java/org/apache/cassandra/serializers/MapSerializer.java:
##########
@@ -21,6 +21,7 @@
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;

Review Comment:
   I noticed a potential bug in this class's `validate` method:
   ```
   public <T> void validate(T input, ValueAccessor<T> accessor)
       {
           if (accessor.isEmpty(input))
               throw new MarshalException("Not enough bytes to read a map");
           try
           {
               // Empty values are still valid.
               if (accessor.isEmpty(input)) return;
   ```
   
   The first `if`, where we throw, was added with CASSANDRA-18504. This changed 
the previous behavior, where we did not throw, and the second check, which is 
now unreachable, was never removed. Maybe something got mixed up on commit? 
@adelapena, any thoughts?
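
To make the concern concrete, here is a minimal sketch of the control flow in the snippet quoted above. The class name, the `ByteBuffer` parameter, and `IllegalArgumentException` (standing in for `MarshalException`) are assumptions made for illustration; this is not the actual `MapSerializer` code.

```java
import java.nio.ByteBuffer;

final class ValidateOrderSketch
{
    /**
     * Hypothetical stand-in for the quoted validate method.
     * Returns true only if the "empty values are still valid" shortcut is taken.
     */
    static boolean validate(ByteBuffer input)
    {
        // First check: an empty input is rejected outright.
        if (!input.hasRemaining())
            throw new IllegalArgumentException("Not enough bytes to read a map");

        // Second check: same condition as above, so it can never be true here.
        if (!input.hasRemaining())
            return true;

        return false;
    }
}
```

With this ordering an empty buffer always throws, so the early return for empty values never runs.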



##########
src/java/org/apache/cassandra/cql3/Maps.java:
##########
@@ -188,7 +174,7 @@ public Term prepare(String keyspace, ColumnSpecification 
receiver) throws Invali
 
             ColumnSpecification keySpec = Maps.keySpecOf(receiver);
             ColumnSpecification valueSpec = Maps.valueSpecOf(receiver);
-            Map<Term, Term> values = new HashMap<>(entries.size());
+            List<Term> values = new ArrayList<>(entries.size() << 1);

Review Comment:
   I must admit that the conversion from map to list confused me a bit.
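
For readers with the same question, one plausible reading of `entries.size() << 1` is that keys and values are stored alternately in a single list. That layout is an assumption based only on the capacity expression in the hunk; the sketch below uses a hypothetical helper and plain strings rather than `Term` objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class FlattenedMapSketch
{
    /** Flattens a map into [k1, v1, k2, v2, ...], following the map's iteration order. */
    static <T> List<T> flatten(Map<T, T> entries)
    {
        // Two slots per entry, hence size << 1.
        List<T> values = new ArrayList<>(entries.size() << 1);
        for (Map.Entry<T, T> entry : entries.entrySet())
        {
            values.add(entry.getKey());
            values.add(entry.getValue());
        }
        return values;
    }
}
```

In such a layout the element at an even index is a key and the element right after it is the matching value.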



##########
src/java/org/apache/cassandra/serializers/CollectionSerializer.java:
##########
@@ -35,35 +37,106 @@
 public abstract class CollectionSerializer<T> extends TypeSerializer<T>
 {
     protected abstract List<ByteBuffer> serializeValues(T value);
-    protected abstract int getElementCount(T value);
 
     @Override
     public ByteBuffer serialize(T input)
     {
         List<ByteBuffer> values = serializeValues(input);
-        return pack(values, ByteBufferAccessor.instance, 
getElementCount(input));
+        return pack(values, ByteBufferAccessor.instance);
     }
 
-    public static ByteBuffer pack(Collection<ByteBuffer> values, int elements)
+    public ByteBuffer pack(Collection<ByteBuffer> values)
     {
-        return pack(values, ByteBufferAccessor.instance, elements);
+        return pack(values, ByteBufferAccessor.instance);
     }
 
-    public static <V> V pack(Collection<V> values, ValueAccessor<V> accessor, 
int elements)
+    public <V> V pack(Collection<V> values, ValueAccessor<V> accessor)
     {
         int size = 0;
         for (V value : values)
             size += sizeOfValue(value, accessor);
 
         ByteBuffer result = ByteBuffer.allocate(sizeOfCollectionSize() + size);
-        writeCollectionSize(result, elements);
+        writeCollectionSize(result, collectionSize(values));
         for (V value : values)
         {
             writeValue(result, value, accessor);
         }
         return accessor.valueOf((ByteBuffer) result.flip());
     }
 
+    public List<ByteBuffer> unpack(ByteBuffer input)
+    {
+        return unpack(input, ByteBufferAccessor.instance);
+    }
+
+    public <V> List<V> unpack(V input, ValueAccessor<V> accessor)
+    {
+        try
+        {
+            int elements = 
numberOfSerializedElements(readCollectionSize(input, accessor));
+            if (elements < 0)
+                throw new MarshalException("The data cannot be deserialized as 
a " + getCollectionName());
+
+            // If the received bytes are not corresponding to a collection, 
the size might be a huge number.
+            // In such a case we do not want to initialize the list with that 
size as it can result
+            // in an OOM. On the other hand we do not want to have to resize 
the list
+            // if we can avoid it, so we put a reasonable limit on the 
initialCapacity.
+            List<V> values = new ArrayList<>(Math.min(elements, 256));

Review Comment:
   I didn't find this limit anywhere else. Is it new?
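
For context, the pattern in the hunk caps only the initial capacity, not the number of elements that can be read. A minimal sketch, with hypothetical names and an `int[]` payload standing in for the serialized input:

```java
import java.util.ArrayList;
import java.util.List;

final class CappedCapacitySketch
{
    // Same bound as in the hunk; the constant name is an assumption.
    static final int MAX_INITIAL_CAPACITY = 256;

    /** Bounds the pre-allocation so a bogus declared size cannot trigger a huge allocation. */
    static int initialCapacity(int declaredElements)
    {
        return Math.min(declaredElements, MAX_INITIAL_CAPACITY);
    }

    /** Reads the payload; the list still grows past the cap if more elements are present. */
    static List<Integer> read(int declaredElements, int[] payload)
    {
        List<Integer> values = new ArrayList<>(initialCapacity(declaredElements));
        for (int value : payload)
            values.add(value);
        return values;
    }
}
```

The cap trades a few possible resizes on large collections for protection against allocating from a corrupt size header.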



##########
src/java/org/apache/cassandra/cql3/Constants.java:
##########
@@ -438,41 +438,6 @@ public String toString()
         }
     }
 
-    public static class Marker extends AbstractMarker
-    {
-        protected Marker(int bindIndex, ColumnSpecification receiver)
-        {
-            super(bindIndex, receiver);
-            assert !receiver.type.isCollection();
-        }
-
-        @Override
-        public ByteBuffer bindAndGet(QueryOptions options) throws 
InvalidRequestException

Review Comment:
   Looking at Term, we split bind and bindAndGet, with bindAndGet calling 
bind, which makes sense. 
   Here, though, the marker was calling bindAndGet from bind, which is 
confusing. I am glad we are getting rid of this :-) 



##########
src/java/org/apache/cassandra/cql3/Term.java:
##########
@@ -26,8 +26,8 @@
 
 /**
  * A CQL3 term, i.e. a column value with or without bind variables.
- *
- * A Term can be either terminal or non terminal. A term object is one that is 
typed and is obtained
+ * <p>
+ * A Term can be either terminal or non-terminal. A term object is one that is 
typed and is obtained
  * from a raw term (Term.Raw) by poviding the actual receiver to which the 
term is supposed to be a

Review Comment:
   `by poviding the actual receiver to which the term is supposed to be a
   `
   How about we add a bit more clarity for first-time readers:
   `by providing the actual {@link ColumnSpecification} receiver to which the 
term is supposed to be a
   `



##########
src/java/org/apache/cassandra/cql3/TokenRelation.java:
##########
@@ -161,7 +149,7 @@ public boolean equals(Object o)
             return false;
 
         TokenRelation tr = (TokenRelation) o;
-        return relationType.equals(tr.relationType) && 
entities.equals(tr.entities) && value.equals(tr.value);
+        return relationType == tr.relationType && entities.equals(tr.entities) 
&& value.equals(tr.value);

Review Comment:
   Why the change from equals to ==?
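
One possible motivation, assuming `relationType` is an enum (the hunk does not show its declaration): for enum constants `==` gives the same answer as `equals` and does not throw on a null left-hand side. A small sketch with a hypothetical enum:

```java
final class EnumEqualitySketch
{
    // Hypothetical enum standing in for the type of relationType.
    enum Op { EQ, LT }

    /** Identity comparison: safe even when a is null. */
    static boolean sameByIdentity(Op a, Op b)
    {
        return a == b;
    }

    /** equals comparison: throws NullPointerException when a is null. */
    static boolean sameByEquals(Op a, Op b)
    {
        return a.equals(b);
    }
}
```

If the field is not an enum, the two comparisons are not interchangeable and the question stands.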



##########
src/java/org/apache/cassandra/cql3/Tuples.java:
##########
@@ -51,11 +44,30 @@ public static ColumnSpecification 
componentSpecOf(ColumnSpecification column, in
                                        
(getTupleType(column.type)).type(component));
     }
 
+    public static ColumnSpecification makeReceiver(List<? extends 
ColumnSpecification> receivers)

Review Comment:
   So now we no longer have a specific raw marker covering a tuple of values 
for multiple columns, where each column can have a different type. Can we 
add at least some javadoc to explain this?



##########
src/java/org/apache/cassandra/cql3/TokenRelation.java:
##########
@@ -180,21 +168,21 @@ private List<ColumnMetadata> 
getColumnDefinitions(TableMetadata table)
     }
 
     /**
-     * Returns the receivers for this relation.
+     * Returns the receiver for this relation.
      *
      * @param table the table meta data
      * @param columnDefs the column definitions
      * @return the receivers for the specified relation.
      * @throws InvalidRequestException if the relation is invalid
      */
-    private static List<? extends ColumnSpecification> 
toReceivers(TableMetadata table,
-                                                                   
List<ColumnMetadata> columnDefs)
-                                                                   throws 
InvalidRequestException
+    private static ColumnSpecification toReceivers(TableMetadata table,
+                                                   List<ColumnMetadata> 
columnDefs)
+                                                   throws 
InvalidRequestException
     {
 
         if (!columnDefs.equals(table.partitionKeyColumns()))
         {
-            checkTrue(columnDefs.containsAll(table.partitionKeyColumns()),
+            checkTrue(new 
HashSet<>(columnDefs).containsAll(table.partitionKeyColumns()),

Review Comment:
   Nice, this reduces the complexity from O(n*m) to an expected O(n+m).
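
A minimal sketch of the two variants, using hypothetical helper names and generic lists rather than `ColumnMetadata`. Both return the same result as long as the element type has consistent `equals` and `hashCode`; only the lookup cost differs.

```java
import java.util.HashSet;
import java.util.List;

final class ContainsAllSketch
{
    /** Each lookup scans the list, so the cost is proportional to haystack.size() * needles.size(). */
    static <T> boolean listContainsAll(List<T> haystack, List<T> needles)
    {
        return haystack.containsAll(needles);
    }

    /** Builds the set once, then each lookup is an expected constant-time hash probe. */
    static <T> boolean setContainsAll(List<T> haystack, List<T> needles)
    {
        return new HashSet<>(haystack).containsAll(needles);
    }
}
```

For very short lists the set construction may cost more than it saves, so the gain depends on the sizes involved.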



##########
src/java/org/apache/cassandra/cql3/Terms.java:
##########
@@ -47,225 +47,515 @@ public Object get(int index)
         @Override
         public int size()
         {
-            return 0;
+            throw new UnsupportedOperationException();
+        }
+    };
+
+    Terminals UNSET_TERMINALS = new Terminals()

Review Comment:
   Some javadoc maybe?



##########
src/java/org/apache/cassandra/cql3/TokenRelation.java:
##########
@@ -206,9 +194,9 @@ private static List<? extends ColumnSpecification> 
toReceivers(TableMetadata tab
         }
 
         ColumnMetadata firstColumn = columnDefs.get(0);
-        return Collections.singletonList(new 
ColumnSpecification(firstColumn.ksName,

Review Comment:
   KATE: check! This used to be an immutable, serializable list of one 
element. Is it mutable now?
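
As a reference point for that check, the sketch below shows the properties of `Collections.singletonList` that the removed line relied on. The helper class is hypothetical, and what the new code returns is not visible in this hunk.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

final class SingletonListSketch
{
    /** Returns true if the list refuses structural modification. */
    static boolean rejectsAdd(List<String> list)
    {
        try
        {
            list.add("x");
            return false;
        }
        catch (UnsupportedOperationException e)
        {
            return true;
        }
    }

    /** Returns true if the list instance can be Java-serialized. */
    static boolean isSerializable(List<String> list)
    {
        return list instanceof Serializable;
    }
}
```

Running both helpers against the old and the new return value would show whether either property was lost.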



##########
src/java/org/apache/cassandra/cql3/Terms.java:
##########
@@ -47,225 +47,515 @@ public Object get(int index)
         @Override
         public int size()
         {
-            return 0;
+            throw new UnsupportedOperationException();
+        }
+    };
+
+    Terminals UNSET_TERMINALS = new Terminals()
+    {
+        @Override
+        @SuppressWarnings("unchecked")
+        public List<ByteBuffer> get()
+        {
+            return (List<ByteBuffer>) UNSET_LIST;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public List<List<ByteBuffer>> getElements()
+        {
+            return (List<List<ByteBuffer>>) UNSET_LIST;
+        }
+
+        @Override
+        public List<Terminal> asList()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void addFunctionsTo(List<Function> functions)
+        {
+
+        }
+
+        @Override
+        public boolean containsSingleTerm()
+        {
+            return false;
+        }
+
+        @Override
+        public Term asSingleTerm()
+        {
+            throw new UnsupportedOperationException();
         }
     };
 
     /**
      * Adds all functions (native and user-defined) used by any of the terms 
to the specified list.
      * @param functions the list to add to
      */
-    public void addFunctionsTo(List<Function> functions);
+    void addFunctionsTo(List<Function> functions);
 
     /**
      * Collects the column specifications for the bind variables in the terms.
-     * This is obviously a no-op if the terms are Terminal.
+     * This is obviously a no-op if the terms are Terminals.
      *
      * @param boundNames the variables specification where to collect the
      * bind variables of the terms in.
      */
-    public void collectMarkerSpecification(VariableSpecifications boundNames);
+    void collectMarkerSpecification(VariableSpecifications boundNames);
 
     /**
      * Bind the values in these terms to the values contained in {@code 
options}.
-     * This is obviously a no-op if the term is Terminal.
+     * This is obviously a no-op if this {@code Terms} are Terminals.
      *
-     * @param options the values to bind markers to.
-     * @return the result of binding all the variables of these NonTerminals 
or an {@code UNSET_LIST} if the term
-     * was unset.
+     * @param options the query options containing the values to bind markers 
to.
+     * @return the result of binding all the variables of these NonTerminals.
      */
-    public List<Terminal> bind(QueryOptions options);
+    Terminals bind(QueryOptions options);
 
+    /**
+     * A shorter for {@code bind(options).get()}.
+     * We expose it mainly because for constants it can avoid allocating a 
temporary
+     * object between the bind and the get.
+     * @param options the query options containing the values to bind markers 
to.
+     */
+    List<ByteBuffer> bindAndGet(QueryOptions options);
 
-    public List<ByteBuffer> bindAndGet(QueryOptions options);
+    /**
+     * A shorter for {@code bind(options).getElements()}.
+     * We expose it mainly because for constants it can avoid allocating a 
temporary
+     * object between the {@code bind} and the {@code getElements}.
+     * @param options the query options containing the values to bind markers 
to.
+     */
+    List<List<ByteBuffer>> bindAndGetElements(QueryOptions options);
 
     /**
-     * Creates a {@code Terms} for the specified list marker.
+     * Creates a {@code Terms} containing a single {@code Term}.
      *
-     * @param marker the list  marker
-     * @param type the element type
-     * @return a {@code Terms} for the specified list marker
+     * @param term the {@code Term}
+     * @return a {@code Terms} containing a single {@code Term}.
      */
-    public static Terms ofListMarker(final Lists.Marker marker, final 
AbstractType<?> type)
+    static Terms of(final Term term)
     {
-        return new Terms()
-        {
-            @Override
-            public void addFunctionsTo(List<Function> functions)
-            {
-            }
+        if (term.isTerminal())
+            return Terminals.of(term == Constants.NULL_VALUE ? null : 
(Terminal) term);
+
+        return NonTerminals.of((NonTerminal) term);
+    }
 
-            @Override
-            public void collectMarkerSpecification(VariableSpecifications 
boundNames)
+    /**
+     * Creates a {@code Terms} containing a set of {@code Term}.
+     *
+     * @param terms the terms
+     * @return a {@code Terms} containing a set of {@code Term}.
+     */
+    static Terms of(final List<Term> terms)
+    {
+        boolean allTerminals = terms.stream().allMatch(Term::isTerminal);
+
+        if (allTerminals)
+        {
+            int size = terms.size();
+            List<Terminal> terminals = new ArrayList<>(size);
+            for (int i = 0; i < size; i++)
             {
-                marker.collectMarkerSpecification(boundNames);
+                Terminal terminal = (Terminal) terms.get(i);
+                terminals.add(terminal == Constants.NULL_VALUE ? null : 
terminal);
             }
+            return Terminals.of(terminals);
+        }
 
-            @Override
-            public List<ByteBuffer> bindAndGet(QueryOptions options)
-            {
-                Terminal terminal = marker.bind(options);
+        return NonTerminals.of(terms);
+    }
 
-                if (terminal == null)
-                    return null;
+    /**
+     * Checks if these {@code terms} knows that it contains a single {@code 
term}.
+     * <p>
+     * If the instance is a marker it will not know how many terms it 
represents and will return false.
+     * @return {@code true} if this {@code terms} know contains a single 
{@code term}, {@code false} otherwise.
+     */
+    boolean containsSingleTerm();
 
-                if (terminal == Constants.UNSET_VALUE)
-                    return UNSET_LIST;
+    /**
+     * If this {@code terms} contains a single term it will be returned 
otherwise an UnsupportedOperationException will be thrown.
+     * @return a single term representing the single element of this {@code 
terms}.
+     * @throws UnsupportedOperationException if this term does not know how 
many terms it contains or contains more than one term
+     */
+    Term asSingleTerm();
 
-                return ((MultiItemTerminal) terminal).getElements();
-            }
+    /**
+     * Adds all functions (native and user-defined) of the specified terms to 
the list.
+     * @param functions the list to add to
+     */
+    static void addFunctions(Iterable<? extends Term> terms, List<Function> 
functions)
+    {
+        for (Term term : terms)
+        {
+            if (term != null)
+                term.addFunctionsTo(functions);
+        }
+    }
 
-            @Override
-            public List<Terminal> bind(QueryOptions options)
-            {
-                Terminal terminal = marker.bind(options);
+    /**
+     * A parsed, non prepared (thus untyped) set of terms.
+     */
+    abstract class Raw implements AssignmentTestable
+    {
+        /**
+         * This method validates this {@code Terms.Raw} is valid for the 
provided column
+         * specification and "prepare" this {@code Terms.Raw}, returning the 
resulting {@link Terms}.
+         *
+         * @param receiver the "column" the set of terms are supposed to be a 
value of. Note
+         * that the ColumnSpecification may not correspond to a real column.
+         * @return the prepared terms
+         */
+        public abstract Terms prepare(String keyspace, ColumnSpecification 
receiver) throws InvalidRequestException;
 
-                if (terminal == null)
-                    return null;
+        /**
+         * @return a String representation of the raw terms that can be used 
when reconstructing a CQL query string.
+         */
+        public abstract String getText();
 
-                if (terminal == Constants.UNSET_VALUE)
-                    return UNSET_LIST;
+        /**
+         * The type of the {@code Terms} if it can be infered.
+         *
+         * @param keyspace the keyspace on which the statement containing 
these terms is on.
+         * @return the type of this {@code Terms} if inferrable, or {@code 
null}
+         * otherwise (for instance, the type isn't inferrable for a bind 
marker. Even for
+         * literals, the exact type is not inferrable since they are valid for 
many
+         * different types and so this will return {@code null} too).
+         */
+        public abstract AbstractType<?> getExactTypeIfKnown(String keyspace);
 
-                java.util.function.Function<ByteBuffer, Term.Terminal> 
deserializer = deserializer(options.getProtocolVersion());
+        @Override
+        public AbstractType<?> getCompatibleTypeIfKnown(String keyspace)
+        {
+            return getExactTypeIfKnown(keyspace);
+        }
 
-                List<ByteBuffer> boundValues = ((MultiItemTerminal) 
terminal).getElements();
-                List<Term.Terminal> values = new 
ArrayList<>(boundValues.size());
-                for (int i = 0, m = boundValues.size(); i < m; i++)
-                {
-                    ByteBuffer buffer = boundValues.get(i);
-                    Term.Terminal value = buffer == null ? null : 
deserializer.apply(buffer);
-                    values.add(value);
-                }
-                return values;
-            }
+        @Override
+        public String toString()
+        {
+            return getText();
+        }
 
-            public java.util.function.Function<ByteBuffer, Term.Terminal> 
deserializer(ProtocolVersion version)
+        public static Raw of(List<? extends Term.Raw> raws)
+        {
+            return new Raw()
             {
-                if (type.isCollection())
+                @Override
+                public Terms prepare(String keyspace, ColumnSpecification 
receiver) throws InvalidRequestException
                 {
-                    switch (((CollectionType<?>) type).kind)
+                    List<Term> terms = new ArrayList<>(raws.size());
+                    for (Term.Raw raw : raws)
                     {
-                        case LIST:
-                            return e -> Lists.Value.fromSerialized(e, 
(ListType<?>) type);
-                        case SET:
-                            return e -> Sets.Value.fromSerialized(e, 
(SetType<?>) type);
-                        case MAP:
-                            return e -> Maps.Value.fromSerialized(e, 
(MapType<?, ?>) type);
+                        terms.add(raw.prepare(keyspace, receiver));
                     }
-                    throw new AssertionError();
+                    return Terms.of(terms);
                 }
-                return e -> new Constants.Value(e);
-            }
-        };
+
+                @Override
+                public String getText()
+                {
+                    return raws.stream().map(Term.Raw::getText)
+                                        .collect(Collectors.joining(", ", "(", 
")"));
+                }
+
+                @Override
+                public AbstractType<?> getExactTypeIfKnown(String keyspace)
+                {
+                    return null;
+                }
+
+                @Override
+                public TestResult testAssignment(String keyspace, 
ColumnSpecification receiver)
+                {
+                    return AssignmentTestable.TestResult.WEAKLY_ASSIGNABLE;
+                }
+            };
+        }
     }
 
     /**
-     * Creates a {@code Terms} containing a single {@code Term}.
-     *
-     * @param term the {@code Term}
-     * @return a {@code Terms} containing a single {@code Term}.
+     * Set of terms that contains only terminal terms.
      */
-    public static Terms of(final Term term)
+    abstract class Terminals implements Terms
     {
-        return new Terms()
+        @Override
+        public void collectMarkerSpecification(VariableSpecifications 
boundNames) {}
+
+        @Override
+        public final Terminals bind(QueryOptions options)
+        {
+            return this;
+        }
+
+        @Override
+        public List<ByteBuffer> bindAndGet(QueryOptions options)
+        {
+            return get();
+        }
+
+        @Override
+        public List<List<ByteBuffer>> bindAndGetElements(QueryOptions options)
+        {
+            return getElements();
+        }
+
+        /**
+         * @return the serialized values of this {@code Terminals}.
+         */
+        public abstract List<ByteBuffer> get();
+
+        /**
+         * Returns the serialized values of each Term elements, if these term 
represents a Collection, Tuple or UDT.
+         * If the terms do not represent multi-elements type the method will 
return a list containing the serialized value of the terminals
+         * @return a list containing serialized values of each Term elements
+         */
+        public abstract List<List<ByteBuffer>> getElements();
+
+        /**
+         * Converts these {@code Terminals} into a {@code List} of {@code 
Term.Terminal}.
+         * @return a {@code List} of {@code Term.Terminal}.
+         */
+        public abstract List<Term.Terminal> asList();
+
+        /**
+         * Converts a {@code Terminal} into a {@code Terminals}.
+         * @param terminal the {@code Terminal} to convert
+         * @return a {@code Terminals}.
+         */
+        public static Terminals of(Terminal terminal)
+        {
+            return new Terminals()
+            {
+                @Override
+                public List<ByteBuffer> get()
                 {
-                    @Override
-                    public void addFunctionsTo(List<Function> functions)
-                    {
-                        term.addFunctionsTo(functions);
-                    }
+                    return Collections.singletonList(terminal == null ? null : 
terminal.get());
+                }
 
-                    @Override
-                    public void 
collectMarkerSpecification(VariableSpecifications boundNames)
-                    {
-                        term.collectMarkerSpecification(boundNames);
-                    }
+                @Override
+                public List<List<ByteBuffer>> getElements()
+                {
+                    return Collections.singletonList(terminal == null ? null : 
terminal.getElements());
+                }
+
+                @Override
+                public List<Terminal> asList()
+                {
+                    return Collections.singletonList(terminal);
+                }
 
-                    @Override
-                    public List<ByteBuffer> bindAndGet(QueryOptions options)
+                @Override
+                public void addFunctionsTo(List<Function> functions)
+                {
+                    if (terminal != null)
+                        terminal.addFunctionsTo(functions);
+                }
+
+                @Override
+                public boolean containsSingleTerm()
+                {
+                    return true;
+                }
+
+                @Override
+                public Term asSingleTerm()
+                {
+                    return terminal;
+                }
+            };
+        }
+
+        public static Terminals of(List<Terminal> terminals)
+        {
+            return new Terminals()
+            {
+                @Override
+                public List<Terminal> asList()
+                {
+                    return terminals;
+                }
+
+                @Override
+                public List<ByteBuffer> get()
+                {
+                    int size = terminals.size();
+                    List<ByteBuffer> buffers = new ArrayList<>(size);
+                    for (int i = 0; i < size; i++)
                     {
-                        return 
Collections.singletonList(term.bindAndGet(options));
+                        Terminal terminal = terminals.get(i);
+                        buffers.add(terminal == null ? null : terminal.get());
                     }
+                    return buffers;
+                }
 
-                    @Override
-                    public List<Terminal> bind(QueryOptions options)
+                @Override
+                public List<List<ByteBuffer>> getElements()
+                {
+                    int size = terminals.size();
+                    List<List<ByteBuffer>> buffers = new ArrayList<>(size);
+                    for (int i = 0; i < size; i++)
                     {
-                        return Collections.singletonList(term.bind(options));
+                        Terminal terminal = terminals.get(i);
+                        buffers.add(terminal == null ? null : 
terminal.getElements());
                     }
-                };
+                    return buffers;
+                }
+
+                @Override
+                public void addFunctionsTo(List<Function> functions)
+                {
+                    addFunctions(terminals, functions);
+                }
+
+                @Override
+                public boolean containsSingleTerm()
+                {
+                    return terminals.size() == 1;
+                }
+
+                @Override
+                public Terminal asSingleTerm()
+                {
+                    if (!containsSingleTerm())
+                        throw new UnsupportedOperationException("This terms 
contains cannot be converted in a single term");

Review Comment:
   ```suggestion
                           throw new UnsupportedOperationException("This terms 
content cannot be converted in a single term");
   ```



##########
src/java/org/apache/cassandra/cql3/Terms.java:
##########
@@ -47,225 +47,515 @@ public Object get(int index)
         @Override
         public int size()
         {
-            return 0;
+            throw new UnsupportedOperationException();
+        }
+    };
+
+    Terminals UNSET_TERMINALS = new Terminals()
+    {
+        @Override
+        @SuppressWarnings("unchecked")
+        public List<ByteBuffer> get()
+        {
+            return (List<ByteBuffer>) UNSET_LIST;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public List<List<ByteBuffer>> getElements()
+        {
+            return (List<List<ByteBuffer>>) UNSET_LIST;
+        }
+
+        @Override
+        public List<Terminal> asList()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void addFunctionsTo(List<Function> functions)
+        {
+
+        }
+
+        @Override
+        public boolean containsSingleTerm()
+        {
+            return false;
+        }
+
+        @Override
+        public Term asSingleTerm()
+        {
+            throw new UnsupportedOperationException();
         }
     };
 
     /**
      * Adds all functions (native and user-defined) used by any of the terms 
to the specified list.
      * @param functions the list to add to
      */
-    public void addFunctionsTo(List<Function> functions);
+    void addFunctionsTo(List<Function> functions);
 
     /**
      * Collects the column specifications for the bind variables in the terms.
-     * This is obviously a no-op if the terms are Terminal.
+     * This is obviously a no-op if the terms are Terminals.
      *
      * @param boundNames the variables specification where to collect the
      * bind variables of the terms in.
      */
-    public void collectMarkerSpecification(VariableSpecifications boundNames);
+    void collectMarkerSpecification(VariableSpecifications boundNames);
 
     /**
      * Bind the values in these terms to the values contained in {@code 
options}.
-     * This is obviously a no-op if the term is Terminal.
+     * This is obviously a no-op if this {@code Terms} are Terminals.
      *
-     * @param options the values to bind markers to.
-     * @return the result of binding all the variables of these NonTerminals 
or an {@code UNSET_LIST} if the term
-     * was unset.
+     * @param options the query options containing the values to bind markers 
to.
+     * @return the result of binding all the variables of these NonTerminals.
      */
-    public List<Terminal> bind(QueryOptions options);
+    Terminals bind(QueryOptions options);
 
+    /**
+     * A shorter for {@code bind(options).get()}.
+     * We expose it mainly because for constants it can avoid allocating a 
temporary
+     * object between the bind and the get.
+     * @param options the query options containing the values to bind markers 
to.
+     */
+    List<ByteBuffer> bindAndGet(QueryOptions options);
 
-    public List<ByteBuffer> bindAndGet(QueryOptions options);
+    /**
+     * A shorter for {@code bind(options).getElements()}.
+     * We expose it mainly because for constants it can avoid allocating a 
temporary
+     * object between the {@code bind} and the {@code getElements}.
+     * @param options the query options containing the values to bind markers 
to.
+     */
+    List<List<ByteBuffer>> bindAndGetElements(QueryOptions options);
 
     /**
-     * Creates a {@code Terms} for the specified list marker.
+     * Creates a {@code Terms} containing a single {@code Term}.
      *
-     * @param marker the list  marker
-     * @param type the element type
-     * @return a {@code Terms} for the specified list marker
+     * @param term the {@code Term}
+     * @return a {@code Terms} containing a single {@code Term}.
      */
-    public static Terms ofListMarker(final Lists.Marker marker, final 
AbstractType<?> type)
+    static Terms of(final Term term)
     {
-        return new Terms()
-        {
-            @Override
-            public void addFunctionsTo(List<Function> functions)
-            {
-            }
+        if (term.isTerminal())
+            return Terminals.of(term == Constants.NULL_VALUE ? null : 
(Terminal) term);
+
+        return NonTerminals.of((NonTerminal) term);
+    }
 
-            @Override
-            public void collectMarkerSpecification(VariableSpecifications 
boundNames)
+    /**
+     * Creates a {@code Terms} containing a set of {@code Term}.
+     *
+     * @param terms the terms
+     * @return a {@code Terms} containing a set of {@code Term}.
+     */
+    static Terms of(final List<Term> terms)
+    {
+        boolean allTerminals = terms.stream().allMatch(Term::isTerminal);
+
+        if (allTerminals)
+        {
+            int size = terms.size();
+            List<Terminal> terminals = new ArrayList<>(size);
+            for (int i = 0; i < size; i++)
             {
-                marker.collectMarkerSpecification(boundNames);
+                Terminal terminal = (Terminal) terms.get(i);
+                terminals.add(terminal == Constants.NULL_VALUE ? null : 
terminal);
             }
+            return Terminals.of(terminals);
+        }
 
-            @Override
-            public List<ByteBuffer> bindAndGet(QueryOptions options)
-            {
-                Terminal terminal = marker.bind(options);
+        return NonTerminals.of(terms);
+    }
 
-                if (terminal == null)
-                    return null;
+    /**
+     * Checks if these {@code terms} knows that it contains a single {@code 
term}.
+     * <p>
+     * If the instance is a marker it will not know how many terms it 
represents and will return false.
+     * @return {@code true} if this {@code terms} know contains a single 
{@code term}, {@code false} otherwise.
+     */
+    boolean containsSingleTerm();
 
-                if (terminal == Constants.UNSET_VALUE)
-                    return UNSET_LIST;
+    /**
+     * If this {@code terms} contains a single term it will be returned 
otherwise an UnsupportedOperationException will be thrown.
+     * @return a single term representing the single element of this {@code 
terms}.
+     * @throws UnsupportedOperationException if this term does not know how 
many terms it contains or contains more than one term
+     */
+    Term asSingleTerm();
 
-                return ((MultiItemTerminal) terminal).getElements();
-            }
+    /**
+     * Adds all functions (native and user-defined) of the specified terms to 
the list.
+     * @param functions the list to add to
+     */
+    static void addFunctions(Iterable<? extends Term> terms, List<Function> 
functions)
+    {
+        for (Term term : terms)
+        {
+            if (term != null)
+                term.addFunctionsTo(functions);
+        }
+    }
 
-            @Override
-            public List<Terminal> bind(QueryOptions options)
-            {
-                Terminal terminal = marker.bind(options);
+    /**
+     * A parsed, non prepared (thus untyped) set of terms.
+     */
+    abstract class Raw implements AssignmentTestable
+    {
+        /**
+         * This method validates this {@code Terms.Raw} is valid for the 
provided column
+         * specification and "prepare" this {@code Terms.Raw}, returning the 
resulting {@link Terms}.
+         *
+         * @param receiver the "column" the set of terms are supposed to be a 
value of. Note
+         * that the ColumnSpecification may not correspond to a real column.
+         * @return the prepared terms
+         */
+        public abstract Terms prepare(String keyspace, ColumnSpecification 
receiver) throws InvalidRequestException;
 
-                if (terminal == null)
-                    return null;
+        /**
+         * @return a String representation of the raw terms that can be used 
when reconstructing a CQL query string.
+         */
+        public abstract String getText();
 
-                if (terminal == Constants.UNSET_VALUE)
-                    return UNSET_LIST;
+        /**
+         * The type of the {@code Terms} if it can be infered.
+         *
+         * @param keyspace the keyspace on which the statement containing 
these terms is on.
+         * @return the type of this {@code Terms} if inferrable, or {@code 
null}
+         * otherwise (for instance, the type isn't inferrable for a bind 
marker. Even for
+         * literals, the exact type is not inferrable since they are valid for 
many
+         * different types and so this will return {@code null} too).
+         */
+        public abstract AbstractType<?> getExactTypeIfKnown(String keyspace);
 
-                java.util.function.Function<ByteBuffer, Term.Terminal> 
deserializer = deserializer(options.getProtocolVersion());
+        @Override
+        public AbstractType<?> getCompatibleTypeIfKnown(String keyspace)
+        {
+            return getExactTypeIfKnown(keyspace);
+        }
 
-                List<ByteBuffer> boundValues = ((MultiItemTerminal) 
terminal).getElements();
-                List<Term.Terminal> values = new 
ArrayList<>(boundValues.size());
-                for (int i = 0, m = boundValues.size(); i < m; i++)
-                {
-                    ByteBuffer buffer = boundValues.get(i);
-                    Term.Terminal value = buffer == null ? null : 
deserializer.apply(buffer);
-                    values.add(value);
-                }
-                return values;
-            }
+        @Override
+        public String toString()
+        {
+            return getText();
+        }
 
-            public java.util.function.Function<ByteBuffer, Term.Terminal> 
deserializer(ProtocolVersion version)
+        public static Raw of(List<? extends Term.Raw> raws)
+        {
+            return new Raw()
             {
-                if (type.isCollection())
+                @Override
+                public Terms prepare(String keyspace, ColumnSpecification 
receiver) throws InvalidRequestException
                 {
-                    switch (((CollectionType<?>) type).kind)
+                    List<Term> terms = new ArrayList<>(raws.size());
+                    for (Term.Raw raw : raws)
                     {
-                        case LIST:
-                            return e -> Lists.Value.fromSerialized(e, 
(ListType<?>) type);
-                        case SET:
-                            return e -> Sets.Value.fromSerialized(e, 
(SetType<?>) type);
-                        case MAP:
-                            return e -> Maps.Value.fromSerialized(e, 
(MapType<?, ?>) type);
+                        terms.add(raw.prepare(keyspace, receiver));
                     }
-                    throw new AssertionError();
+                    return Terms.of(terms);
                 }
-                return e -> new Constants.Value(e);
-            }
-        };
+
+                @Override
+                public String getText()
+                {
+                    return raws.stream().map(Term.Raw::getText)
+                                        .collect(Collectors.joining(", ", "(", 
")"));
+                }
+
+                @Override
+                public AbstractType<?> getExactTypeIfKnown(String keyspace)
+                {
+                    return null;
+                }
+
+                @Override
+                public TestResult testAssignment(String keyspace, 
ColumnSpecification receiver)
+                {
+                    return AssignmentTestable.TestResult.WEAKLY_ASSIGNABLE;
+                }
+            };
+        }
     }
 
     /**
-     * Creates a {@code Terms} containing a single {@code Term}.
-     *
-     * @param term the {@code Term}
-     * @return a {@code Terms} containing a single {@code Term}.
+     * Set of terms that contains only terminal terms.
      */
-    public static Terms of(final Term term)
+    abstract class Terminals implements Terms
     {
-        return new Terms()
+        @Override
+        public void collectMarkerSpecification(VariableSpecifications 
boundNames) {}
+
+        @Override
+        public final Terminals bind(QueryOptions options)
+        {
+            return this;
+        }
+
+        @Override
+        public List<ByteBuffer> bindAndGet(QueryOptions options)
+        {
+            return get();
+        }
+
+        @Override
+        public List<List<ByteBuffer>> bindAndGetElements(QueryOptions options)
+        {
+            return getElements();
+        }
+
+        /**
+         * @return the serialized values of this {@code Terminals}.
+         */
+        public abstract List<ByteBuffer> get();
+
+        /**
+         * Returns the serialized values of each Term elements, if these term 
represents a Collection, Tuple or UDT.

Review Comment:
   `* Returns the serialized values of each Term elements, if these term 
represents a Collection, Tuple, UDT or Vector.`



##########
src/java/org/apache/cassandra/db/marshal/TupleType.java:
##########
@@ -404,20 +440,24 @@ public ByteBuffer fromString(String source)
             throw new MarshalException(String.format("Invalid tuple literal: 
too many elements. Type %s expects %d but got %d",
                                                      asCQL3Type(), size(), 
fieldStrings.size()));
 
-        ByteBuffer[] fields = new ByteBuffer[fieldStrings.size()];
+        List<ByteBuffer> fields = new ArrayList<>(fieldStrings.size());
         for (int i = 0; i < fieldStrings.size(); i++)
         {
             String fieldString = fieldStrings.get(i);
             // We use @ to represent nulls
             if (fieldString.equals("@"))
-                continue;
-
-            AbstractType<?> type = type(i);
-            fieldString = 
ESCAPED_COLON_PAT.matcher(fieldString).replaceAll(COLON);
-            fieldString = ESCAPED_AT_PAT.matcher(fieldString).replaceAll(AT);
-            fields[i] = type.fromString(fieldString);
+            {
+                fields.add(null);

Review Comment:
   We weren't adding null before? 
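
   To spell out what I think is going on, here is a minimal sketch (the class and method names are hypothetical, not code from this PR). With a pre-sized array, `continue` leaves the skipped slot `null` implicitly. With a list, skipping would shift every later element, so the null has to be added explicitly:

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical illustration only; "@" marks a null field as in the tuple literal format.
   public class NullFieldSketch
   {
       // Array version: skipping index i leaves fields[i] null implicitly.
       public static String[] parseToArray(List<String> fieldStrings)
       {
           String[] fields = new String[fieldStrings.size()];
           for (int i = 0; i < fieldStrings.size(); i++)
           {
               if (fieldStrings.get(i).equals("@"))
                   continue; // slot i stays null
               fields[i] = fieldStrings.get(i);
           }
           return fields;
       }

       // List version: null must be added explicitly to keep positions aligned.
       public static List<String> parseToList(List<String> fieldStrings)
       {
           List<String> fields = new ArrayList<>(fieldStrings.size());
           for (String fieldString : fieldStrings)
               fields.add(fieldString.equals("@") ? null : fieldString);
           return fields;
       }

       public static void main(String[] args)
       {
           List<String> input = Arrays.asList("a", "@", "c");
           System.out.println(Arrays.toString(parseToArray(input)));
           System.out.println(parseToList(input));
       }
   }
   ```

   If that reading is right, both versions yield the same positions and the change is behavior-preserving.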



##########
src/java/org/apache/cassandra/serializers/MapSerializer.java:
##########
@@ -78,10 +79,15 @@ public List<ByteBuffer> serializeValues(Map<K, V> map)
         return buffers;
     }
 
+    public <E> int collectionSize(Collection<E> elements)
+    {
+        return elements.size() >> 1;
+    }
+
     @Override
-    public int getElementCount(Map<K, V> value)
+    protected int numberOfSerializedElements(int collectionSize)
     {
-        return value.size();
+        return collectionSize * 2; // keys and values

Review Comment:
   `Map.size()` returns the number of key-value pairs. From the JavaDoc:
   ```
   Returns the number of key-value mappings in this map. If the map contains 
more than Integer.MAX_VALUE elements, returns Integer.MAX_VALUE.
   Returns:
   the number of key-value mappings in this map
   ```



##########
src/java/org/apache/cassandra/cql3/Term.java:
##########
@@ -85,6 +94,13 @@ default boolean isTerminal()
      */
     void addFunctionsTo(List<Function> functions);
 
+    static ByteBuffer asBytes(String keyspace, String term, AbstractType<?> 
type)

Review Comment:
   JavaDoc?



##########
src/java/org/apache/cassandra/db/marshal/UserType.java:
##########
@@ -173,25 +173,24 @@ public ByteBuffer 
serializeForNativeProtocol(Iterator<Cell<?>> cells, ProtocolVe
     {
         assert isMultiCell;
 
-        ByteBuffer[] components = new ByteBuffer[size()];

Review Comment:
   The name `serializeForNativeProtocol` is misleading at this point, since we don't use the `ProtocolVersion` here. Would you like us to fix this?



##########
src/java/org/apache/cassandra/db/marshal/VectorType.java:
##########
@@ -189,14 +190,33 @@ public <V> V decomposeAsFloat(ValueAccessor<V> accessor, 
float[] value)
         return buffer;
     }
 
-    public ByteBuffer decomposeRaw(List<ByteBuffer> elements)
+    public ByteBuffer pack(List<ByteBuffer> elements)
     {
-        return decomposeRaw(elements, ByteBufferAccessor.instance);
+        return pack(elements, ByteBufferAccessor.instance);
     }
 
-    public <V> V decomposeRaw(List<V> elements, ValueAccessor<V> accessor)
+    public <V> V pack(List<V> elements, ValueAccessor<V> accessor)
     {
-        return getSerializer().serializeRaw(elements, accessor);
+        return getSerializer().pack(elements, accessor);
+    }
+
+    @Override
+    public List<ByteBuffer> filterSortAndValidateElements(List<ByteBuffer> 
buffers)
+    {
+        if (buffers == null)
+            return null;

Review Comment:
   This check is repeated: `InMarker.bind` already checks for null, but `MultiElements.bind` does not. So when we come from `InMarker.bind`, this check is a no-op.



##########
test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java:
##########
@@ -150,7 +149,7 @@ public void testInvalidQueries() throws Throwable
 
         assertInvalidSyntax("INSERT INTO %s (k, t) VALUES (0, ())");
 
-        assertInvalidMessage("Invalid tuple literal for t: too many elements. 
Type frozen<tuple<int, text, double>> expects 3 but got 4",
+        assertInvalidMessage("Expected 3 elements in value tuple, but got 4: 
(2, 'foo', 3.1, 'bar')",

Review Comment:
   I would probably still prefer to say `3 elements in value for tuple t` or so



##########
test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java:
##########
@@ -964,7 +964,7 @@ public void testInvalidInputForSet() throws Throwable
         createTable("CREATE TABLE %s(pk int PRIMARY KEY, s set<text>)");
         assertInvalidMessage("Not enough bytes to read a set",
                              "INSERT INTO %s (pk, s) VALUES (?, ?)", 1, 
"test");
-        assertInvalidMessage("Null value read when not allowed",
+        assertInvalidMessage("Not enough bytes to read a set",

Review Comment:
   So `Long.MAX_VALUE` was considered null only for set and map, but not for list? I don't fully understand this.
   



##########
src/java/org/apache/cassandra/cql3/InMarker.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MultiElementType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * A CQL named or unnamed bind marker for an {@code IN} restriction.
+ * For example, 'SELECT ... WHERE pk IN ?' or 'SELECT ... WHERE pk IN :myKey '.
+ */
+public final class InMarker extends Terms.NonTerminals
+{
+    private final int bindIndex;
+    private final ColumnSpecification receiver;
+
+    private InMarker(int bindIndex, ColumnSpecification receiver)
+    {
+        this.bindIndex = bindIndex;
+        this.receiver = receiver;
+    }
+
+    @Override
+    public void addFunctionsTo(List<Function> functions) {}
+
+    @Override
+    public void collectMarkerSpecification(VariableSpecifications boundNames)
+    {
+        boundNames.add(bindIndex, receiver);
+    }
+
+    @Override
+    public Terminals bind(QueryOptions options)
+    {
+        ByteBuffer values = options.getValues().get(bindIndex);
+        if (values == null)
+            return null;
+
+        if (values == ByteBufferUtil.UNSET_BYTE_BUFFER)
+            return UNSET_TERMINALS;
+
+        ListType<?> type = (ListType) receiver.type;
+        return toTerminals(values, type, 
terminalConverter(type.getElementsType()));
+    }
+
+    private <T> Terminals toTerminals(ByteBuffer value,
+                                      ListType<T> type,
+                                      java.util.function.Function<ByteBuffer, 
Term.Terminal> terminalConverter)
+    {
+        List<T> elements = type.getSerializer().deserialize(value, 
ByteBufferAccessor.instance);
+        List<Term.Terminal> terminals = new ArrayList<>(elements.size());
+        for (T element : elements)
+        {
+            terminals.add(element == null ? null : 
terminalConverter.apply(type.getElementsType().decompose(element)));
+        }
+        return Terminals.of(terminals);
+    }
+
+    public static java.util.function.Function<ByteBuffer, Term.Terminal> 
terminalConverter(AbstractType<?> type)
+    {
+        if (type instanceof MultiElementType<?>)
+            return e -> MultiElements.Value.fromSerialized(e, 
(MultiElementType<?>) type);
+
+        return Constants.Value::new;
+    }
+
+    @Override
+    public List<ByteBuffer> bindAndGet(QueryOptions options)
+    {
+        Terminals terminals = bind(options);
+        return terminals == null ? null : terminals.get();
+    }
+
+    @Override
+    public List<List<ByteBuffer>> bindAndGetElements(QueryOptions options)
+    {
+        Terminals terminals = bind(options);
+        return terminals == null ? null : terminals.getElements();
+    }
+
+    @Override
+    public boolean containsSingleTerm()
+    {
+        return false;
+    }
+
+    @Override
+    public Term asSingleTerm()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * A raw placeholder for multiple values of the same type for a single 
column.
+     * For example, 'SELECT ... WHERE user_id IN ?'.
+     * <p>
+     * Because a single type is used, a List is used to represent the values.
+     */
+    public static final class Raw extends Terms.Raw

Review Comment:
   This javadoc is missing a note on where we handle the placeholders that represent more than one type.



##########
src/java/org/apache/cassandra/cql3/Sets.java:
##########
@@ -434,9 +311,7 @@ public void execute(DecoratedKey partitionKey, 
UpdateParameters params) throws I
                 return;
 
             // This can be either a set or a single element
-            Set<ByteBuffer> toDiscard = value instanceof Sets.Value
-                                      ? ((Sets.Value)value).elements
-                                      : Collections.singleton(value.get());
+            List<ByteBuffer> toDiscard = value.getElements();

Review Comment:
   Maybe we can keep the `instanceof` check here?



##########
src/java/org/apache/cassandra/db/marshal/VectorType.java:
##########
@@ -189,14 +190,33 @@ public <V> V decomposeAsFloat(ValueAccessor<V> accessor, 
float[] value)
         return buffer;
     }
 
-    public ByteBuffer decomposeRaw(List<ByteBuffer> elements)
+    public ByteBuffer pack(List<ByteBuffer> elements)
     {
-        return decomposeRaw(elements, ByteBufferAccessor.instance);
+        return pack(elements, ByteBufferAccessor.instance);
     }
 
-    public <V> V decomposeRaw(List<V> elements, ValueAccessor<V> accessor)
+    public <V> V pack(List<V> elements, ValueAccessor<V> accessor)
     {
-        return getSerializer().serializeRaw(elements, accessor);
+        return getSerializer().pack(elements, accessor);
+    }
+
+    @Override
+    public List<ByteBuffer> filterSortAndValidateElements(List<ByteBuffer> 
buffers)
+    {

Review Comment:
   Can we add a short comment in the override methods at the top that we are 
not sorting, something like:
   `// Only filter and validate for this type`
   It makes the method easier to read.



##########
src/java/org/apache/cassandra/db/marshal/SetType.java:
##########
@@ -243,4 +243,18 @@ public ByteBuffer getMaskedValue()
     {
         return decompose(Collections.emptySet());
     }
+
+    @Override
+    public List<ByteBuffer> filterSortAndValidateElements(List<ByteBuffer> 
buffers)
+    {
+        SortedSet<ByteBuffer> sorted = new TreeSet<>(elements);
+        for (ByteBuffer buffer: buffers)
+        {
+            if (buffer == null)
+                throw new MarshalException("null is not supported inside 
collections");
+            elements.validate(buffer);

Review Comment:
   What about this check:
   
https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/cql3/Sets.java#L310



##########
src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java:
##########
@@ -218,7 +235,10 @@ public final SingleRestriction 
doMergeWith(SingleRestriction otherRestriction)
         @Override
         public MultiCBuilder appendTo(MultiCBuilder builder, QueryOptions 
options)
         {
-            builder.addEachElementToAll(getValues(options));
+            List<ByteBuffer> buffers = values.bindAndGet(options);
+            checkNotNull(buffers, "Invalid null value for column %s", 
columnDef.name);
+            checkFalse(buffers == Terms.UNSET_LIST, "Invalid unset value for 
column %s", columnDef.name);

Review Comment:
   Where did these two checks come from?



##########
src/java/org/apache/cassandra/cql3/Vectors.java:
##########
@@ -18,22 +18,18 @@
 
 package org.apache.cassandra.cql3;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.VectorType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class Vectors
+public final class Vectors

Review Comment:
   Good call making it final



##########
src/java/org/apache/cassandra/db/marshal/MapType.java:
##########
@@ -259,40 +244,37 @@ static <V> ByteSource 
asComparableBytesMap(AbstractType<?> keysComparator,
         ByteSource[] srcs = new ByteSource[size * 2];
         for (int i = 0; i < size; ++i)
         {
-            V k = CollectionSerializer.readValue(data, accessor, offset);
+            T k = CollectionSerializer.readValue(data, accessor, offset);
             offset += CollectionSerializer.sizeOfValue(k, accessor);
-            srcs[i * 2 + 0] = keysComparator.asComparableBytes(accessor, k, 
version);
-            V v = CollectionSerializer.readValue(data, accessor, offset);
+            srcs[i * 2 + 0] = keys.asComparableBytes(accessor, k, version);
+            T v = CollectionSerializer.readValue(data, accessor, offset);
             offset += CollectionSerializer.sizeOfValue(v, accessor);
-            srcs[i * 2 + 1] = valuesComparator.asComparableBytes(accessor, v, 
version);
+            srcs[i * 2 + 1] = values.asComparableBytes(accessor, v, version);

Review Comment:
   good decluttering :-) 



##########
src/java/org/apache/cassandra/db/marshal/CollectionType.java:
##########
@@ -162,17 +169,13 @@ public boolean isFreezable()
     }
 
     // Overrided by maps

Review Comment:
   Can this comment also be removed?



##########
src/java/org/apache/cassandra/db/marshal/MultiElementType.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.marshal;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Base type for the types being composed of multi-elements like Collections, 
Tuples, UDTs or Vectors.
+ * This class unifies the methods used by the CQL layer to work with those 
types.
+ */
+public abstract class MultiElementType<T> extends AbstractType<T>
+{
+    protected MultiElementType(ComparisonType comparisonType)
+    {
+        super(comparisonType);
+    }
+
+    /**
+     * Returns the serialized representation of the value composed of the 
specified elements.
+     *
+     * @param elements the serialized values of the elements
+     * @return the serialized representation of the value composed of the 
specified elements.
+     */
+    public abstract ByteBuffer pack(List<ByteBuffer> elements);
+
+    /**
+     * Returns the serialized representation of the elements composing the 
specified value.
+     *
+     * @param value a serialized value of this type
+     * @return the serialized representation of the elements composing the 
specified value.
+     */
+    public abstract List<ByteBuffer> unpack(ByteBuffer value);

Review Comment:
   As a reader, when I see `pack` and `unpack`, I assume they are two methods that do opposite things. (While "pack" also has a meaning as a noun, "unpack" is only a verb.) But they both return serialized values and are just used in different places; the difference is whether we get the whole value or its elements (probably not the right words, but I hope you understand what I mean). How about `packed` and `unpacked`? I am not insisting on the change, just giving a point of view. It might make things clearer for first-time readers without needing to go to the javadoc. I leave it up to you to decide.
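
   To illustrate the "whole versus elements" point, here is a rough sketch. The length-prefixed layout and the class name are my assumptions for the example and not the actual serialization format of any `MultiElementType`; null elements are ignored for brevity. Both methods return serialized data, one as a single buffer and one as a list of element buffers:

   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical illustration only; the layout is [int length][bytes] per element.
   public class PackUnpackSketch
   {
       // Returns the serialized whole, built from serialized elements.
       public static ByteBuffer pack(List<ByteBuffer> elements)
       {
           int size = 0;
           for (ByteBuffer element : elements)
               size += Integer.BYTES + element.remaining();

           ByteBuffer packed = ByteBuffer.allocate(size);
           for (ByteBuffer element : elements)
           {
               packed.putInt(element.remaining());
               packed.put(element.duplicate()); // duplicate keeps the caller's position untouched
           }
           packed.flip();
           return packed;
       }

       // Returns the serialized elements, read back from the serialized whole.
       public static List<ByteBuffer> unpack(ByteBuffer value)
       {
           ByteBuffer input = value.duplicate();
           List<ByteBuffer> elements = new ArrayList<>();
           while (input.hasRemaining())
           {
               int length = input.getInt();
               ByteBuffer element = input.slice();
               element.limit(length);
               elements.add(element);
               input.position(input.position() + length);
           }
           return elements;
       }

       public static void main(String[] args)
       {
           List<ByteBuffer> elements = Arrays.asList(ByteBuffer.wrap("ab".getBytes(StandardCharsets.UTF_8)),
                                                     ByteBuffer.wrap("cde".getBytes(StandardCharsets.UTF_8)));
           System.out.println(unpack(pack(elements)).equals(elements));
       }
   }
   ```

   In this sketch the two methods are inverses on the data, yet neither returns a deserialized value, which is the part the names do not convey to me.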



##########
src/java/org/apache/cassandra/cql3/CQL3Type.java:
##########
@@ -82,7 +82,7 @@ default ByteBuffer fromCQLLiteral(String literal)
      */
     default ByteBuffer fromCQLLiteral(String keyspaceName, String literal)
     {
-        return Terms.asBytes(keyspaceName, literal, getType());
+        return Term.asBytes(keyspaceName, literal, getType());

Review Comment:
   While we are here, shall we clear the modifiers, too? I don't insist but it 
could be nice



##########
src/java/org/apache/cassandra/serializers/MapSerializer.java:
##########
@@ -78,10 +79,15 @@ public List<ByteBuffer> serializeValues(Map<K, V> map)
         return buffers;
     }
 
+    public <E> int collectionSize(Collection<E> elements)
+    {
+        return elements.size() >> 1;
+    }
+
     @Override
-    public int getElementCount(Map<K, V> value)
+    protected int numberOfSerializedElements(int collectionSize)
     {
-        return value.size();
+        return collectionSize * 2; // keys and values

Review Comment:
   Never mind, we flatten to a list... my mistake, it makes sense now.
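
   For anyone else reading this thread, a small sketch of the arithmetic as I now understand it (hypothetical class, not the `MapSerializer` code): the map is flattened to `[k1, v1, k2, v2, ...]`, so the number of pairs is half the flattened size, and the number of serialized elements is twice the number of pairs.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical illustration only; method names mirror the ones in the diff.
   public class FlattenedMapSketch
   {
       // Flattens a map into [k1, v1, k2, v2, ...].
       public static <K, V> List<Object> flatten(Map<K, V> map)
       {
           List<Object> flat = new ArrayList<>(map.size() * 2);
           for (Map.Entry<K, V> entry : map.entrySet())
           {
               flat.add(entry.getKey());
               flat.add(entry.getValue());
           }
           return flat;
       }

       // Number of key-value pairs represented by a flattened list.
       public static int collectionSize(List<?> flattened)
       {
           return flattened.size() >> 1;
       }

       // Number of serialized elements for a given number of pairs: keys and values.
       public static int numberOfSerializedElements(int collectionSize)
       {
           return collectionSize * 2;
       }

       public static void main(String[] args)
       {
           Map<String, Integer> map = new LinkedHashMap<>();
           map.put("a", 1);
           map.put("b", 2);
           map.put("c", 3);
           List<Object> flat = flatten(map);
           System.out.println(flat.size() + " elements, " + collectionSize(flat) + " pairs");
       }
   }
   ```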



##########
src/java/org/apache/cassandra/cql3/Maps.java:
##########
@@ -249,138 +235,6 @@ public String getText()
         }
     }
 
-    public static class Value extends Term.Terminal
-    {
-        public final SortedMap<ByteBuffer, ByteBuffer> map;
-
-        public Value(SortedMap<ByteBuffer, ByteBuffer> map)
-        {
-            this.map = map;
-        }
-
-        public static <K, V> Value fromSerialized(ByteBuffer value, MapType<K, V> type) throws InvalidRequestException
-        {
-            try
-            {
-                // Collections have this small hack that validate cannot be called on a serialized object,
-                // but compose does the validation (so we're fine).
-                Map<K, V> m = type.getSerializer().deserialize(value, ByteBufferAccessor.instance);
-                // We depend on Maps to be properly sorted by their keys, so use a sorted map implementation here.
-                SortedMap<ByteBuffer, ByteBuffer> map = new TreeMap<>(type.getKeysType());
-                for (Map.Entry<K, V> entry : m.entrySet())
-                    map.put(type.getKeysType().decompose(entry.getKey()), type.getValuesType().decompose(entry.getValue()));
-                return new Value(map);
-            }
-            catch (MarshalException e)
-            {
-                throw new InvalidRequestException(e.getMessage());
-            }
-        }
-
-        @Override
-        public ByteBuffer get(ProtocolVersion version)
-        {
-            List<ByteBuffer> buffers = new ArrayList<>(2 * map.size());
-            for (Map.Entry<ByteBuffer, ByteBuffer> entry : map.entrySet())
-            {
-                buffers.add(entry.getKey());
-                buffers.add(entry.getValue());
-            }
-            return CollectionSerializer.pack(buffers, map.size());
-        }
-
-        public boolean equals(MapType<?, ?> mt, Value v)

Review Comment:
   Confirming my understanding: now that we are switching to lists, we don't
   need this `equals` anymore?



##########
src/java/org/apache/cassandra/db/marshal/MultiElementType.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.marshal;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Base type for the types being composed of multi-elements like Collections, Tuples, UDTs or Vectors.
+ * This class unifies the methods used by the CQL layer to work with those types.
+ */
+public abstract class MultiElementType<T> extends AbstractType<T>
+{
+    protected MultiElementType(ComparisonType comparisonType)
+    {
+        super(comparisonType);
+    }
+
+    /**
+     * Returns the serialized representation of the value composed of the specified elements.
+     *
+     * @param elements the serialized values of the elements
+     * @return the serialized representation of the value composed of the specified elements.
+     */
+    public abstract ByteBuffer pack(List<ByteBuffer> elements);
+
+    /**
+     * Returns the serialized representation of the elements composing the specified value.
+     *
+     * @param value a serialized value of this type
+     * @return the serialized representation of the elements composing the specified value.
+     */
+    public abstract List<ByteBuffer> unpack(ByteBuffer value);
+
+    /**
+     * Checks if this type support bind makers for its elements when the type value is provided through a litteral.
+     * @return {@code true} if this type support bind makers for its elements, {@code false} otherwise.

Review Comment:
   ```suggestion
        * @return {@code true} if this type supports bind markers for its elements, {@code false} otherwise.
   ```



##########
src/java/org/apache/cassandra/db/marshal/TupleType.java:
##########
@@ -276,62 +279,69 @@ public <V> V fromComparableBytes(ValueAccessor<V> accessor, ByteSource.Peekable
         assert terminator == ByteSource.TERMINATOR : String.format("Expected TERMINATOR (0x%2x) after %d components",
                                                                    ByteSource.TERMINATOR,
                                                                    types.size());
-        return buildValue(accessor, componentBuffers);
+        return pack(accessor, Arrays.asList(componentBuffers));
     }
 
-    /**
-     * Split a tuple value into its component values.
-     */
-    public <V> V[] split(ValueAccessor<V> accessor, V value)
+    @Override
+    public List<ByteBuffer> unpack(ByteBuffer value)
     {
-        return split(accessor, value, size(), this);
+        return unpack(value, ByteBufferAccessor.instance);
     }
 
-    /**
-     * Split a tuple value into its component values.
-     */
-    public static <V> V[] split(ValueAccessor<V> accessor, V value, int numberOfElements, TupleType type)
+    public <V> List<V> unpack(V value, ValueAccessor<V> accessor)
     {
-        V[] components = accessor.createArray(numberOfElements);
+        int numberOfElements = size();
+        List<V> components = new ArrayList<>(numberOfElements);
         int length = accessor.size(value);
         int position = 0;
         for (int i = 0; i < numberOfElements; i++)
         {
             if (position == length)
-                return Arrays.copyOfRange(components, 0, i);
+            {
+                return components;
+            }
 
             if (position + 4 > length)
-                throw new MarshalException(String.format("Not enough bytes to read %dth component", i));
+                throw new MarshalException(String.format("Not enough bytes to read %dth %s", i, componentOrFieldName(i)));
 
             int size = accessor.getInt(value, position);
             position += 4;
 
             // size < 0 means null value
             if (size >= 0)
             {
-                if (position + size > length)
-                    throw new MarshalException(String.format("Not enough bytes to read %dth component", i));
+                if (length - position < size)
+                    throw new MarshalException(String.format("Not enough bytes to read %dth %s", i, componentOrFieldName(i)));
 
-                components[i] = accessor.slice(value, position, size);
+                components.add(accessor.slice(value, position, size));
                 position += size;
             }
             else
-                components[i] = null;
+                components.add(null);
         }
 
         // error out if we got more values in the tuple/UDT than we expected
         if (position < length)
         {
-            throw new MarshalException(String.format("Expected %s %s for %s column, but got more",
-                                                     numberOfElements, numberOfElements == 1 ? "value" : "values",
-                                                     type.asCQL3Type()));
+            // Invalid remaining data after end of tuple value
+            // Invalid remaining data after end of UDT value

Review Comment:
   These comments just repeat the error message itself. There is also already a
   comment on line 323.



##########
src/java/org/apache/cassandra/cql3/selection/TupleSelector.java:
##########
@@ -97,12 +97,12 @@ public void addInput(InputRow input)
 
     public ByteBuffer getOutput(ProtocolVersion protocolVersion) throws InvalidRequestException
     {
-        ByteBuffer[] buffers = new ByteBuffer[elements.size()];
+        List<ByteBuffer> buffers = new ArrayList<>(elements.size());
         for (int i = 0, m = elements.size(); i < m; i++)
         {
-            buffers[i] = elements.get(i).getOutput(protocolVersion);
+            buffers.add(elements.get(i).getOutput(protocolVersion));
         }
-        return TupleType.buildValue(buffers);
+        return ((TupleType) type).pack(buffers);

Review Comment:
   Is the cast really needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

