http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index 7be635f..a17ee92 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -120,13 +120,12 @@ public class AlterTableStatement extends 
SchemaAlteringStatement
                     throw new InvalidRequestException(String.format("Cannot 
re-add previously dropped counter column %s", columnName));
 
                 AbstractType<?> type = validator.getType();
-                if (type instanceof CollectionType)
+                if (type.isCollection() && type.isMultiCell())
                 {
                     if (!cfm.comparator.supportCollections())
-                        throw new InvalidRequestException("Cannot use 
collection types with non-composite PRIMARY KEY");
+                        throw new InvalidRequestException("Cannot use 
non-frozen collections with a non-composite PRIMARY KEY");
                     if (cfm.isSuper())
-                        throw new InvalidRequestException("Cannot use 
collection types with Super column family");
-
+                        throw new InvalidRequestException("Cannot use 
non-frozen collections with super column families");
 
                     // If there used to be a collection column with the same 
name (that has been dropped), it will
                     // still be appear in the ColumnToCollectionType because 
or reasons explained on #6276. The same
@@ -151,35 +150,35 @@ public class AlterTableStatement extends 
SchemaAlteringStatement
             case ALTER:
                 assert columnName != null;
                 if (def == null)
-                    throw new InvalidRequestException(String.format("Cell %s 
was not found in table %s", columnName, columnFamily()));
+                    throw new InvalidRequestException(String.format("Column %s 
was not found in table %s", columnName, columnFamily()));
 
+                AbstractType<?> validatorType = validator.getType();
                 switch (def.kind)
                 {
                     case PARTITION_KEY:
-                        AbstractType<?> newType = validator.getType();
-                        if (newType instanceof CounterColumnType)
+                        if (validatorType instanceof CounterColumnType)
                             throw new 
InvalidRequestException(String.format("counter type is not supported for 
PRIMARY KEY part %s", columnName));
                         if (cfm.getKeyValidator() instanceof CompositeType)
                         {
                             List<AbstractType<?>> oldTypes = ((CompositeType) 
cfm.getKeyValidator()).types;
-                            if 
(!newType.isValueCompatibleWith(oldTypes.get(def.position())))
+                            if 
(!validatorType.isValueCompatibleWith(oldTypes.get(def.position())))
                                 throw new 
ConfigurationException(String.format("Cannot change %s from type %s to type %s: 
types are incompatible.",
                                                                                
columnName,
                                                                                
oldTypes.get(def.position()).asCQL3Type(),
                                                                                
validator));
 
                             List<AbstractType<?>> newTypes = new 
ArrayList<AbstractType<?>>(oldTypes);
-                            newTypes.set(def.position(), newType);
+                            newTypes.set(def.position(), validatorType);
                             
cfm.keyValidator(CompositeType.getInstance(newTypes));
                         }
                         else
                         {
-                            if 
(!newType.isValueCompatibleWith(cfm.getKeyValidator()))
+                            if 
(!validatorType.isValueCompatibleWith(cfm.getKeyValidator()))
                                 throw new 
ConfigurationException(String.format("Cannot change %s from type %s to type %s: 
types are incompatible.",
                                                                                
columnName,
                                                                                
cfm.getKeyValidator().asCQL3Type(),
                                                                                
validator));
-                            cfm.keyValidator(newType);
+                            cfm.keyValidator(validatorType);
                         }
                         break;
                     case CLUSTERING_COLUMN:
@@ -187,22 +186,22 @@ public class AlterTableStatement extends 
SchemaAlteringStatement
                         // Note that CFMetaData.validateCompatibility already 
validate the change we're about to do. However, the error message it
                         // sends is a bit cryptic for a CQL3 user, so 
validating here for a sake of returning a better error message
                         // Do note that we need isCompatibleWith here, not 
just isValueCompatibleWith.
-                        if (!validator.getType().isCompatibleWith(oldType))
+                        if (!validatorType.isCompatibleWith(oldType))
                             throw new 
ConfigurationException(String.format("Cannot change %s from type %s to type %s: 
types are not order-compatible.",
                                                                            
columnName,
                                                                            
oldType.asCQL3Type(),
                                                                            
validator));
 
-                        cfm.comparator = 
cfm.comparator.setSubtype(def.position(), validator.getType());
+                        cfm.comparator = 
cfm.comparator.setSubtype(def.position(), validatorType);
                         break;
                     case COMPACT_VALUE:
                         // See below
-                        if 
(!validator.getType().isValueCompatibleWith(cfm.getDefaultValidator()))
+                        if 
(!validatorType.isValueCompatibleWith(cfm.getDefaultValidator()))
                             throw new 
ConfigurationException(String.format("Cannot change %s from type %s to type %s: 
types are incompatible.",
                                                                            
columnName,
                                                                            
cfm.getDefaultValidator().asCQL3Type(),
                                                                            
validator));
-                        cfm.defaultValidator(validator.getType());
+                        cfm.defaultValidator(validatorType);
                         break;
                     case REGULAR:
                     case STATIC:
@@ -211,23 +210,23 @@ public class AlterTableStatement extends 
SchemaAlteringStatement
                         // allow it for CQL3 (see #5882) so validating it 
explicitly here. We only care about value compatibility
                         // though since we won't compare values (except when 
there is an index, but that is validated by
                         // ColumnDefinition already).
-                        if 
(!validator.getType().isValueCompatibleWith(def.type))
+                        if (!validatorType.isValueCompatibleWith(def.type))
                             throw new 
ConfigurationException(String.format("Cannot change %s from type %s to type %s: 
types are incompatible.",
                                                                            
columnName,
                                                                            
def.type.asCQL3Type(),
                                                                            
validator));
 
                         // For collections, if we alter the type, we need to 
update the comparator too since it includes
-                        // the type too (note that isValueCompatibleWith above 
has validated that the need type don't really
+                        // the type too (note that isValueCompatibleWith above 
has validated that the new type doesn't
                         // change the underlying sorting order, but we still 
don't want to have a discrepancy between the type
                         // in the comparator and the one in the 
ColumnDefinition as that would be dodgy).
-                        if (validator.getType() instanceof CollectionType)
-                            cfm.comparator = 
cfm.comparator.addOrUpdateCollection(def.name, 
(CollectionType)validator.getType());
+                        if (validatorType.isCollection() && 
validatorType.isMultiCell())
+                            cfm.comparator = 
cfm.comparator.addOrUpdateCollection(def.name, (CollectionType)validatorType);
 
                         break;
                 }
                 // In any case, we update the column definition
-                
cfm.addOrReplaceColumnDefinition(def.withNewType(validator.getType()));
+                
cfm.addOrReplaceColumnDefinition(def.withNewType(validatorType));
                 break;
 
             case DROP:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
index cfdd65f..576011f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
@@ -214,23 +214,27 @@ public abstract class AlterTypeStatement extends 
SchemaAlteringStatement
         {
             if (type instanceof ListType)
             {
-                AbstractType<?> t = updateWith(((ListType)type).elements, 
keyspace, toReplace, updated);
-                return t == null ? null : ListType.getInstance(t);
+                AbstractType<?> t = 
updateWith(((ListType)type).getElementsType(), keyspace, toReplace, updated);
+                if (t == null)
+                    return null;
+                return ListType.getInstance(t, type.isMultiCell());
             }
             else if (type instanceof SetType)
             {
-                AbstractType<?> t = updateWith(((SetType)type).elements, 
keyspace, toReplace, updated);
-                return t == null ? null : SetType.getInstance(t);
+                AbstractType<?> t = 
updateWith(((SetType)type).getElementsType(), keyspace, toReplace, updated);
+                if (t == null)
+                    return null;
+                return SetType.getInstance(t, type.isMultiCell());
             }
             else
             {
                 assert type instanceof MapType;
                 MapType mt = (MapType)type;
-                AbstractType<?> k = updateWith(mt.keys, keyspace, toReplace, 
updated);
-                AbstractType<?> v = updateWith(mt.values, keyspace, toReplace, 
updated);
+                AbstractType<?> k = updateWith(mt.getKeysType(), keyspace, 
toReplace, updated);
+                AbstractType<?> v = updateWith(mt.getValuesType(), keyspace, 
toReplace, updated);
                 if (k == null && v == null)
                     return null;
-                return MapType.getInstance(k == null ? mt.keys : k, v == null 
? mt.values : v);
+                return MapType.getInstance(k == null ? mt.getKeysType() : k, v 
== null ? mt.getValuesType() : v, type.isMultiCell());
             }
         }
         else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index c93adc2..3032897 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -79,8 +79,24 @@ public class CreateIndexStatement extends 
SchemaAlteringStatement
             throw new InvalidRequestException("No column definition found for 
column " + target.column);
 
         boolean isMap = cd.type instanceof MapType;
-        if (target.isCollectionKeys && !isMap)
-            throw new InvalidRequestException("Cannot create index on keys of 
column " + target + " with non map type");
+        boolean isFrozenCollection = cd.type.isCollection() && 
!cd.type.isMultiCell();
+        if (target.isCollectionKeys)
+        {
+            if (!isMap)
+                throw new InvalidRequestException("Cannot create index on keys 
of column " + target + " with non-map type");
+            if (!cd.type.isMultiCell())
+                throw new InvalidRequestException("Cannot create index on keys 
of frozen<map> column " + target);
+        }
+        else if (target.isFullCollection)
+        {
+            if (!isFrozenCollection)
+                throw new InvalidRequestException("full() indexes can only be 
created on frozen collections");
+        }
+        else if (isFrozenCollection)
+        {
+            throw new InvalidRequestException("Frozen collections currently 
only support full-collection indexes. " +
+                                              "For example, 'CREATE INDEX ON 
<table>(full(<columnName>))'.");
+        }
 
         if (cd.getIndexType() != null)
         {
@@ -116,7 +132,7 @@ public class CreateIndexStatement extends 
SchemaAlteringStatement
             throw new InvalidRequestException("Secondary indexes are not 
allowed on static columns");
 
         if (cd.kind == ColumnDefinition.Kind.PARTITION_KEY && 
cd.isOnAllComponents())
-            throw new InvalidRequestException(String.format("Cannot add 
secondary index to already primarily indexed column %s", target.column));
+            throw new InvalidRequestException(String.format("Cannot create 
secondary index on partition key column %s", target.column));
     }
 
     public boolean announceMigration(boolean isLocalOnly) throws 
RequestValidationException
@@ -139,7 +155,7 @@ public class CreateIndexStatement extends 
SchemaAlteringStatement
             // For now, we only allow indexing values for collections, but we 
could later allow
             // to also index map keys, so we record that this is the values we 
index to make our
             // lives easier then.
-            if (cd.type.isCollection())
+            if (cd.type.isCollection() && cd.type.isMultiCell())
                 options = ImmutableMap.of(target.isCollectionKeys ? 
SecondaryIndex.INDEX_KEYS_OPTION_NAME
                                                                   : 
SecondaryIndex.INDEX_VALUES_OPTION_NAME, "");
             cd.setIndexType(IndexType.COMPOSITES, options);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 672bb50..5f70ab8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -199,16 +199,16 @@ public class CreateTableStatement extends 
SchemaAlteringStatement
 
             CreateTableStatement stmt = new CreateTableStatement(cfName, 
properties, ifNotExists, staticColumns);
 
-            Map<ByteBuffer, CollectionType> definedCollections = null;
+            Map<ByteBuffer, CollectionType> definedMultiCellCollections = null;
             for (Map.Entry<ColumnIdentifier, CQL3Type.Raw> entry : 
definitions.entrySet())
             {
                 ColumnIdentifier id = entry.getKey();
                 CQL3Type pt = entry.getValue().prepare(keyspace());
-                if (pt.isCollection())
+                if (pt.isCollection() && ((CollectionType) 
pt.getType()).isMultiCell())
                 {
-                    if (definedCollections == null)
-                        definedCollections = new HashMap<ByteBuffer, 
CollectionType>();
-                    definedCollections.put(id.bytes, 
(CollectionType)pt.getType());
+                    if (definedMultiCellCollections == null)
+                        definedMultiCellCollections = new HashMap<>();
+                    definedMultiCellCollections.put(id.bytes, (CollectionType) 
pt.getType());
                 }
                 stmt.columns.put(id, pt.getType()); // we'll remove what is 
not a column below
             }
@@ -246,16 +246,16 @@ public class CreateTableStatement extends 
SchemaAlteringStatement
                     if (stmt.columns.isEmpty())
                         throw new InvalidRequestException("No definition found 
that is not part of the PRIMARY KEY");
 
-                    if (definedCollections != null)
-                        throw new InvalidRequestException("Collection types 
are not supported with COMPACT STORAGE");
+                    if (definedMultiCellCollections != null)
+                        throw new InvalidRequestException("Non-frozen 
collection types are not supported with COMPACT STORAGE");
 
                     stmt.comparator = new 
SimpleSparseCellNameType(UTF8Type.instance);
                 }
                 else
                 {
-                    stmt.comparator = definedCollections == null
+                    stmt.comparator = definedMultiCellCollections == null
                                     ? new 
CompoundSparseCellNameType(Collections.<AbstractType<?>>emptyList())
-                                    : new 
CompoundSparseCellNameType.WithCollection(Collections.<AbstractType<?>>emptyList(),
 ColumnToCollectionType.getInstance(definedCollections));
+                                    : new 
CompoundSparseCellNameType.WithCollection(Collections.<AbstractType<?>>emptyList(),
 ColumnToCollectionType.getInstance(definedMultiCellCollections));
                 }
             }
             else
@@ -264,7 +264,7 @@ public class CreateTableStatement extends 
SchemaAlteringStatement
                 // standard "dynamic" CF, otherwise it's a composite
                 if (useCompactStorage && columnAliases.size() == 1)
                 {
-                    if (definedCollections != null)
+                    if (definedMultiCellCollections != null)
                         throw new InvalidRequestException("Collection types 
are not supported with COMPACT STORAGE");
 
                     ColumnIdentifier alias = columnAliases.get(0);
@@ -294,16 +294,16 @@ public class CreateTableStatement extends 
SchemaAlteringStatement
 
                     if (useCompactStorage)
                     {
-                        if (definedCollections != null)
+                        if (definedMultiCellCollections != null)
                             throw new InvalidRequestException("Collection 
types are not supported with COMPACT STORAGE");
 
                         stmt.comparator = new CompoundDenseCellNameType(types);
                     }
                     else
                     {
-                        stmt.comparator = definedCollections == null
+                        stmt.comparator = definedMultiCellCollections == null
                                         ? new CompoundSparseCellNameType(types)
-                                        : new 
CompoundSparseCellNameType.WithCollection(types, 
ColumnToCollectionType.getInstance(definedCollections));
+                                        : new 
CompoundSparseCellNameType.WithCollection(types, 
ColumnToCollectionType.getInstance(definedMultiCellCollections));
                     }
                 }
             }
@@ -385,7 +385,7 @@ public class CreateTableStatement extends 
SchemaAlteringStatement
             AbstractType type = columns.get(t);
             if (type == null)
                 throw new InvalidRequestException(String.format("Unknown 
definition %s referenced in PRIMARY KEY", t));
-            if (type instanceof CollectionType)
+            if (type.isCollection() && type.isMultiCell())
                 throw new InvalidRequestException(String.format("Invalid 
collection type for PRIMARY KEY component %s", t));
 
             columns.remove(t);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index b49f60b..33c61e7 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -56,7 +56,7 @@ public class DeleteStatement extends ModificationStatement
             // However, if we delete only static colums, it's fine since we 
won't really use the prefix anyway.
             for (Operation deletion : deletions)
                 if (!deletion.column.isStatic())
-                    throw new InvalidRequestException(String.format("Missing 
mandatory PRIMARY KEY part %s since %s specified", getFirstEmptyKey(), 
deletion.column.name));
+                    throw new InvalidRequestException(String.format("Primary 
key column '%s' must be specified in order to delete column '%s'", 
getFirstEmptyKey().name, deletion.column.name));
         }
 
         if (deletions.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
index 8bcaaf6..a3b82a4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
@@ -117,11 +117,11 @@ public class DropTypeStatement extends 
SchemaAlteringStatement
         else if (toCheck instanceof CollectionType)
         {
             if (toCheck instanceof ListType)
-                return isUsedBy(((ListType)toCheck).elements);
+                return isUsedBy(((ListType)toCheck).getElementsType());
             else if (toCheck instanceof SetType)
-                return isUsedBy(((SetType)toCheck).elements);
+                return isUsedBy(((SetType)toCheck).getElementsType());
             else
-                return isUsedBy(((MapType)toCheck).keys) || 
isUsedBy(((MapType)toCheck).keys);
+                return isUsedBy(((MapType)toCheck).getKeysType()) || 
isUsedBy(((MapType)toCheck).getKeysType());
         }
         return false;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java 
b/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
index dc77bcc..eeee907 100644
--- a/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
+++ b/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
@@ -24,37 +24,46 @@ public class IndexTarget
 {
     public final ColumnIdentifier column;
     public final boolean isCollectionKeys;
+    public final boolean isFullCollection;
 
-    private IndexTarget(ColumnIdentifier column, boolean isCollectionKeys)
+    private IndexTarget(ColumnIdentifier column, boolean isCollectionKeys, 
boolean isFullCollection)
     {
         this.column = column;
         this.isCollectionKeys = isCollectionKeys;
+        this.isFullCollection = isFullCollection;
     }
 
     public static class Raw
     {
         private final ColumnIdentifier.Raw column;
-        public final boolean isCollectionKeys;
+        private final boolean isCollectionKeys;
+        private final boolean isFullCollection;
 
-        private Raw(ColumnIdentifier.Raw column, boolean isCollectionKeys)
+        private Raw(ColumnIdentifier.Raw column, boolean isCollectionKeys, 
boolean isFullCollection)
         {
             this.column = column;
             this.isCollectionKeys = isCollectionKeys;
+            this.isFullCollection = isFullCollection;
         }
 
-        public static Raw of(ColumnIdentifier.Raw c)
+        public static Raw valuesOf(ColumnIdentifier.Raw c)
         {
-            return new Raw(c, false);
+            return new Raw(c, false, false);
         }
 
         public static Raw keysOf(ColumnIdentifier.Raw c)
         {
-            return new Raw(c, true);
+            return new Raw(c, true, false);
+        }
+
+        public static Raw fullCollection(ColumnIdentifier.Raw c)
+        {
+            return new Raw(c, false, true);
         }
 
         public IndexTarget prepare(CFMetaData cfm)
         {
-            return new IndexTarget(column.prepare(cfm), isCollectionKeys);
+            return new IndexTarget(column.prepare(cfm), isCollectionKeys, 
isFullCollection);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/cql3/statements/Restriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Restriction.java 
b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
index 5307cbb..659ed95 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Restriction.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
@@ -40,6 +40,12 @@ public interface Restriction
     public boolean isContains();
     public boolean isMultiColumn();
 
+    /**
+     * Returns true if, when applied to a clustering column, this restriction 
can be handled through one or more slices
+     * alone without filtering.  For example, EQ restrictions can be 
represented as a slice, but CONTAINS cannot.
+     */
+    public boolean canEvaluateWithSlices();
+
     // Not supported by Slice, but it's convenient to have here
     public List<ByteBuffer> values(QueryOptions options) throws 
InvalidRequestException;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 84cbdc0..09d1e52 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -707,33 +707,52 @@ public class SelectStatement implements CQLStatement, 
MeasurableForPreparedCache
             ColumnDefinition def = idIter.next();
             assert r != null && !r.isSlice();
 
-            List<ByteBuffer> values = r.values(options);
-            if (values.size() == 1)
+            if (r.isEQ())
             {
-                ByteBuffer val = values.get(0);
+                ByteBuffer val = r.values(options).get(0);
                 if (val == null)
                     throw new InvalidRequestException(String.format("Invalid 
null value for clustering key part %s", def.name));
                 builder.add(val);
             }
             else
             {
-                // We have a IN, which we only support for the last column.
-                // If compact, just add all values and we're done. Otherwise,
-                // for each value of the IN, creates all the columns 
corresponding to the selection.
-                if (values.isEmpty())
-                    return null;
-                SortedSet<CellName> columns = new 
TreeSet<CellName>(cfm.comparator);
-                Iterator<ByteBuffer> iter = values.iterator();
-                while (iter.hasNext())
+                if (!r.isMultiColumn())
                 {
-                    ByteBuffer val = iter.next();
-                    if (val == null)
-                        throw new 
InvalidRequestException(String.format("Invalid null value for clustering key 
part %s", def.name));
+                    List<ByteBuffer> values = r.values(options);
+                    // We have a IN, which we only support for the last column.
+                    // If compact, just add all values and we're done. 
Otherwise,
+                    // for each value of the IN, creates all the columns 
corresponding to the selection.
+                    if (values.isEmpty())
+                        return null;
+                    SortedSet<CellName> columns = new 
TreeSet<CellName>(cfm.comparator);
+                    Iterator<ByteBuffer> iter = values.iterator();
+                    while (iter.hasNext())
+                    {
+                        ByteBuffer val = iter.next();
+                        if (val == null)
+                            throw new 
InvalidRequestException(String.format("Invalid null value for clustering key 
part %s", def.name));
+
+                        Composite prefix = builder.buildWith(val);
+                        columns.addAll(addSelectedColumns(prefix));
+                    }
+                    return columns;
+                }
+                else
+                {
+                    // we have a multi-column IN restriction
+                    List<List<ByteBuffer>> values = 
((MultiColumnRestriction.IN) r).splitValues(options);
+                    TreeSet<CellName> inValues = new TreeSet<>(cfm.comparator);
+                    for (List<ByteBuffer> components : values)
+                    {
+                        for (int i = 0; i < components.size(); i++)
+                            if (components.get(i) == null)
+                                throw new InvalidRequestException("Invalid 
null value in condition for column " + cfm.clusteringColumns().get(i + 
def.position()));
 
-                    Composite prefix = builder.buildWith(val);
-                    columns.addAll(addSelectedColumns(prefix));
+                        Composite prefix = builder.buildWith(components);
+                        inValues.addAll(addSelectedColumns(prefix));
+                    }
+                    return inValues;
                 }
-                return columns;
             }
         }
 
@@ -778,6 +797,7 @@ public class SelectStatement implements CQLStatement, 
MeasurableForPreparedCache
         }
     }
 
+    /** Returns true if a non-frozen collection is selected, false otherwise. 
*/
     private boolean selectACollection()
     {
         if (!cfm.comparator.hasCollections())
@@ -785,7 +805,7 @@ public class SelectStatement implements CQLStatement, 
MeasurableForPreparedCache
 
         for (ColumnDefinition def : selection.getColumns())
         {
-            if (def.type instanceof CollectionType)
+            if (def.type.isCollection() && def.type.isMultiCell())
                 return true;
         }
 
@@ -830,7 +850,7 @@ public class SelectStatement implements CQLStatement, 
MeasurableForPreparedCache
             // But if the actual comparator itself is reversed, we must 
inversed the bounds too.
             Bound b = isReversed == isReversedType(def) ? bound : 
Bound.reverse(bound);
             Restriction r = restrictions[def.position()];
-            if (isNullRestriction(r, b))
+            if (isNullRestriction(r, b) || !r.canEvaluateWithSlices())
             {
                 // There wasn't any non EQ relation on that key, we select all 
records having the preceding component as prefix.
                 // For composites, if there was preceding component and we're 
computing the end, we must change the last component
@@ -846,6 +866,7 @@ public class SelectStatement implements CQLStatement, 
MeasurableForPreparedCache
             }
             else
             {
+                // IN or EQ
                 List<ByteBuffer> values = r.values(options);
                 if (values.size() != 1)
                 {
@@ -1088,7 +1109,7 @@ public class SelectStatement implements CQLStatement, 
MeasurableForPreparedCache
                 expressions.add(new IndexExpression(def.name.bytes, 
Operator.EQ, value));
             }
         }
-        
+
         if (usesSecondaryIndexing)
         {
             ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
@@ -1258,13 +1279,13 @@ public class SelectStatement implements CQLStatement, 
MeasurableForPreparedCache
             return;
         }
 
-        if (def.type.isCollection())
+        if (def.type.isMultiCell())
         {
-            List<Cell> collection = row.getCollection(def.name);
-            ByteBuffer value = collection == null
+            List<Cell> cells = row.getMultiCellColumn(def.name);
+            ByteBuffer buffer = cells == null
                              ? null
-                             : 
((CollectionType)def.type).serializeForNativeProtocol(collection, 
options.getProtocolVersion());
-            result.add(value);
+                             : 
((CollectionType)def.type).serializeForNativeProtocol(cells, 
options.getProtocolVersion());
+            result.add(buffer);
             return;
         }
 
@@ -1470,16 +1491,26 @@ public class SelectStatement implements CQLStatement, 
MeasurableForPreparedCache
                 stmt.usesSecondaryIndexing = true;
 
             if (!stmt.usesSecondaryIndexing)
-                stmt.restrictedColumns.removeAll(cfm.clusteringColumns());
+            {
+                for (ColumnDefinition def : cfm.clusteringColumns())
+                {
+                    // Remove clustering column restrictions that can be 
handled by slices; the remainder will be
+                    // handled by filters (which may require a secondary 
index).
+                    Restriction restriction = 
stmt.columnRestrictions[def.position()];
+                    if (restriction != null)
+                    {
+                        if (restriction.canEvaluateWithSlices())
+                            stmt.restrictedColumns.remove(def);
+                        else
+                            stmt.usesSecondaryIndexing = true;
+                    }
+                }
+            }
 
             // Even if usesSecondaryIndexing is false at this point, we'll 
still have to use one if
-            // there is restrictions not covered by the PK.
+            // there are restrictions not covered by the PK.
             if (!stmt.metadataRestrictions.isEmpty())
-            {
-                if (!hasQueriableIndex)
-                    throw new InvalidRequestException("No indexed columns 
present in by-columns clause with Equal operator");
                 stmt.usesSecondaryIndexing = true;
-            }
 
             if (stmt.usesSecondaryIndexing)
                 validateSecondaryIndexSelections(stmt);
@@ -1510,6 +1541,7 @@ public class SelectStatement implements CQLStatement, 
MeasurableForPreparedCache
             SecondaryIndex index = 
indexManager.getIndexForColumn(def.name.bytes);
             if (index != null && index.supportsOperator(relation.operator()))
                 return new boolean[]{true, def.kind == 
ColumnDefinition.Kind.CLUSTERING_COLUMN};
+
             return new boolean[]{false, false};
         }
 
@@ -1692,8 +1724,8 @@ public class SelectStatement implements CQLStatement, 
MeasurableForPreparedCache
                                                    
StorageService.getPartitioner().getTokenValidator());
             }
 
-            // We don't support relations against entire collections, like 
"numbers = {1, 2, 3}"
-            if (receiver.type.isCollection() && 
!(newRel.operator().equals(Operator.CONTAINS_KEY) || newRel.operator() == 
Operator.CONTAINS))
+            // We don't support relations against entire collections (unless 
they're frozen), like "numbers = {1, 2, 3}"
+            if (receiver.type.isCollection() && receiver.type.isMultiCell() && 
!(newRel.operator() == Operator.CONTAINS_KEY || newRel.operator() == 
Operator.CONTAINS))
             {
                 throw new InvalidRequestException(String.format("Collection 
column '%s' (%s) cannot be restricted by a '%s' relation",
                                                                 def.name, 
receiver.type.asCQL3Type(), newRel.operator()));
@@ -1760,7 +1792,7 @@ public class SelectStatement implements CQLStatement, 
MeasurableForPreparedCache
                     break;
                 case CONTAINS_KEY:
                     if (!(receiver.type instanceof MapType))
-                        throw new 
InvalidRequestException(String.format("Cannot use CONTAINS_KEY on non-map 
column %s", def.name));
+                        throw new 
InvalidRequestException(String.format("Cannot use CONTAINS KEY on non-map 
column %s", def.name));
                     // Fallthrough on purpose
                 case CONTAINS:
                 {
@@ -1771,6 +1803,7 @@ public class SelectStatement implements CQLStatement, 
MeasurableForPreparedCache
                         existingRestriction = new 
SingleColumnRestriction.Contains();
                     else if (!existingRestriction.isContains())
                         throw new 
InvalidRequestException(String.format("Collection column %s can only be 
restricted by CONTAINS or CONTAINS KEY", def.name));
+
                     boolean isKey = newRel.operator() == Operator.CONTAINS_KEY;
                     receiver = makeCollectionReceiver(receiver, isKey);
                     Term t = newRel.getValue().prepare(keyspace(), receiver);
@@ -1937,6 +1970,12 @@ public class SelectStatement implements CQLStatement, 
MeasurableForPreparedCache
                     else if (stmt.selectACollection())
                         throw new 
InvalidRequestException(String.format("Cannot restrict column \"%s\" by IN 
relation as a collection is selected by the query", cdef.name));
                 }
+                /*
+                else if (restriction.isContains() && !hasQueriableIndex)
+                {
+                    throw new InvalidRequestException(String.format("Cannot 
restrict column \"%s\" by a CONTAINS relation without a secondary index", 
cdef.name));
+                }
+                */
 
                 previous = cdef;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/cql3/statements/SingleColumnRestriction.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/SingleColumnRestriction.java 
b/src/java/org/apache/cassandra/cql3/statements/SingleColumnRestriction.java
index 7f8156e..b1c6ccc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SingleColumnRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SingleColumnRestriction.java
@@ -73,6 +73,11 @@ public abstract class SingleColumnRestriction implements 
Restriction
             return onToken;
         }
 
+        public boolean canEvaluateWithSlices()
+        {
+            return true;
+        }
+
         @Override
         public String toString()
         {
@@ -127,6 +132,11 @@ public abstract class SingleColumnRestriction implements 
Restriction
             return false;
         }
 
+        public boolean canEvaluateWithSlices()
+        {
+            return true;
+        }
+
         @Override
         public String toString()
         {
@@ -181,6 +191,11 @@ public abstract class SingleColumnRestriction implements 
Restriction
             return false;
         }
 
+        public boolean canEvaluateWithSlices()
+        {
+            return true;
+        }
+
         @Override
         public String toString()
         {
@@ -231,6 +246,11 @@ public abstract class SingleColumnRestriction implements 
Restriction
             return onToken;
         }
 
+        public boolean canEvaluateWithSlices()
+        {
+            return true;
+        }
+
         /** Returns true if the start or end bound (depending on the argument) 
is set, false otherwise */
         public boolean hasBound(Bound b)
         {
@@ -412,6 +432,10 @@ public abstract class SingleColumnRestriction implements 
Restriction
             return false;
         }
 
+        public boolean canEvaluateWithSlices()
+        {
+            return false;
+        }
 
         @Override
         public String toString()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/CFRowAdder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CFRowAdder.java 
b/src/java/org/apache/cassandra/db/CFRowAdder.java
index dfe49ee..3ff9171 100644
--- a/src/java/org/apache/cassandra/db/CFRowAdder.java
+++ b/src/java/org/apache/cassandra/db/CFRowAdder.java
@@ -64,7 +64,7 @@ public class CFRowAdder
     public CFRowAdder resetCollection(String cql3ColumnName)
     {
         ColumnDefinition def = getDefinition(cql3ColumnName);
-        assert def.type.isCollection();
+        assert def.type.isCollection() && def.type.isMultiCell();
         Composite name = cf.getComparator().create(prefix, def);
         cf.addAtom(new RangeTombstone(name.start(), name.end(), timestamp - 1, 
ldt));
         return this;
@@ -75,7 +75,7 @@ public class CFRowAdder
         ColumnDefinition def = getDefinition(cql3ColumnName);
         assert def.type instanceof MapType;
         MapType mt = (MapType)def.type;
-        CellName name = cf.getComparator().create(prefix, def, 
mt.keys.decompose(key));
+        CellName name = cf.getComparator().create(prefix, def, 
mt.getKeysType().decompose(key));
         return add(name, def, value);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java 
b/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java
index e33cc63..c62f890 100644
--- a/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java
+++ b/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java
@@ -304,7 +304,7 @@ public abstract class AbstractCellNameType extends 
AbstractCType implements Cell
                         return cell;
                     }
 
-                    public List<Cell> getCollection(ColumnIdentifier name)
+                    public List<Cell> getMultiCellColumn(ColumnIdentifier name)
                     {
                         return null;
                     }
@@ -446,7 +446,7 @@ public abstract class AbstractCellNameType extends 
AbstractCType implements Cell
             return columns == null ? null : columns.get(name);
         }
 
-        public List<Cell> getCollection(ColumnIdentifier name)
+        public List<Cell> getMultiCellColumn(ColumnIdentifier name)
         {
             return collections == null ? null : collections.get(name);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/composites/CellNameType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/CellNameType.java 
b/src/java/org/apache/cassandra/db/composites/CellNameType.java
index 4f45d41..1e87296 100644
--- a/src/java/org/apache/cassandra/db/composites/CellNameType.java
+++ b/src/java/org/apache/cassandra/db/composites/CellNameType.java
@@ -95,7 +95,7 @@ public interface CellNameType extends CType
     public boolean supportCollections();
 
     /**
-     * The type of the collections (or null if the type has not collections).
+     * The type of the collections (or null if the type does not have any 
non-frozen collections).
      */
     public ColumnToCollectionType collectionType();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/composites/CompoundSparseCellNameType.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/composites/CompoundSparseCellNameType.java 
b/src/java/org/apache/cassandra/db/composites/CompoundSparseCellNameType.java
index 27b3271..c88c6f4 100644
--- 
a/src/java/org/apache/cassandra/db/composites/CompoundSparseCellNameType.java
+++ 
b/src/java/org/apache/cassandra/db/composites/CompoundSparseCellNameType.java
@@ -24,10 +24,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.CQL3Row;
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CollectionType;
-import org.apache.cassandra.db.marshal.ColumnToCollectionType;
-import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java 
b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 2d56bfa..6f2760a 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -40,9 +40,7 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IndexExpression;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CollectionType;
-import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.*;
 
 /**
  * Extends a column filter (IFilter) to include a number of IndexExpression.
@@ -324,7 +322,7 @@ public abstract class ExtendedFilter
                 }
                 else
                 {
-                    if (def.type.isCollection())
+                    if (def.type.isCollection() && def.type.isMultiCell())
                     {
                         if (!collectionSatisfies(def, data, prefix, 
expression, collectionElement))
                             return false;
@@ -338,16 +336,49 @@ public abstract class ExtendedFilter
                 if (dataValue == null)
                     return false;
 
-                int v = validator.compare(dataValue, expression.value);
-                if (!satisfies(v, expression.operator))
-                    return false;
+                if (expression.operator == Operator.CONTAINS)
+                {
+                    assert def != null && def.type.isCollection() && 
!def.type.isMultiCell();
+                    CollectionType type = (CollectionType)def.type;
+                    switch (type.kind)
+                    {
+                        case LIST:
+                            ListType<?> listType = (ListType)def.type;
+                            if 
(!listType.getSerializer().deserialize(dataValue).contains(listType.getElementsType().getSerializer().deserialize(expression.value)))
+                                return false;
+                            break;
+                        case SET:
+                            SetType<?> setType = (SetType)def.type;
+                            if 
(!setType.getSerializer().deserialize(dataValue).contains(setType.getElementsType().getSerializer().deserialize(expression.value)))
+                                return false;
+                            break;
+                        case MAP:
+                            MapType<?,?> mapType = (MapType)def.type;
+                            if 
(!mapType.getSerializer().deserialize(dataValue).containsValue(mapType.getValuesType().getSerializer().deserialize(expression.value)))
+                                return false;
+                            break;
+                    }
+                }
+                else if (expression.operator == Operator.CONTAINS_KEY)
+                {
+                    assert def != null && def.type.isCollection() && 
!def.type.isMultiCell() && def.type instanceof MapType;
+                    MapType<?,?> mapType = (MapType)def.type;
+                    if (mapType.getSerializer().getSerializedValue(dataValue, 
expression.value, mapType.getKeysType()) == null)
+                        return false;
+                }
+                else
+                {
+                    int v = validator.compare(dataValue, expression.value);
+                    if (!satisfies(v, expression.operator))
+                        return false;
+                }
             }
             return true;
         }
 
         private static boolean collectionSatisfies(ColumnDefinition def, 
ColumnFamily data, Composite prefix, IndexExpression expr, ByteBuffer 
collectionElement)
         {
-            assert def.type.isCollection();
+            assert def.type.isCollection() && def.type.isMultiCell();
             CollectionType type = (CollectionType)def.type;
 
             if (expr.isContains())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java 
b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 044912f..62b9e4c 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -35,11 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.BufferDecoratedKey;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java 
b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 7f8e845..fbbec1d 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.index;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -33,6 +34,7 @@ import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Future;
 
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -585,6 +587,7 @@ public class SecondaryIndexManager
         }
 
         // Validate
+        boolean haveSupportedIndexLookup = false;
         for (Map.Entry<String, Set<IndexExpression>> expressions : 
expressionsByIndexType.entrySet())
         {
             Set<ByteBuffer> columns = 
columnsByIndexType.get(expressions.getKey());
@@ -593,8 +596,37 @@ public class SecondaryIndexManager
             for (IndexExpression expression : expressions.getValue())
             {
                 searcher.validate(expression);
+                haveSupportedIndexLookup |= 
secondaryIndex.supportsOperator(expression.operator);
             }
         }
+
+        if (!haveSupportedIndexLookup)
+        {
+            // build the error message
+            int i = 0;
+            StringBuilder sb = new StringBuilder("No secondary indexes on the 
restricted columns support the provided operators: ");
+            for (Map.Entry<String, Set<IndexExpression>> expressions : 
expressionsByIndexType.entrySet())
+            {
+                for (IndexExpression expression : expressions.getValue())
+                {
+                    if (i++ > 0)
+                        sb.append(", ");
+                    sb.append("'");
+                    String columnName;
+                    try
+                    {
+                        columnName = ByteBufferUtil.string(expression.column);
+                    }
+                    catch (CharacterCodingException ex)
+                    {
+                        columnName = "<unprintable>";
+                    }
+                    sb.append(columnName).append(" 
").append(expression.operator).append(" <value>").append("'");
+                }
+            }
+
+            throw new InvalidRequestException(sb.toString());
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java 
b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index b9ccd8e..93e0643 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -18,12 +18,14 @@
 package org.apache.cassandra.db.index;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 public abstract class SecondaryIndexSearcher

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java 
b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 410ea83..ec965fd 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -58,7 +58,7 @@ public abstract class CompositesIndex extends 
AbstractSimplePerColumnSecondaryIn
 
     public static CompositesIndex create(ColumnDefinition cfDef)
     {
-        if (cfDef.type.isCollection())
+        if (cfDef.type.isCollection() && cfDef.type.isMultiCell())
         {
             switch (((CollectionType)cfDef.type).kind)
             {
@@ -90,7 +90,7 @@ public abstract class CompositesIndex extends 
AbstractSimplePerColumnSecondaryIn
     // Check SecondaryIndex.getIndexComparator if you want to know why this is 
static
     public static CellNameType getIndexComparator(CFMetaData baseMetadata, 
ColumnDefinition cfDef)
     {
-        if (cfDef.type.isCollection())
+        if (cfDef.type.isCollection() && cfDef.type.isMultiCell())
         {
             switch (((CollectionType)cfDef.type).kind)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
 
b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
index e69e656..a11a0d9 100644
--- 
a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
+++ 
b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
@@ -98,7 +98,7 @@ public class CompositesIndexOnCollectionValue extends 
CompositesIndex
     @Override
     public boolean supportsOperator(Operator operator)
     {
-        return operator == Operator.CONTAINS;
+        return operator == Operator.CONTAINS && !(columnDef.type instanceof 
SetType);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java 
b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 4e1f2a3..8dd2ff3 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -223,6 +223,24 @@ public abstract class AbstractType<T> implements 
Comparator<ByteBuffer>
         return false;
     }
 
+    public boolean isMultiCell()
+    {
+        return false;
+    }
+
+    public AbstractType<?> freeze()
+    {
+        return this;
+    }
+
+    /**
+     * @param ignoreFreezing if true, the type string will not be wrapped with 
FrozenType(...), even if this type is frozen.
+     */
+    public String toString(boolean ignoreFreezing)
+    {
+        return this.toString();
+    }
+
     /**
      * The number of subcomponents this type has.
      * This is always 1, i.e. the type has only itself as "subcomponents", 
except for CompositeType.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java 
b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index e63f2a5..24ad533 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -20,20 +20,20 @@ package org.apache.cassandra.db.marshal;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.transport.Server;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.CQL3Type;
-import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
- * The abstract validator that is the base for maps, sets and lists.
+ * The abstract validator that is the base for maps, sets and lists (both 
frozen and non-frozen).
  *
  * Please note that this comparator shouldn't be used "manually" (through 
thrift for instance).
- *
  */
 public abstract class CollectionType<T> extends AbstractType<T>
 {
@@ -56,27 +56,9 @@ public abstract class CollectionType<T> extends 
AbstractType<T>
     public abstract AbstractType<?> nameComparator();
     public abstract AbstractType<?> valueComparator();
 
-    protected abstract void appendToStringBuilder(StringBuilder sb);
-
-    public abstract List<ByteBuffer> serializedValues(List<Cell> cells);
-
     @Override
     public abstract CollectionSerializer<T> getSerializer();
 
-    @Override
-    public void validateCellValue(ByteBuffer cellValue) throws MarshalException
-    {
-        valueComparator().validate(cellValue);
-    }
-
-    @Override
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder();
-        appendToStringBuilder(sb);
-        return sb.toString();
-    }
-
     public String getString(ByteBuffer bytes)
     {
         return BytesType.instance.getString(bytes);
@@ -94,25 +76,18 @@ public abstract class CollectionType<T> extends 
AbstractType<T>
         }
     }
 
-    @Override
-    public boolean isCompatibleWith(AbstractType<?> previous)
+    public boolean isCollection()
     {
-        if (this == previous)
-            return true;
-
-        if (!getClass().equals(previous.getClass()))
-            return false;
-
-        CollectionType tprev = (CollectionType) previous;
-        // The name is part of the Cell name, so we need sorting 
compatibility, i.e. isCompatibleWith().
-        // But value is the Cell value, so isValueCompatibleWith() is enough
-        return this.nameComparator().isCompatibleWith(tprev.nameComparator())
-            && 
this.valueComparator().isValueCompatibleWith(tprev.valueComparator());
+        return true;
     }
 
-    public boolean isCollection()
+    @Override
+    public void validateCellValue(ByteBuffer cellValue) throws MarshalException
     {
-        return true;
+        if (isMultiCell())
+            valueComparator().validate(cellValue);
+        else
+            super.validateCellValue(cellValue);
     }
 
     /**
@@ -124,9 +99,11 @@ public abstract class CollectionType<T> extends 
AbstractType<T>
         return kind == Kind.MAP;
     }
 
-    protected List<Cell> enforceLimit(List<Cell> cells, int version)
+    public List<Cell> enforceLimit(List<Cell> cells, int version)
     {
-        if (version >= 3 || cells.size() <= MAX_ELEMENTS)
+        assert isMultiCell();
+
+        if (version >= Server.VERSION_3 || cells.size() <= MAX_ELEMENTS)
             return cells;
 
         logger.error("Detected collection with {} elements, more than the {} 
limit. Only the first {} elements will be returned to the client. "
@@ -134,15 +111,75 @@ public abstract class CollectionType<T> extends 
AbstractType<T>
         return cells.subList(0, MAX_ELEMENTS);
     }
 
+    public abstract List<ByteBuffer> serializedValues(List<Cell> cells);
+
     public ByteBuffer serializeForNativeProtocol(List<Cell> cells, int version)
     {
+        assert isMultiCell();
         cells = enforceLimit(cells, version);
         List<ByteBuffer> values = serializedValues(cells);
         return CollectionSerializer.pack(values, cells.size(), version);
     }
 
+    @Override
+    public boolean isCompatibleWith(AbstractType<?> previous)
+    {
+        if (this == previous)
+            return true;
+
+        if (!getClass().equals(previous.getClass()))
+            return false;
+
+        CollectionType tprev = (CollectionType) previous;
+        if (this.isMultiCell() != tprev.isMultiCell())
+            return false;
+
+        // subclasses should handle compatibility checks for frozen collections
+        if (!this.isMultiCell())
+            return isCompatibleWithFrozen(tprev);
+
+        if (!this.nameComparator().isCompatibleWith(tprev.nameComparator()))
+            return false;
+
+        // the value comparator is only used for Cell values, so sorting 
doesn't matter
+        return 
this.valueComparator().isValueCompatibleWith(tprev.valueComparator());
+    }
+
+    @Override
+    public boolean isValueCompatibleWithInternal(AbstractType<?> previous)
+    {
+        // for multi-cell collections, compatibility and value-compatibility 
are the same
+        if (this.isMultiCell())
+            return isCompatibleWith(previous);
+
+        if (this == previous)
+            return true;
+
+        if (!getClass().equals(previous.getClass()))
+            return false;
+
+        CollectionType tprev = (CollectionType) previous;
+        if (this.isMultiCell() != tprev.isMultiCell())
+            return false;
+
+        // subclasses should handle compatibility checks for frozen collections
+        return isValueCompatibleWithFrozen(tprev);
+    }
+
+    /** A version of isCompatibleWith() to deal with non-multicell (frozen) 
collections */
+    protected abstract boolean isCompatibleWithFrozen(CollectionType<?> 
previous);
+
+    /** A version of isValueCompatibleWith() to deal with non-multicell 
(frozen) collections */
+    protected abstract boolean isValueCompatibleWithFrozen(CollectionType<?> 
previous);
+
     public CQL3Type asCQL3Type()
     {
         return new CQL3Type.Collection(this);
     }
+
+    @Override
+    public String toString()
+    {
+        return this.toString(false);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java 
b/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java
index a28b874..6fb32fb 100644
--- a/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 public class ColumnToCollectionType extends AbstractType<ByteBuffer>
 {
     // interning instances
-    private static final Map<Map<ByteBuffer, CollectionType>, 
ColumnToCollectionType> instances = new HashMap<Map<ByteBuffer, 
CollectionType>, ColumnToCollectionType>();
+    private static final Map<Map<ByteBuffer, CollectionType>, 
ColumnToCollectionType> instances = new HashMap<>();
 
     public final Map<ByteBuffer, CollectionType> defined;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/marshal/FrozenType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/FrozenType.java 
b/src/java/org/apache/cassandra/db/marshal/FrozenType.java
new file mode 100644
index 0000000..f440c90
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/marshal/FrozenType.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.marshal;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.serializers.TypeSerializer;
+import org.apache.cassandra.serializers.MarshalException;
+
+/**
+ * A fake type that is only used for parsing type strings that include frozen 
types.
+ */
+public class FrozenType extends AbstractType<Void>
+{
+    public static AbstractType<?> getInstance(TypeParser parser) throws 
ConfigurationException, SyntaxException
+    {
+        List<AbstractType<?>> innerTypes = parser.getTypeParameters();
+        if (innerTypes.size() != 1)
+            throw new SyntaxException("FrozenType() only accepts one 
parameter");
+
+        AbstractType<?> innerType = innerTypes.get(0);
+        return innerType.freeze();
+    }
+
+    public int compare(ByteBuffer o1, ByteBuffer o2)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public String getString(ByteBuffer bytes)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public ByteBuffer fromString(String source) throws MarshalException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public TypeSerializer<Void> getSerializer()
+    {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/marshal/ListType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java 
b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 171e179..510a526 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -24,17 +24,21 @@ import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.CollectionSerializer;
-import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.serializers.ListSerializer;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ListType<T> extends CollectionType<List<T>>
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(ListType.class);
+
     // interning instances
-    private static final Map<AbstractType<?>, ListType> instances = new 
HashMap<AbstractType<?>, ListType>();
+    private static final Map<AbstractType<?>, ListType> instances = new 
HashMap<>();
+    private static final Map<AbstractType<?>, ListType> frozenInstances = new 
HashMap<>();
 
-    public final AbstractType<T> elements;
+    private final AbstractType<T> elements;
     public final ListSerializer<T> serializer;
+    private final boolean isMultiCell;
 
     public static ListType<?> getInstance(TypeParser parser) throws 
ConfigurationException, SyntaxException
     {
@@ -42,25 +46,32 @@ public class ListType<T> extends CollectionType<List<T>>
         if (l.size() != 1)
             throw new ConfigurationException("ListType takes exactly 1 type 
parameter");
 
-        return getInstance(l.get(0));
+        return getInstance(l.get(0), true);
     }
 
-    public static synchronized <T> ListType<T> getInstance(AbstractType<T> 
elements)
+    public static synchronized <T> ListType<T> getInstance(AbstractType<T> 
elements, boolean isMultiCell)
     {
-        ListType<T> t = instances.get(elements);
+        Map<AbstractType<?>, ListType> internMap = isMultiCell ? instances : 
frozenInstances;
+        ListType<T> t = internMap.get(elements);
         if (t == null)
         {
-            t = new ListType<T>(elements);
-            instances.put(elements, t);
+            t = new ListType<T>(elements, isMultiCell);
+            internMap.put(elements, t);
         }
         return t;
     }
 
-    private ListType(AbstractType<T> elements)
+    private ListType(AbstractType<T> elements, boolean isMultiCell)
     {
         super(Kind.LIST);
         this.elements = elements;
         this.serializer = ListSerializer.getInstance(elements.getSerializer());
+        this.isMultiCell = isMultiCell;
+    }
+
+    public AbstractType<T> getElementsType()
+    {
+        return elements;
     }
 
     public AbstractType<UUID> nameComparator()
@@ -79,6 +90,35 @@ public class ListType<T> extends CollectionType<List<T>>
     }
 
     @Override
+    public AbstractType<?> freeze()
+    {
+        if (isMultiCell)
+            return getInstance(this.elements, false);
+        else
+            return this;
+    }
+
+    @Override
+    public boolean isMultiCell()
+    {
+        return isMultiCell;
+    }
+
+    @Override
+    public boolean isCompatibleWithFrozen(CollectionType<?> previous)
+    {
+        assert !isMultiCell;
+        return this.elements.isCompatibleWith(((ListType) previous).elements);
+    }
+
+    @Override
+    public boolean isValueCompatibleWithFrozen(CollectionType<?> previous)
+    {
+        assert !isMultiCell;
+        return this.elements.isValueCompatibleWithInternal(((ListType) 
previous).elements);
+    }
+
+    @Override
     public int compare(ByteBuffer o1, ByteBuffer o2)
     {
         return compareListOrSet(elements, o1, o2);
@@ -86,7 +126,7 @@ public class ListType<T> extends CollectionType<List<T>>
 
     static int compareListOrSet(AbstractType<?> elementsComparator, ByteBuffer 
o1, ByteBuffer o2)
     {
-        // Note that this is only used if the collection is inside an UDT
+        // Note that this is only used if the collection is frozen
         if (!o1.hasRemaining() || !o2.hasRemaining())
             return o1.hasRemaining() ? 1 : o2.hasRemaining() ? -1 : 0;
 
@@ -108,13 +148,24 @@ public class ListType<T> extends CollectionType<List<T>>
         return size1 == size2 ? 0 : (size1 < size2 ? -1 : 1);
     }
 
-    protected void appendToStringBuilder(StringBuilder sb)
+    @Override
+    public String toString(boolean ignoreFreezing)
     {
-        
sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
+        boolean includeFrozenType = !ignoreFreezing && !isMultiCell();
+
+        StringBuilder sb = new StringBuilder();
+        if (includeFrozenType)
+            sb.append(FrozenType.class.getName()).append("(");
+        sb.append(getClass().getName());
+        
sb.append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements),
 ignoreFreezing || !isMultiCell));
+        if (includeFrozenType)
+            sb.append(")");
+        return sb.toString();
     }
 
     public List<ByteBuffer> serializedValues(List<Cell> cells)
     {
+        assert isMultiCell;
         List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size());
         for (Cell c : cells)
             bbs.add(c.value());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/marshal/MapType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java 
b/src/java/org/apache/cassandra/db/marshal/MapType.java
index dbf6721..64f3e2a 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -24,19 +24,20 @@ import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.CollectionSerializer;
-import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.serializers.MapSerializer;
+import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class MapType<K, V> extends CollectionType<Map<K, V>>
 {
     // interning instances
-    private static final Map<Pair<AbstractType<?>, AbstractType<?>>, MapType> 
instances = new HashMap<Pair<AbstractType<?>, AbstractType<?>>, MapType>();
+    private static final Map<Pair<AbstractType<?>, AbstractType<?>>, MapType> 
instances = new HashMap<>();
+    private static final Map<Pair<AbstractType<?>, AbstractType<?>>, MapType> 
frozenInstances = new HashMap<>();
 
-    public final AbstractType<K> keys;
-    public final AbstractType<V> values;
+    private final AbstractType<K> keys;
+    private final AbstractType<V> values;
     private final MapSerializer<K, V> serializer;
+    private final boolean isMultiCell;
 
     public static MapType<?, ?> getInstance(TypeParser parser) throws 
ConfigurationException, SyntaxException
     {
@@ -44,27 +45,39 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
         if (l.size() != 2)
             throw new ConfigurationException("MapType takes exactly 2 type 
parameters");
 
-        return getInstance(l.get(0), l.get(1));
+        return getInstance(l.get(0), l.get(1), true);
     }
 
-    public static synchronized <K, V> MapType<K, V> 
getInstance(AbstractType<K> keys, AbstractType<V> values)
+    public static synchronized <K, V> MapType<K, V> 
getInstance(AbstractType<K> keys, AbstractType<V> values, boolean isMultiCell)
     {
+        Map<Pair<AbstractType<?>, AbstractType<?>>, MapType> internMap = 
isMultiCell ? instances : frozenInstances;
         Pair<AbstractType<?>, AbstractType<?>> p = Pair.<AbstractType<?>, 
AbstractType<?>>create(keys, values);
-        MapType<K, V> t = instances.get(p);
+        MapType<K, V> t = internMap.get(p);
         if (t == null)
         {
-            t = new MapType<K, V>(keys, values);
-            instances.put(p, t);
+            t = new MapType<>(keys, values, isMultiCell);
+            internMap.put(p, t);
         }
         return t;
     }
 
-    private MapType(AbstractType<K> keys, AbstractType<V> values)
+    private MapType(AbstractType<K> keys, AbstractType<V> values, boolean 
isMultiCell)
     {
         super(Kind.MAP);
         this.keys = keys;
         this.values = values;
         this.serializer = MapSerializer.getInstance(keys.getSerializer(), 
values.getSerializer());
+        this.isMultiCell = isMultiCell;
+    }
+
+    public AbstractType<K> getKeysType()
+    {
+        return keys;
+    }
+
+    public AbstractType<V> getValuesType()
+    {
+        return values;
     }
 
     public AbstractType<K> nameComparator()
@@ -78,29 +91,65 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
     }
 
     @Override
+    public boolean isMultiCell()
+    {
+        return isMultiCell;
+    }
+
+    @Override
+    public AbstractType<?> freeze()
+    {
+        if (isMultiCell)
+            return getInstance(this.keys, this.values, false);
+        else
+            return this;
+    }
+
+    @Override
+    public boolean isCompatibleWithFrozen(CollectionType<?> previous)
+    {
+        assert !isMultiCell;
+        MapType tprev = (MapType) previous;
+        return keys.isCompatibleWith(tprev.keys) && 
values.isCompatibleWith(tprev.values);
+    }
+
+    @Override
+    public boolean isValueCompatibleWithFrozen(CollectionType<?> previous)
+    {
+        assert !isMultiCell;
+        MapType tprev = (MapType) previous;
+        return keys.isCompatibleWith(tprev.keys) && 
values.isValueCompatibleWith(tprev.values);
+    }
+
+    @Override
     public int compare(ByteBuffer o1, ByteBuffer o2)
     {
-        // Note that this is only used if the collection is inside an UDT
-        if (!o1.hasRemaining() || !o2.hasRemaining())
+        return compareMaps(keys, values, o1, o2);
+    }
+
+    public static int compareMaps(AbstractType<?> keysComparator, 
AbstractType<?> valuesComparator, ByteBuffer o1, ByteBuffer o2)
+    {
+         if (!o1.hasRemaining() || !o2.hasRemaining())
             return o1.hasRemaining() ? 1 : o2.hasRemaining() ? -1 : 0;
 
         ByteBuffer bb1 = o1.duplicate();
         ByteBuffer bb2 = o2.duplicate();
 
-        int size1 = CollectionSerializer.readCollectionSize(bb1, 3);
-        int size2 = CollectionSerializer.readCollectionSize(bb2, 3);
+        int protocolVersion = Server.VERSION_3;
+        int size1 = CollectionSerializer.readCollectionSize(bb1, 
protocolVersion);
+        int size2 = CollectionSerializer.readCollectionSize(bb2, 
protocolVersion);
 
         for (int i = 0; i < Math.min(size1, size2); i++)
         {
-            ByteBuffer k1 = CollectionSerializer.readValue(bb1, 3);
-            ByteBuffer k2 = CollectionSerializer.readValue(bb2, 3);
-            int cmp = keys.compare(k1, k2);
+            ByteBuffer k1 = CollectionSerializer.readValue(bb1, 
protocolVersion);
+            ByteBuffer k2 = CollectionSerializer.readValue(bb2, 
protocolVersion);
+            int cmp = keysComparator.compare(k1, k2);
             if (cmp != 0)
                 return cmp;
 
-            ByteBuffer v1 = CollectionSerializer.readValue(bb1, 3);
-            ByteBuffer v2 = CollectionSerializer.readValue(bb2, 3);
-            cmp = values.compare(v1, v2);
+            ByteBuffer v1 = CollectionSerializer.readValue(bb1, 
protocolVersion);
+            ByteBuffer v2 = CollectionSerializer.readValue(bb2, 
protocolVersion);
+            cmp = valuesComparator.compare(v1, v2);
             if (cmp != 0)
                 return cmp;
         }
@@ -119,13 +168,23 @@ public class MapType<K, V> extends CollectionType<Map<K, 
V>>
         return keys.isByteOrderComparable();
     }
 
-    protected void appendToStringBuilder(StringBuilder sb)
+    @Override
+    public String toString(boolean ignoreFreezing)
     {
-        
sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Arrays.asList(keys,
 values)));
+        boolean includeFrozenType = !ignoreFreezing && !isMultiCell();
+
+        StringBuilder sb = new StringBuilder();
+        if (includeFrozenType)
+            sb.append(FrozenType.class.getName()).append("(");
+        
sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Arrays.asList(keys,
 values), ignoreFreezing || !isMultiCell));
+        if (includeFrozenType)
+            sb.append(")");
+        return sb.toString();
     }
 
     public List<ByteBuffer> serializedValues(List<Cell> cells)
     {
+        assert isMultiCell;
         List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size() * 2);
         for (Cell c : cells)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/marshal/SetType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java 
b/src/java/org/apache/cassandra/db/marshal/SetType.java
index d2f7f12..e10f2a1 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -23,16 +23,17 @@ import java.util.*;
 import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.serializers.SetSerializer;
 
 public class SetType<T> extends CollectionType<Set<T>>
 {
     // interning instances
-    private static final Map<AbstractType<?>, SetType> instances = new 
HashMap<AbstractType<?>, SetType>();
+    private static final Map<AbstractType<?>, SetType> instances = new 
HashMap<>();
+    private static final Map<AbstractType<?>, SetType> frozenInstances = new 
HashMap<>();
 
-    public final AbstractType<T> elements;
+    private final AbstractType<T> elements;
     private final SetSerializer<T> serializer;
+    private final boolean isMultiCell;
 
     public static SetType<?> getInstance(TypeParser parser) throws 
ConfigurationException, SyntaxException
     {
@@ -40,25 +41,32 @@ public class SetType<T> extends CollectionType<Set<T>>
         if (l.size() != 1)
             throw new ConfigurationException("SetType takes exactly 1 type 
parameter");
 
-        return getInstance(l.get(0));
+        return getInstance(l.get(0), true);
     }
 
-    public static synchronized <T> SetType<T> getInstance(AbstractType<T> 
elements)
+    public static synchronized <T> SetType<T> getInstance(AbstractType<T> 
elements, boolean isMultiCell)
     {
-        SetType<T> t = instances.get(elements);
+        Map<AbstractType<?>, SetType> internMap = isMultiCell ? instances : 
frozenInstances;
+        SetType<T> t = internMap.get(elements);
         if (t == null)
         {
-            t = new SetType<T>(elements);
-            instances.put(elements, t);
+            t = new SetType<T>(elements, isMultiCell);
+            internMap.put(elements, t);
         }
         return t;
     }
 
-    public SetType(AbstractType<T> elements)
+    public SetType(AbstractType<T> elements, boolean isMultiCell)
     {
         super(Kind.SET);
         this.elements = elements;
         this.serializer = SetSerializer.getInstance(elements.getSerializer());
+        this.isMultiCell = isMultiCell;
+    }
+
+    public AbstractType<T> getElementsType()
+    {
+        return elements;
     }
 
     public AbstractType<T> nameComparator()
@@ -72,6 +80,35 @@ public class SetType<T> extends CollectionType<Set<T>>
     }
 
     @Override
+    public boolean isMultiCell()
+    {
+        return isMultiCell;
+    }
+
+    @Override
+    public AbstractType<?> freeze()
+    {
+        if (isMultiCell)
+            return getInstance(this.elements, false);
+        else
+            return this;
+    }
+
+    @Override
+    public boolean isCompatibleWithFrozen(CollectionType<?> previous)
+    {
+        assert !isMultiCell;
+        return this.elements.isCompatibleWith(((SetType) previous).elements);
+    }
+
+    @Override
+    public boolean isValueCompatibleWithFrozen(CollectionType<?> previous)
+    {
+        // because sets are ordered, any changes to the type must maintain the 
ordering
+        return isCompatibleWithFrozen(previous);
+    }
+
+    @Override
     public int compare(ByteBuffer o1, ByteBuffer o2)
     {
         return ListType.compareListOrSet(elements, o1, o2);
@@ -87,9 +124,19 @@ public class SetType<T> extends CollectionType<Set<T>>
         return elements.isByteOrderComparable();
     }
 
-    protected void appendToStringBuilder(StringBuilder sb)
+    @Override
+    public String toString(boolean ignoreFreezing)
     {
-        
sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
+        boolean includeFrozenType = !ignoreFreezing && !isMultiCell();
+
+        StringBuilder sb = new StringBuilder();
+        if (includeFrozenType)
+            sb.append(FrozenType.class.getName()).append("(");
+        sb.append(getClass().getName());
+        
sb.append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements),
 ignoreFreezing || !isMultiCell));
+        if (includeFrozenType)
+            sb.append(")");
+        return sb.toString();
     }
 
     public List<ByteBuffer> serializedValues(List<Cell> cells)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee55f361/src/java/org/apache/cassandra/db/marshal/TupleType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TupleType.java 
b/src/java/org/apache/cassandra/db/marshal/TupleType.java
index 42aaba1..ddaf53f 100644
--- a/src/java/org/apache/cassandra/db/marshal/TupleType.java
+++ b/src/java/org/apache/cassandra/db/marshal/TupleType.java
@@ -41,12 +41,17 @@ public class TupleType extends AbstractType<ByteBuffer>
 
     public TupleType(List<AbstractType<?>> types)
     {
+        for (int i = 0; i < types.size(); i++)
+            types.set(i, types.get(i).freeze());
         this.types = types;
     }
 
     public static TupleType getInstance(TypeParser parser) throws 
ConfigurationException, SyntaxException
     {
-        return new TupleType(parser.getTypeParameters());
+        List<AbstractType<?>> types = parser.getTypeParameters();
+        for (int i = 0; i < types.size(); i++)
+            types.set(i, types.get(i).freeze());
+        return new TupleType(types);
     }
 
     public AbstractType<?> type(int i)
@@ -293,6 +298,6 @@ public class TupleType extends AbstractType<ByteBuffer>
     @Override
     public String toString()
     {
-        return getClass().getName() + 
TypeParser.stringifyTypeParameters(types);
+        return getClass().getName() + 
TypeParser.stringifyTypeParameters(types, true);
     }
 }

Reply via email to