http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
index 9eb5d82..9c05a77 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
@@ -30,6 +30,8 @@ import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static com.google.common.collect.Iterables.any;
+
 /**
  * A class avoiding class duplication between CompositeType and
  * DynamicCompositeType.
@@ -298,9 +300,9 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
     }
 
     @Override
-    public boolean referencesUserType(String name)
+    public boolean referencesUserType(ByteBuffer name)
     {
-        return getComponents().stream().anyMatch(f -> 
f.referencesUserType(name));
+        return any(getComponents(), t -> t.referencesUserType(name));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 99df8a2..a09d147 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -447,11 +447,19 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>, Assignm
             ByteBufferUtil.skipWithVIntLength(in);
     }
 
-    public boolean referencesUserType(String userTypeName)
+    public boolean referencesUserType(ByteBuffer name)
     {
         return false;
     }
 
+    /**
+     * Returns an instance of this type with all referenced user types replaced with the new instance.
+     */
+    public AbstractType<?> withUpdatedUserType(UserType udt)
+    {
+        return this;
+    }
+
     public boolean referencesDuration()
     {
         return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 12e7fc3..8871eff 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -32,6 +32,8 @@ import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static com.google.common.collect.Iterables.transform;
+
 /*
  * The encoding of a CompositeType column name should be:
  *   <component><component><component> ...
@@ -102,18 +104,7 @@ public class CompositeType extends AbstractCompositeType
     public static CompositeType getInstance(List<AbstractType<?>> types)
     {
         assert types != null && !types.isEmpty();
-
-        CompositeType ct = instances.get(types);
-        if (ct == null)
-        {
-            ct = new CompositeType(types);
-            CompositeType previous = instances.putIfAbsent(types, ct);
-            if (previous != null)
-            {
-                ct = previous;
-            }
-        }
-        return ct;
+        return instances.computeIfAbsent(types, CompositeType::new);
     }
 
     protected CompositeType(List<AbstractType<?>> types)
@@ -287,6 +278,17 @@ public class CompositeType extends AbstractCompositeType
         return true;
     }
 
+    @Override
+    public CompositeType withUpdatedUserType(UserType udt)
+    {
+        if (!referencesUserType(udt.name))
+            return this;
+
+        instances.remove(types);
+
+        return getInstance(transform(types, t -> t.withUpdatedUserType(udt)));
+    }
+
     private static class StaticParsedComparator implements ParsedComparator
     {
         final AbstractType<?> type;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java b/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
index cade725..5b02a05 100644
--- a/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
@@ -17,21 +17,20 @@
  */
 package org.apache.cassandra.db.marshal;
 
-import java.nio.charset.CharacterCodingException;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
+import java.nio.charset.CharacterCodingException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
-import org.apache.cassandra.cql3.Term;
+import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -61,9 +60,9 @@ public class DynamicCompositeType extends AbstractCompositeType
     private final Map<Byte, AbstractType<?>> aliases;
 
     // interning instances
-    private static final ConcurrentMap<Map<Byte, AbstractType<?>>, 
DynamicCompositeType> instances = new ConcurrentHashMap<Map<Byte, 
AbstractType<?>>, DynamicCompositeType>();
+    private static final ConcurrentHashMap<Map<Byte, AbstractType<?>>, 
DynamicCompositeType> instances = new ConcurrentHashMap<Map<Byte, 
AbstractType<?>>, DynamicCompositeType>();
 
-    public static synchronized DynamicCompositeType getInstance(TypeParser 
parser) throws ConfigurationException, SyntaxException
+    public static DynamicCompositeType getInstance(TypeParser parser)
     {
         return getInstance(parser.getAliasParameters());
     }
@@ -71,9 +70,9 @@ public class DynamicCompositeType extends AbstractCompositeType
     public static DynamicCompositeType getInstance(Map<Byte, AbstractType<?>> 
aliases)
     {
         DynamicCompositeType dct = instances.get(aliases);
-        if (dct == null)
-            dct = instances.computeIfAbsent(aliases, k ->  new 
DynamicCompositeType(k));
-        return dct;
+        return null == dct
+             ? instances.computeIfAbsent(aliases, DynamicCompositeType::new)
+             : dct;
     }
 
     private DynamicCompositeType(Map<Byte, AbstractType<?>> aliases)
@@ -257,6 +256,17 @@ public class DynamicCompositeType extends AbstractCompositeType
         return true;
     }
 
+    @Override
+    public DynamicCompositeType withUpdatedUserType(UserType udt)
+    {
+        if (!referencesUserType(udt.name))
+            return this;
+
+        instances.remove(aliases);
+
+        return getInstance(Maps.transformValues(aliases, v -> 
v.withUpdatedUserType(udt)));
+    }
+
     private class DynamicParsedComparator implements ParsedComparator
     {
         final AbstractType<?> type;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/db/marshal/ListType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 29ccaa5..cb21f11 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.marshal;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.Lists;
@@ -29,20 +28,15 @@ import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.CollectionSerializer;
-import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.serializers.ListSerializer;
+import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class ListType<T> extends CollectionType<List<T>>
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(ListType.class);
-
     // interning instances
-    private static final ConcurrentMap<AbstractType<?>, ListType> instances = 
new ConcurrentHashMap<>();
-    private static final ConcurrentMap<AbstractType<?>, ListType> 
frozenInstances = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<AbstractType<?>, ListType> 
instances = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<AbstractType<?>, ListType> 
frozenInstances = new ConcurrentHashMap<>();
 
     private final AbstractType<T> elements;
     public final ListSerializer<T> serializer;
@@ -57,13 +51,13 @@ public class ListType<T> extends CollectionType<List<T>>
         return getInstance(l.get(0), true);
     }
 
-    public static <T> ListType<T> getInstance(AbstractType<T> elements, final 
boolean isMultiCell)
+    public static <T> ListType<T> getInstance(AbstractType<T> elements, 
boolean isMultiCell)
     {
-        ConcurrentMap<AbstractType<?>, ListType> internMap = isMultiCell ? 
instances : frozenInstances;
+        ConcurrentHashMap<AbstractType<?>, ListType> internMap = isMultiCell ? 
instances : frozenInstances;
         ListType<T> t = internMap.get(elements);
-        if (t == null)
-            t = internMap.computeIfAbsent(elements, k -> new ListType<>(k, 
isMultiCell) );
-        return t;
+        return null == t
+             ? internMap.computeIfAbsent(elements, k -> new ListType<>(k, 
isMultiCell))
+             : t;
     }
 
     private ListType(AbstractType<T> elements, boolean isMultiCell)
@@ -75,9 +69,20 @@ public class ListType<T> extends CollectionType<List<T>>
     }
 
     @Override
-    public boolean referencesUserType(String userTypeName)
+    public boolean referencesUserType(ByteBuffer name)
     {
-        return getElementsType().referencesUserType(userTypeName);
+        return elements.referencesUserType(name);
+    }
+
+    @Override
+    public ListType<?> withUpdatedUserType(UserType udt)
+    {
+        if (!referencesUserType(udt.name))
+            return this;
+
+        (isMultiCell ? instances : frozenInstances).remove(elements);
+
+        return getInstance(elements.withUpdatedUserType(udt), isMultiCell);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/db/marshal/MapType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index 1bfc044..8b82134 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.marshal;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.Maps;
@@ -37,8 +36,8 @@ import org.apache.cassandra.utils.Pair;
 public class MapType<K, V> extends CollectionType<Map<K, V>>
 {
     // interning instances
-    private static final ConcurrentMap<Pair<AbstractType<?>, AbstractType<?>>, 
MapType> instances = new ConcurrentHashMap<>();
-    private static final ConcurrentMap<Pair<AbstractType<?>, AbstractType<?>>, 
MapType> frozenInstances = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<Pair<AbstractType<?>, 
AbstractType<?>>, MapType> instances = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<Pair<AbstractType<?>, 
AbstractType<?>>, MapType> frozenInstances = new ConcurrentHashMap<>();
 
     private final AbstractType<K> keys;
     private final AbstractType<V> values;
@@ -56,12 +55,12 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
 
     public static <K, V> MapType<K, V> getInstance(AbstractType<K> keys, 
AbstractType<V> values, boolean isMultiCell)
     {
-        ConcurrentMap<Pair<AbstractType<?>, AbstractType<?>>, MapType> 
internMap = isMultiCell ? instances : frozenInstances;
-        Pair<AbstractType<?>, AbstractType<?>> p = Pair.<AbstractType<?>, 
AbstractType<?>>create(keys, values);
+        ConcurrentHashMap<Pair<AbstractType<?>, AbstractType<?>>, MapType> 
internMap = isMultiCell ? instances : frozenInstances;
+        Pair<AbstractType<?>, AbstractType<?>> p = Pair.create(keys, values);
         MapType<K, V> t = internMap.get(p);
-        if (t == null)
-            t = internMap.computeIfAbsent(p, k -> new MapType<>(k.left, 
k.right, isMultiCell) );
-        return t;
+        return null == t
+             ? internMap.computeIfAbsent(p, k -> new MapType<>(k.left, 
k.right, isMultiCell))
+             : t;
     }
 
     private MapType(AbstractType<K> keys, AbstractType<V> values, boolean 
isMultiCell)
@@ -74,10 +73,20 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
     }
 
     @Override
-    public boolean referencesUserType(String userTypeName)
+    public boolean referencesUserType(ByteBuffer name)
     {
-        return getKeysType().referencesUserType(userTypeName) ||
-               getValuesType().referencesUserType(userTypeName);
+        return keys.referencesUserType(name) || 
values.referencesUserType(name);
+    }
+
+    @Override
+    public MapType<?,?> withUpdatedUserType(UserType udt)
+    {
+        if (!referencesUserType(udt.name))
+            return this;
+
+        (isMultiCell ? instances : frozenInstances).remove(Pair.create(keys, 
values));
+
+        return getInstance(keys.withUpdatedUserType(udt), 
values.withUpdatedUserType(udt), isMultiCell);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/db/marshal/ReversedType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ReversedType.java b/src/java/org/apache/cassandra/db/marshal/ReversedType.java
index 250dfdc..81f7522 100644
--- a/src/java/org/apache/cassandra/db/marshal/ReversedType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ReversedType.java
@@ -18,14 +18,13 @@
 package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.transport.ProtocolVersion;
@@ -33,11 +32,11 @@ import org.apache.cassandra.transport.ProtocolVersion;
 public class ReversedType<T> extends AbstractType<T>
 {
     // interning instances
-    private static final Map<AbstractType<?>, ReversedType> instances = new 
HashMap<AbstractType<?>, ReversedType>();
+    private static final Map<AbstractType<?>, ReversedType> instances = new 
ConcurrentHashMap<>();
 
     public final AbstractType<T> baseType;
 
-    public static <T> ReversedType<T> getInstance(TypeParser parser) throws 
ConfigurationException, SyntaxException
+    public static <T> ReversedType<T> getInstance(TypeParser parser)
     {
         List<AbstractType<?>> types = parser.getTypeParameters();
         if (types.size() != 1)
@@ -45,15 +44,9 @@ public class ReversedType<T> extends AbstractType<T>
         return getInstance((AbstractType<T>) types.get(0));
     }
 
-    public static synchronized <T> ReversedType<T> getInstance(AbstractType<T> 
baseType)
+    public static <T> ReversedType<T> getInstance(AbstractType<T> baseType)
     {
-        ReversedType<T> type = instances.get(baseType);
-        if (type == null)
-        {
-            type = new ReversedType<T>(baseType);
-            instances.put(baseType, type);
-        }
-        return type;
+        return instances.computeIfAbsent(baseType, ReversedType::new);
     }
 
     private ReversedType(AbstractType<T> baseType)
@@ -126,9 +119,21 @@ public class ReversedType<T> extends AbstractType<T>
         return baseType.getSerializer();
     }
 
-    public boolean referencesUserType(String userTypeName)
+    @Override
+    public boolean referencesUserType(ByteBuffer name)
     {
-        return baseType.referencesUserType(userTypeName);
+        return baseType.referencesUserType(name);
+    }
+
+    @Override
+    public ReversedType<?> withUpdatedUserType(UserType udt)
+    {
+        if (!referencesUserType(udt.name))
+            return this;
+
+        instances.remove(baseType);
+
+        return getInstance(baseType.withUpdatedUserType(udt));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/db/marshal/SetType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index 4374612..38d5ff7 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.marshal;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.Sets;
@@ -35,8 +34,8 @@ import org.apache.cassandra.transport.ProtocolVersion;
 public class SetType<T> extends CollectionType<Set<T>>
 {
     // interning instances
-    private static final ConcurrentMap<AbstractType<?>, SetType> instances = 
new ConcurrentHashMap<>();
-    private static final ConcurrentMap<AbstractType<?>, SetType> 
frozenInstances = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<AbstractType<?>, SetType> instances 
= new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<AbstractType<?>, SetType> 
frozenInstances = new ConcurrentHashMap<>();
 
     private final AbstractType<T> elements;
     private final SetSerializer<T> serializer;
@@ -53,11 +52,11 @@ public class SetType<T> extends CollectionType<Set<T>>
 
     public static <T> SetType<T> getInstance(AbstractType<T> elements, boolean 
isMultiCell)
     {
-        ConcurrentMap<AbstractType<?>, SetType> internMap = isMultiCell ? 
instances : frozenInstances;
+        ConcurrentHashMap<AbstractType<?>, SetType> internMap = isMultiCell ? 
instances : frozenInstances;
         SetType<T> t = internMap.get(elements);
-        if (t == null)
-            t = internMap.computeIfAbsent(elements, k -> new SetType<>(k, 
isMultiCell) );
-        return t;
+        return null == t
+             ? internMap.computeIfAbsent(elements, k -> new SetType<>(k, 
isMultiCell))
+             : t;
     }
 
     public SetType(AbstractType<T> elements, boolean isMultiCell)
@@ -69,9 +68,20 @@ public class SetType<T> extends CollectionType<Set<T>>
     }
 
     @Override
-    public boolean referencesUserType(String userTypeName)
+    public boolean referencesUserType(ByteBuffer name)
     {
-        return getElementsType().referencesUserType(userTypeName);
+        return elements.referencesUserType(name);
+    }
+
+    @Override
+    public SetType<?> withUpdatedUserType(UserType udt)
+    {
+        if (!referencesUserType(udt.name))
+            return this;
+
+        (isMultiCell ? instances : frozenInstances).remove(elements);
+
+        return getInstance(elements.withUpdatedUserType(udt), isMultiCell);
     }
 
     public AbstractType<T> getElementsType()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/db/marshal/TupleType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TupleType.java b/src/java/org/apache/cassandra/db/marshal/TupleType.java
index 71e946c..7f90c0a 100644
--- a/src/java/org/apache/cassandra/db/marshal/TupleType.java
+++ b/src/java/org/apache/cassandra/db/marshal/TupleType.java
@@ -23,9 +23,9 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -35,6 +35,8 @@ import org.apache.cassandra.serializers.*;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static com.google.common.collect.Iterables.any;
+
 /**
  * This is essentially like a CompositeType, but it's not primarily meant for comparison, just
  * to pack multiple values together so has a more friendly encoding.
@@ -60,8 +62,9 @@ public class TupleType extends AbstractType<ByteBuffer>
     protected TupleType(List<AbstractType<?>> types, boolean freezeInner)
     {
         super(ComparisonType.CUSTOM);
+
         if (freezeInner)
-            this.types = 
types.stream().map(AbstractType::freeze).collect(Collectors.toList());
+            this.types = Lists.transform(types, AbstractType::freeze);
         else
             this.types = types;
     }
@@ -75,9 +78,17 @@ public class TupleType extends AbstractType<ByteBuffer>
     }
 
     @Override
-    public boolean referencesUserType(String name)
+    public boolean referencesUserType(ByteBuffer name)
+    {
+        return any(types, t -> t.referencesUserType(name));
+    }
+
+    @Override
+    public TupleType withUpdatedUserType(UserType udt)
     {
-        return allTypes().stream().anyMatch(f -> f.referencesUserType(name));
+        return referencesUserType(udt.name)
+             ? new TupleType(Lists.transform(types, t -> 
t.withUpdatedUserType(udt)))
+             : this;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/db/marshal/UserType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java b/src/java/org/apache/cassandra/db/marshal/UserType.java
index f139edd..f9786d6 100644
--- a/src/java/org/apache/cassandra/db/marshal/UserType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UserType.java
@@ -22,18 +22,18 @@ import java.util.*;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.CellPath;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.schema.Diff;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static com.google.common.collect.Iterables.any;
 
 /**
  * A user defined type.
@@ -42,8 +42,6 @@ import org.slf4j.LoggerFactory;
  */
 public class UserType extends TupleType
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(UserType.class);
-
     public final String keyspace;
     public final ByteBuffer name;
     private final List<FieldIdentifier> fieldNames;
@@ -64,7 +62,7 @@ public class UserType extends TupleType
             stringFieldNames.add(fieldName.toString());
     }
 
-    public static UserType getInstance(TypeParser parser) throws 
ConfigurationException, SyntaxException
+    public static UserType getInstance(TypeParser parser)
     {
         Pair<Pair<String, ByteBuffer>, List<Pair<ByteBuffer, AbstractType>>> 
params = parser.getUserTypeParameters();
         String keyspace = params.left.left;
@@ -386,6 +384,11 @@ public class UserType extends TupleType
         return true;
     }
 
+    public boolean equals(UserType other, Diff.Mode mode)
+    {
+        return equals(other);
+    }
+
     @Override
     public CQL3Type asCQL3Type()
     {
@@ -393,10 +396,25 @@ public class UserType extends TupleType
     }
 
     @Override
-    public boolean referencesUserType(String userTypeName)
+    public boolean referencesUserType(ByteBuffer name)
+    {
+        return this.name.equals(name) || any(fieldTypes(), t -> 
t.referencesUserType(name));
+    }
+
+    @Override
+    public UserType withUpdatedUserType(UserType udt)
     {
-        return getNameAsString().equals(userTypeName) ||
-               fieldTypes().stream().anyMatch(f -> 
f.referencesUserType(userTypeName));
+        if (!referencesUserType(udt.name))
+            return this;
+
+        if (name.equals(udt.name))
+            return udt;
+
+        return new UserType(keyspace,
+                            name,
+                            fieldNames,
+                            Lists.transform(fieldTypes(), t -> 
t.withUpdatedUserType(udt)),
+                            isMultiCell());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java
index 0579429..29ef77e 100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@ -83,7 +83,7 @@ public class TableViews extends AbstractCollection<View>
     public Iterable<ColumnFamilyStore> allViewsCfs()
     {
         Keyspace keyspace = Keyspace.open(baseTableMetadata.keyspace);
-        return Iterables.transform(views, view -> 
keyspace.getColumnFamilyStore(view.getDefinition().name));
+        return Iterables.transform(views, view -> 
keyspace.getColumnFamilyStore(view.getDefinition().name()));
     }
 
     public void forceBlockingFlush()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 4bffe16..49074ce 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -25,7 +25,7 @@ import javax.annotation.Nullable;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.cql3.selection.RawSelector;
 import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
@@ -35,7 +35,6 @@ import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.schema.ViewMetadata;
-import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.FBUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,20 +58,14 @@ public class View
     private final boolean includeAllColumns;
     private ViewBuilder builder;
 
-    // Only the raw statement can be final, because the statement cannot always be prepared when the MV is initialized.
-    // For example, during startup, this view will be initialized as part of the Keyspace.open() work; preparing a statement
-    // also requires the keyspace to be open, so this results in double-initialization problems.
-    private final SelectStatement.RawStatement rawSelect;
     private SelectStatement select;
     private ReadQuery query;
 
-    public View(ViewMetadata definition,
-                ColumnFamilyStore baseCfs)
+    public View(ViewMetadata definition, ColumnFamilyStore baseCfs)
     {
         this.baseCfs = baseCfs;
-        this.name = definition.name;
+        this.name = definition.name();
         this.includeAllColumns = definition.includeAllColumns;
-        this.rawSelect = definition.select;
 
         updateDefinition(definition);
     }
@@ -179,31 +172,52 @@ public class View
      * Returns the SelectStatement used to populate and filter this view.  Internal users should access the select
      * statement this way to ensure it has been prepared.
      */
-    public SelectStatement getSelectStatement()
+    SelectStatement getSelectStatement()
     {
-        if (select == null)
+        if (null == select)
         {
-            ClientState state = ClientState.forInternalCalls();
-            state.setKeyspace(baseCfs.keyspace.getName());
-            rawSelect.prepareKeyspace(state);
-            ParsedStatement.Prepared prepared = rawSelect.prepare(true);
-            select = (SelectStatement) prepared.statement;
+            SelectStatement.Parameters parameters =
+                new SelectStatement.Parameters(Collections.emptyMap(),
+                                               Collections.emptyList(),
+                                               false,
+                                               true,
+                                               false);
+
+            SelectStatement.RawStatement rawSelect =
+                new SelectStatement.RawStatement(new 
QualifiedName(baseCfs.keyspace.getName(), baseCfs.name),
+                                                 parameters,
+                                                 selectClause(),
+                                                 definition.whereClause,
+                                                 null,
+                                                 null);
+
+            rawSelect.setBindVariables(Collections.emptyList());
+
+            select = rawSelect.prepare(true);
         }
 
         return select;
     }
 
+    private List<RawSelector> selectClause()
+    {
+        return definition.metadata
+                         .columns()
+                         .stream()
+                         .map(c -> c.name.toString())
+                         .map(ColumnMetadata.Raw::forQuoted)
+                         .map(c -> new RawSelector(c, null))
+                         .collect(Collectors.toList());
+    }
+
     /**
      * Returns the ReadQuery used to filter this view.  Internal users should access the query this way to ensure it
      * has been prepared.
      */
-    public ReadQuery getReadQuery()
+    ReadQuery getReadQuery()
     {
         if (query == null)
-        {
             query = 
getSelectStatement().getQuery(QueryOptions.forInternalCalls(Collections.emptyList()),
 FBUtilities.nowInSeconds());
-            logger.trace("View query: {}", rawSelect);
-        }
 
         return query;
     }
@@ -228,60 +242,10 @@ public class View
         return (view == null) ? null : 
Schema.instance.getTableMetadataRef(view.baseTableId);
     }
 
+    // TODO: REMOVE
     public static Iterable<ViewMetadata> findAll(String keyspace, String 
baseTable)
     {
         KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspace);
         return Iterables.filter(ksm.views, view -> 
view.baseTableName.equals(baseTable));
     }
-
-    /**
-     * Builds the string text for a materialized view's SELECT statement.
-     */
-    public static String buildSelectStatement(String cfName, 
Collection<ColumnMetadata> includedColumns, String whereClause)
-    {
-         StringBuilder rawSelect = new StringBuilder("SELECT ");
-        if (includedColumns == null || includedColumns.isEmpty())
-            rawSelect.append("*");
-        else
-            rawSelect.append(includedColumns.stream().map(id -> 
id.name.toCQLString()).collect(Collectors.joining(", ")));
-        rawSelect.append(" FROM \"").append(cfName).append("\" WHERE ") 
.append(whereClause).append(" ALLOW FILTERING");
-        return rawSelect.toString();
-    }
-
-    public static String relationsToWhereClause(List<Relation> whereClause)
-    {
-        List<String> expressions = new ArrayList<>(whereClause.size());
-        for (Relation rel : whereClause)
-        {
-            StringBuilder sb = new StringBuilder();
-
-            if (rel.isMultiColumn())
-            {
-                sb.append(((MultiColumnRelation) rel).getEntities().stream()
-                        .map(ColumnMetadata.Raw::toString)
-                        .collect(Collectors.joining(", ", "(", ")")));
-            }
-            else
-            {
-                sb.append(((SingleColumnRelation) rel).getEntity());
-            }
-
-            sb.append(" ").append(rel.operator()).append(" ");
-
-            if (rel.isIN())
-            {
-                sb.append(rel.getInValues().stream()
-                        .map(Term.Raw::getText)
-                        .collect(Collectors.joining(", ", "(", ")")));
-            }
-            else
-            {
-                sb.append(rel.getValue().getText());
-            }
-
-            expressions.add(sb.toString());
-        }
-
-        return expressions.stream().collect(Collectors.joining(" AND "));
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java 
b/src/java/org/apache/cassandra/db/view/ViewManager.java
index 84738b1..2e39684 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -98,7 +98,7 @@ public class ViewManager
         Map<String, ViewMetadata> newViewsByName = new HashMap<>();
         for (ViewMetadata definition : keyspace.getMetadata().views)
         {
-            newViewsByName.put(definition.name, definition);
+            newViewsByName.put(definition.name(), definition);
         }
 
         for (String viewName : viewsByName.keySet())
@@ -139,7 +139,7 @@ public class ViewManager
     {
         View view = new View(definition, 
keyspace.getColumnFamilyStore(definition.baseTableId));
         forTable(view.getDefinition().baseTableId).add(view);
-        viewsByName.put(definition.name, view);
+        viewsByName.put(definition.name(), view);
     }
 
     public void removeView(String name)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java 
b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index 0f44e0c..d739534 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -29,8 +29,8 @@ import java.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.exceptions.InvalidRequestException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index d026a03..58ec48c 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -31,6 +31,8 @@ import java.util.stream.Collectors;
 
 import com.datastax.driver.core.ProtocolVersion;
 import com.datastax.driver.core.TypeCodec;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.cql3.statements.schema.CreateTypeStatement;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.Schema;
@@ -39,10 +41,7 @@ import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UpdateParameters;
 import org.apache.cassandra.cql3.functions.UDHelper;
-import org.apache.cassandra.cql3.statements.CreateTableStatement;
-import org.apache.cassandra.cql3.statements.CreateTypeStatement;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
-import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.cql3.statements.UpdateStatement;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.DecoratedKey;
@@ -59,7 +58,6 @@ import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * Utility to write SSTables.
@@ -342,8 +340,8 @@ public class CQLSSTableWriter implements Closeable
 
         protected SSTableFormat.Type formatType = null;
 
-        private CreateTableStatement.RawStatement schemaStatement;
-        private final List<CreateTypeStatement> typeStatements;
+        private CreateTableStatement.Raw schemaStatement;
+        private final List<CreateTypeStatement.Raw> typeStatements;
         private ModificationStatement.Parsed insertStatement;
         private IPartitioner partitioner;
 
@@ -392,7 +390,7 @@ public class CQLSSTableWriter implements Closeable
 
         public Builder withType(String typeDefinition) throws SyntaxException
         {
-            typeStatements.add(QueryProcessor.parseStatement(typeDefinition, 
CreateTypeStatement.class, "CREATE TYPE"));
+            typeStatements.add(QueryProcessor.parseStatement(typeDefinition, 
CreateTypeStatement.Raw.class, "CREATE TYPE"));
             return this;
         }
 
@@ -412,7 +410,7 @@ public class CQLSSTableWriter implements Closeable
          */
         public Builder forTable(String schema)
         {
-            this.schemaStatement = QueryProcessor.parseStatement(schema, 
CreateTableStatement.RawStatement.class, "CREATE TABLE");
+            this.schemaStatement = QueryProcessor.parseStatement(schema, 
CreateTableStatement.Raw.class, "CREATE TABLE");
             return this;
         }
 
@@ -515,16 +513,16 @@ public class CQLSSTableWriter implements Closeable
 
                 createTypes(keyspace);
                 TableMetadataRef tableMetadata = 
TableMetadataRef.forOfflineTools(createTable(keyspace));
-                Pair<UpdateStatement, List<ColumnSpecification>> 
preparedInsert = prepareInsert();
+                UpdateStatement preparedInsert = prepareInsert();
 
                 AbstractSSTableSimpleWriter writer = sorted
-                                                     ? new 
SSTableSimpleWriter(directory, tableMetadata, 
preparedInsert.left.updatedColumns())
-                                                     : new 
SSTableSimpleUnsortedWriter(directory, tableMetadata, 
preparedInsert.left.updatedColumns(), bufferSizeInMB);
+                                                     ? new 
SSTableSimpleWriter(directory, tableMetadata, preparedInsert.updatedColumns())
+                                                     : new 
SSTableSimpleUnsortedWriter(directory, tableMetadata, 
preparedInsert.updatedColumns(), bufferSizeInMB);
 
                 if (formatType != null)
                     writer.setSSTableFormatType(formatType);
 
-                return new CQLSSTableWriter(writer, preparedInsert.left, 
preparedInsert.right);
+                return new CQLSSTableWriter(writer, preparedInsert, 
preparedInsert.getBindVariables());
             }
         }
 
@@ -532,7 +530,7 @@ public class CQLSSTableWriter implements Closeable
         {
             KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(keyspace);
             Types.RawBuilder builder = Types.rawBuilder(keyspace);
-            for (CreateTypeStatement st : typeStatements)
+            for (CreateTypeStatement.Raw st : typeStatements)
                 st.addToRawBuilder(builder);
 
             ksm = ksm.withSwapped(builder.build());
@@ -547,15 +545,16 @@ public class CQLSSTableWriter implements Closeable
         {
             KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(keyspace);
 
-            TableMetadata tableMetadata = 
ksm.tables.getNullable(schemaStatement.columnFamily());
+            TableMetadata tableMetadata = 
ksm.tables.getNullable(schemaStatement.table());
 
             if (tableMetadata != null)
                 return tableMetadata;
 
-            CreateTableStatement statement = (CreateTableStatement) 
schemaStatement.prepare(ksm.types).statement;
+            ClientState state = ClientState.forInternalCalls();
+            CreateTableStatement statement = schemaStatement.prepare(state);
             statement.validate(ClientState.forInternalCalls());
 
-            TableMetadata.Builder builder = statement.builder();
+            TableMetadata.Builder builder = statement.builder(ksm.types);
             if (partitioner != null)
                 builder.partitioner(partitioner);
             TableMetadata metadata = builder.build();
@@ -569,20 +568,20 @@ public class CQLSSTableWriter implements Closeable
          *
          * @return prepared Insert statement and it's bound names
          */
-        private Pair<UpdateStatement, List<ColumnSpecification>> 
prepareInsert()
+        private UpdateStatement prepareInsert()
         {
-            ParsedStatement.Prepared cqlStatement = insertStatement.prepare();
-            UpdateStatement insert = (UpdateStatement) cqlStatement.statement;
-            insert.validate(ClientState.forInternalCalls());
+            ClientState state = ClientState.forInternalCalls();
+            UpdateStatement insert = (UpdateStatement) 
insertStatement.prepare(state);
+            insert.validate(state);
 
             if (insert.hasConditions())
                 throw new IllegalArgumentException("Conditional statements are 
not supported");
             if (insert.isCounter())
                 throw new IllegalArgumentException("Counter update statements 
are not supported");
-            if (cqlStatement.boundNames.isEmpty())
+            if (insert.getBindVariables().isEmpty())
                 throw new IllegalArgumentException("Provided insert statement 
has no bind variables");
 
-            return Pair.create(insert, cqlStatement.boundNames);
+            return insert;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java 
b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index a3b8f22..a79741d 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.repair.consistent.CoordinatorSession;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.PreviewKind;
@@ -576,7 +577,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
 
                 String format = "select event_id, source, activity from %s.%s 
where session_id = ? and event_id > ? and event_id < ?;";
                 String query = String.format(format, 
SchemaConstants.TRACE_KEYSPACE_NAME, TraceKeyspace.EVENTS);
-                SelectStatement statement = (SelectStatement) 
QueryProcessor.parseStatement(query).prepare().statement;
+                SelectStatement statement = (SelectStatement) 
QueryProcessor.parseStatement(query).prepare(ClientState.forInternalCalls());
 
                 ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
                 InetAddress source = FBUtilities.getBroadcastAddress();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java 
b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index 6ebd756..2a27a60 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/Diff.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Diff.java 
b/src/java/org/apache/cassandra/schema/Diff.java
new file mode 100644
index 0000000..7f77f3f
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/Diff.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.Iterables;
+
+/**
+ * Outcome of comparing two versions of a schema collection: the elements that
+ * were created, the elements that were dropped, and before/after pairs for the
+ * elements that were altered.
+ *
+ * @param <T> collection type holding the created and the dropped elements
+ * @param <S> element type recorded in the altered pairs
+ */
+public class Diff<T extends Iterable, S>
+{
+    /**
+     * Comparison mode handed to element comparisons when deciding whether an
+     * element was altered.
+     * NOTE(review): the exact semantics of each constant are not visible here.
+     */
+    public enum Mode
+    {
+        IN_MEMORY,
+        ON_DISK
+    }
+
+    public final T created;
+    public final T dropped;
+    public final ImmutableCollection<Altered<S>> altered;
+
+    Diff(T created, T dropped, ImmutableCollection<Altered<S>> altered)
+    {
+        this.created = created;
+        this.dropped = dropped;
+        this.altered = altered;
+    }
+
+    /**
+     * @return {@code true} when nothing was created, dropped, or altered
+     */
+    boolean isEmpty()
+    {
+        if (!Iterables.isEmpty(created))
+            return false;
+        if (!Iterables.isEmpty(dropped))
+            return false;
+        return Iterables.isEmpty(altered);
+    }
+
+    /**
+     * A before/after pair for a single altered element.
+     */
+    public static final class Altered<T>
+    {
+        public final T before;
+        public final T after;
+
+        Altered(T before, T after)
+        {
+            this.before = before;
+            this.after = after;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Functions.java 
b/src/java/org/apache/cassandra/schema/Functions.java
index 8e3a3f1..db5e871 100644
--- a/src/java/org/apache/cassandra/schema/Functions.java
+++ b/src/java/org/apache/cassandra/schema/Functions.java
@@ -17,19 +17,20 @@
  */
 package org.apache.cassandra.schema;
 
+import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.stream.Collectors;
+import java.util.function.Predicate;
 import java.util.stream.Stream;
 
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
+import com.google.common.collect.*;
 
 import org.apache.cassandra.cql3.functions.*;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.db.marshal.UserType;
 
-import static com.google.common.collect.Iterables.filter;
+import static java.util.stream.Collectors.toList;
+
+import static com.google.common.collect.Iterables.any;
 
 /**
  * An immutable container for a keyspace's UDAs and UDFs (and, in case of 
{@link org.apache.cassandra.db.SystemKeyspace},
@@ -37,6 +38,21 @@ import static com.google.common.collect.Iterables.filter;
  */
 public final class Functions implements Iterable<Function>
 {
+    /**
+     * Predicate selecting functions by kind: {@code UDF} accepts only
+     * {@link UDFunction} instances, {@code UDA} accepts only {@link UDAggregate}
+     * instances, and {@code ALL} accepts every function.
+     */
+    public enum Filter implements Predicate<Function>
+    {
+        ALL, UDF, UDA;
+
+        @Override
+        public boolean test(Function function)
+        {
+            if (this == UDF)
+                return function instanceof UDFunction;
+            if (this == UDA)
+                return function instanceof UDAggregate;
+            return true; // ALL accepts everything
+        }
+    }
+
     private final ImmutableMultimap<FunctionName, Function> functions;
 
     private Functions(Builder builder)
@@ -69,12 +85,17 @@ public final class Functions implements Iterable<Function>
         return functions.values().stream();
     }
 
+    /**
+     * @return the size of the underlying name-to-function multimap
+     */
+    public int size()
+    {
+        return functions.size();
+    }
+
     /**
      * @return a stream of keyspace's UDFs
      */
     public Stream<UDFunction> udfs()
     {
-        return stream().filter(f -> f instanceof UDFunction).map(f -> 
(UDFunction) f);
+        return stream().filter(Filter.UDF).map(f -> (UDFunction) f);
     }
 
     /**
@@ -82,29 +103,23 @@ public final class Functions implements Iterable<Function>
      */
     public Stream<UDAggregate> udas()
     {
-        return stream().filter(f -> f instanceof UDAggregate).map(f -> 
(UDAggregate) f);
+        return stream().filter(Filter.UDA).map(f -> (UDAggregate) f);
     }
 
-    MapDifference<Pair<FunctionName, List<String>>, UDFunction> 
udfsDiff(Functions other)
+    public Iterable<Function> referencingUserType(ByteBuffer name)
     {
-        Map<Pair<FunctionName, List<String>>, UDFunction> before = new 
HashMap<>();
-        udfs().forEach(f -> before.put(Pair.create(f.name(), 
f.argumentsList()), f));
-
-        Map<Pair<FunctionName, List<String>>, UDFunction> after = new 
HashMap<>();
-        other.udfs().forEach(f -> after.put(Pair.create(f.name(), 
f.argumentsList()), f));
-
-        return Maps.difference(before, after);
+        return Iterables.filter(this, f -> f.referencesUserType(name));
     }
 
-    MapDifference<Pair<FunctionName, List<String>>, UDAggregate> 
udasDiff(Functions other)
+    public Functions withUpdatedUserType(UserType udt)
     {
-        Map<Pair<FunctionName, List<String>>, UDAggregate> before = new 
HashMap<>();
-        udas().forEach(f -> before.put(Pair.create(f.name(), 
f.argumentsList()), f));
+        if (!any(this, f -> f.referencesUserType(udt.name)))
+            return this;
 
-        Map<Pair<FunctionName, List<String>>, UDAggregate> after = new 
HashMap<>();
-        other.udas().forEach(f -> after.put(Pair.create(f.name(), 
f.argumentsList()), f));
+        Collection<UDFunction>  udfs = udfs().map(f -> 
f.withUpdatedUserType(udt)).collect(toList());
+        Collection<UDAggregate> udas = udas().map(f -> 
f.withUpdatedUserType(udfs, udt)).collect(toList());
 
-        return Maps.difference(before, after);
+        return builder().add(udfs).add(udas).build();
     }
 
     /**
@@ -113,7 +128,7 @@ public final class Functions implements Iterable<Function>
      */
     public Collection<UDAggregate> aggregatesUsingFunction(Function function)
     {
-        return udas().filter(uda -> 
uda.hasReferenceTo(function)).collect(Collectors.toList());
+        return udas().filter(uda -> 
uda.hasReferenceTo(function)).collect(toList());
     }
 
     /**
@@ -127,6 +142,11 @@ public final class Functions implements Iterable<Function>
         return functions.get(name);
     }
 
+    /**
+     * Finds a function of any kind with the specified name and argument types.
+     * Delegates to the three-argument overload with {@link Filter#ALL}.
+     *
+     * @param name fully qualified function name
+     * @param argTypes function argument types
+     * @return an empty {@link Optional} if no such function is found
+     */
+    public Optional<Function> find(FunctionName name, List<AbstractType<?>> 
argTypes)
+    {
+        return find(name, argTypes, Filter.ALL);
+    }
+
     /**
      * Find the function with the specified name
      *
@@ -134,13 +154,18 @@ public final class Functions implements Iterable<Function>
      * @param argTypes function argument types
      * @return an empty {@link Optional} if the function name is not found; a 
non-empty optional of {@link Function} otherwise
      */
-    public Optional<Function> find(FunctionName name, List<AbstractType<?>> 
argTypes)
+    public Optional<Function> find(FunctionName name, List<AbstractType<?>> 
argTypes, Filter filter)
     {
         return get(name).stream()
-                        .filter(fun -> typesMatch(fun.argTypes(), argTypes))
+                        .filter(filter.and(fun -> typesMatch(fun.argTypes(), 
argTypes)))
                         .findAny();
     }
 
+    /**
+     * @return {@code true} if the underlying multimap holds no functions
+     */
+    public boolean isEmpty()
+    {
+        return functions.isEmpty();
+    }
+
     /*
      * We need to compare the CQL3 representation of the type because comparing
      * the AbstractType will fail for example if a UDT has been changed.
@@ -184,6 +209,13 @@ public final class Functions implements Iterable<Function>
         return h;
     }
 
+    /**
+     * Creates a Functions instance holding only the functions accepted by the predicate.
+     *
+     * @param predicate test applied to every function of this instance
+     * @return a new instance built from the accepted functions
+     */
+    public Functions filter(Predicate<Function> predicate)
+    {
+        List<Function> accepted = stream().filter(predicate).collect(toList());
+        return builder().add(accepted).build();
+    }
+
     /**
      * Create a Functions instance with the provided function added
      */
@@ -203,7 +235,19 @@ public final class Functions implements Iterable<Function>
         Function fun =
             find(name, argTypes).orElseThrow(() -> new 
IllegalStateException(String.format("Function %s doesn't exists", name)));
 
-        return builder().add(filter(this, f -> f != fun)).build();
+        return without(fun);
+    }
+
+    /**
+     * Creates a Functions instance with every function of this one except the given one.
+     * The function to leave out is matched by reference identity, not by equals().
+     */
+    public Functions without(Function function)
+    {
+        Iterable<Function> remaining = Iterables.filter(this, candidate -> candidate != function);
+        return builder().add(remaining).build();
+    }
+
+    /**
+     * Creates a Functions instance with the provided function added, replacing
+     * an existing function that has the same name and matching argument types.
+     * All other functions, including other overloads of the same name, are kept.
+     *
+     * @param function the function to add or to substitute for its previous version
+     * @return a new instance containing the provided function
+     */
+    public Functions withAddedOrUpdated(Function function)
+    {
+        // Drop only the function being replaced: name AND argument types must both match.
+        // The negation has to wrap the whole conjunction, not just the name comparison.
+        return builder().add(Iterables.filter(this, f -> !(f.name().equals(function.name())
+                                                           && Functions.typesMatch(f.argTypes(), function.argTypes()))))
+                        .add(function)
+                        .build();
     }
 
     @Override
@@ -252,10 +296,53 @@ public final class Functions implements Iterable<Function>
             return this;
         }
 
-        public  Builder add(Iterable<? extends Function> funs)
+        public Builder add(Iterable<? extends Function> funs)
         {
             funs.forEach(this::add);
             return this;
         }
     }
+
+    /**
+     * Computes the difference between two Functions instances, restricted to
+     * the functions accepted by {@link Filter#UDF}.
+     */
+    // The cast is unchecked because FunctionsDiff.diff returns a raw FunctionsDiff.
+    @SuppressWarnings("unchecked")
+    static FunctionsDiff<UDFunction> udfsDiff(Functions before, Functions 
after, Diff.Mode mode)
+    {
+        return (FunctionsDiff<UDFunction>) FunctionsDiff.diff(before, after, 
Filter.UDF, mode);
+    }
+
+    /**
+     * Computes the difference between two Functions instances, restricted to
+     * the functions accepted by {@link Filter#UDA}.
+     */
+    // The cast is unchecked because FunctionsDiff.diff returns a raw FunctionsDiff.
+    @SuppressWarnings("unchecked")
+    static FunctionsDiff<UDAggregate> udasDiff(Functions before, Functions 
after, Diff.Mode mode)
+    {
+        return (FunctionsDiff<UDAggregate>) FunctionsDiff.diff(before, after, 
Filter.UDA, mode);
+    }
+
+    /**
+     * Difference between two {@link Functions} instances for one kind of
+     * function: the created functions, the dropped functions, and before/after
+     * pairs for the functions present on both sides that are not equal.
+     */
+    public static final class FunctionsDiff<T extends Function> extends 
Diff<Functions, T>
+    {
+        // Shared empty diff, returned when both sides are the same instance.
+        static final FunctionsDiff NONE = new 
FunctionsDiff<>(Functions.none(), Functions.none(), ImmutableList.of());
+
+        private FunctionsDiff(Functions created, Functions dropped, 
ImmutableCollection<Altered<T>> altered)
+        {
+            super(created, dropped, altered);
+        }
+
+        /**
+         * Computes the diff between {@code before} and {@code after}, looking
+         * only at functions accepted by {@code filter}. Functions are paired up
+         * through {@code find(name, argTypes, filter)}.
+         *
+         * @param before the earlier set of functions
+         * @param after the later set of functions
+         * @param filter kind of function to consider
+         * @param mode passed to {@code equals(other, mode)} when comparing a pair
+         * @return {@link #NONE} if both arguments are the same instance, a new diff otherwise
+         */
+        private static FunctionsDiff diff(Functions before, Functions after, 
Filter filter, Mode mode)
+        {
+            // Same instance on both sides: nothing can have changed.
+            if (before == after)
+                return NONE;
+
+            // Created: accepted functions of 'after' with no counterpart in 'before'.
+            Functions created = after.filter(filter.and(k -> 
!before.find(k.name(), k.argTypes(), filter).isPresent()));
+            // Dropped: accepted functions of 'before' with no counterpart in 'after'.
+            Functions dropped = before.filter(filter.and(k -> 
!after.find(k.name(), k.argTypes(), filter).isPresent()));
+
+            // Altered: functions found on both sides that are not equal under 'mode'.
+            ImmutableList.Builder<Altered<Function>> altered = 
ImmutableList.builder();
+            before.stream().filter(filter).forEach(functionBefore ->
+            {
+                after.find(functionBefore.name(), functionBefore.argTypes(), 
filter).ifPresent(functionAfter ->
+                {
+                    if (!functionBefore.equals(functionAfter, mode))
+                        altered.add(new Altered<>(functionBefore, 
functionAfter));
+                });
+            });
+
+            return new FunctionsDiff<>(created, dropped, altered.build());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/IndexMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java 
b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index 190871a..bf5b701 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -98,12 +98,14 @@ public final class IndexMetadata
         return name != null && !name.isEmpty() && 
PATTERN_WORD_CHARS.matcher(name).matches();
     }
 
-    public static String getDefaultIndexName(String cfName, String root)
+    public static String generateDefaultIndexName(String table, 
ColumnIdentifier column)
     {
-        if (root == null)
-            return PATTERN_NON_WORD_CHAR.matcher(cfName + "_" + 
"idx").replaceAll("");
-        else
-            return PATTERN_NON_WORD_CHAR.matcher(cfName + "_" + root + 
"_idx").replaceAll("");
+        return PATTERN_NON_WORD_CHAR.matcher(table + "_" + column.toString() + 
"_idx").replaceAll("");
+    }
+
+    public static String generateDefaultIndexName(String table)
+    {
+        return PATTERN_NON_WORD_CHAR.matcher(table + "_" + 
"idx").replaceAll("");
     }
 
     public void validate(TableMetadata table)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/Indexes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Indexes.java 
b/src/java/org/apache/cassandra/schema/Indexes.java
index 81d400e..2e95779 100644
--- a/src/java/org/apache/cassandra/schema/Indexes.java
+++ b/src/java/org/apache/cassandra/schema/Indexes.java
@@ -22,8 +22,6 @@ import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableMap;
 
-import org.apache.cassandra.exceptions.ConfigurationException;
-
 import static java.lang.String.format;
 
 import static com.google.common.collect.Iterables.filter;
@@ -168,20 +166,6 @@ public final class Indexes implements 
Iterable<IndexMetadata>
 
     public void validate(TableMetadata table)
     {
-        /*
-         * Index name check is duplicated in Keyspaces, for the time being.
-         * The reason for this is that schema altering statements are not 
calling
-         * Keyspaces.validate() as of yet. TODO: remove this once they do (on 
CASSANDRA-9425 completion)
-         */
-        Set<String> indexNames = new HashSet<>();
-        for (IndexMetadata index : indexesByName.values())
-        {
-            if (indexNames.contains(index.name))
-                throw new ConfigurationException(format("Duplicate index name 
%s for table %s", index.name, table));
-
-            indexNames.add(index.name);
-        }
-
         indexesByName.values().forEach(i -> i.validate(table));
     }
 
@@ -197,20 +181,6 @@ public final class Indexes implements 
Iterable<IndexMetadata>
         return indexesByName.values().toString();
     }
 
-    public static String getAvailableIndexName(String ksName, String cfName, 
String indexNameRoot)
-    {
-
-        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName);
-        Set<String> existingNames = ksm == null ? new HashSet<>() : 
ksm.existingIndexNames(null);
-        String baseName = IndexMetadata.getDefaultIndexName(cfName, 
indexNameRoot);
-        String acceptedName = baseName;
-        int i = 0;
-        while (existingNames.contains(acceptedName))
-            acceptedName = baseName + '_' + (++i);
-
-        return acceptedName;
-    }
-
     public static final class Builder
     {
         final ImmutableMap.Builder<String, IndexMetadata> indexesByName = new 
ImmutableMap.Builder<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java 
b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
index 80a3869..059a8f8 100644
--- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
@@ -27,10 +27,22 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.functions.UDAggregate;
+import org.apache.cassandra.cql3.functions.UDFunction;
+import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.schema.Functions.FunctionsDiff;
+import org.apache.cassandra.schema.Tables.TablesDiff;
+import org.apache.cassandra.schema.Types.TypesDiff;
+import org.apache.cassandra.schema.Views.ViewsDiff;
+import org.apache.cassandra.service.StorageService;
 
 import static java.lang.String.format;
 
+import static com.google.common.collect.Iterables.any;
+
 /**
  * An immutable representation of keyspace metadata (name, params, tables, 
types, and functions).
  */
@@ -93,6 +105,20 @@ public final class KeyspaceMetadata
         return new KeyspaceMetadata(name, params, tables, views, types, 
functions);
     }
 
+    /**
+     * Returns a new KeyspaceMetadata with all instances of old UDT replaced 
with the updated version.
+     * Replaces all instances in tables, views, types, and functions.
+     */
+    public KeyspaceMetadata withUpdatedUserType(UserType udt)
+    {
+        return new KeyspaceMetadata(name,
+                                    params,
+                                    tables.withUpdatedUserType(udt),
+                                    views.withUpdatedUserTypes(udt),
+                                    types.withUpdatedUserType(udt),
+                                    functions.withUpdatedUserType(udt));
+    }
+
     public Iterable<TableMetadata> tablesAndViews()
     {
         return Iterables.concat(tables, views.metadatas());
@@ -107,14 +133,34 @@ public final class KeyspaceMetadata
              : view.metadata;
     }
 
-    public Set<String> existingIndexNames(String cfToExclude)
+    public boolean hasTable(String tableName)
     {
-        Set<String> indexNames = new HashSet<>();
-        for (TableMetadata table : tables)
-            if (cfToExclude == null || !table.name.equals(cfToExclude))
-                for (IndexMetadata index : table.indexes)
-                    indexNames.add(index.name);
-        return indexNames;
+        return tables.get(tableName).isPresent();
+    }
+
+    public boolean hasView(String viewName)
+    {
+        return views.get(viewName).isPresent();
+    }
+
+    public boolean hasIndex(String indexName)
+    {
+        return any(tables, t -> t.indexes.has(indexName));
+    }
+
+    public String findAvailableIndexName(String baseName)
+    {
+        if (!hasIndex(baseName))
+            return baseName;
+
+        int i = 1;
+        do
+        {
+            String name = baseName + '_' + i++;
+            if (!hasIndex(name))
+                return name;
+        }
+        while (true);
     }
 
     public Optional<TableMetadata> findIndexedTable(String indexName)
@@ -190,4 +236,77 @@ public final class KeyspaceMetadata
             }
         }
     }
+
+    public AbstractReplicationStrategy createReplicationStrategy()
+    {
+        return AbstractReplicationStrategy.createReplicationStrategy(name,
+                                                                     
params.replication.klass,
+                                                                     
StorageService.instance.getTokenMetadata(),
+                                                                     
DatabaseDescriptor.getEndpointSnitch(),
+                                                                     
params.replication.options);
+    }
+
+    static Optional<KeyspaceDiff> diff(KeyspaceMetadata before, 
KeyspaceMetadata after, Diff.Mode mode)
+    {
+        return KeyspaceDiff.diff(before, after, mode);
+    }
+
+    public static final class KeyspaceDiff
+    {
+        public final KeyspaceMetadata before;
+        public final KeyspaceMetadata after;
+
+        public final TablesDiff tables;
+        public final ViewsDiff views;
+        public final TypesDiff types;
+
+        public final FunctionsDiff<UDFunction> udfs;
+        public final FunctionsDiff<UDAggregate> udas;
+
+        private KeyspaceDiff(KeyspaceMetadata before,
+                             KeyspaceMetadata after,
+                             TablesDiff tables,
+                             ViewsDiff views,
+                             TypesDiff types,
+                             FunctionsDiff<UDFunction> udfs,
+                             FunctionsDiff<UDAggregate> udas)
+        {
+            this.before = before;
+            this.after = after;
+            this.tables = tables;
+            this.views = views;
+            this.types = types;
+            this.udfs = udfs;
+            this.udas = udas;
+        }
+
+        private static Optional<KeyspaceDiff> diff(KeyspaceMetadata before, 
KeyspaceMetadata after, Diff.Mode mode)
+        {
+            if (before == after)
+                return Optional.empty();
+
+            if (!before.name.equals(after.name))
+            {
+                String msg = String.format("Attempting to diff two keyspaces 
with different names ('%s' and '%s')", before.name, after.name);
+                throw new IllegalArgumentException(msg);
+            }
+
+            TablesDiff tables = Tables.diff(before.tables, after.tables, mode);
+            ViewsDiff views = Views.diff(before.views, after.views, mode);
+            TypesDiff types = Types.diff(before.types, after.types, mode);
+
+            @SuppressWarnings("unchecked") FunctionsDiff<UDFunction>  udfs = 
FunctionsDiff.NONE;
+            @SuppressWarnings("unchecked") FunctionsDiff<UDAggregate> udas = 
FunctionsDiff.NONE;
+            if (before.functions != after.functions)
+            {
+                udfs = Functions.udfsDiff(before.functions, after.functions, 
mode);
+                udas = Functions.udasDiff(before.functions, after.functions, 
mode);
+            }
+
+            if (before.params.equals(after.params) && tables.isEmpty() && 
views.isEmpty() && types.isEmpty() && udfs.isEmpty() && udas.isEmpty())
+                return Optional.empty();
+
+            return Optional.of(new KeyspaceDiff(before, after, tables, views, 
types, udfs, udas));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/KeyspaceParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceParams.java 
b/src/java/org/apache/cassandra/schema/KeyspaceParams.java
index 1deaa29..68ac5e4 100644
--- a/src/java/org/apache/cassandra/schema/KeyspaceParams.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceParams.java
@@ -31,8 +31,8 @@ public final class KeyspaceParams
     public static final boolean DEFAULT_DURABLE_WRITES = true;
 
     /**
-     * This determines durable writes for the {@link 
org.apache.cassandra.config.SchemaConstants#SCHEMA_KEYSPACE_NAME}
-     * and {@link 
org.apache.cassandra.config.SchemaConstants#SYSTEM_KEYSPACE_NAME} keyspaces,
+     * This determines durable writes for the {@link 
org.apache.cassandra.schema.SchemaConstants#SCHEMA_KEYSPACE_NAME}
+     * and {@link 
org.apache.cassandra.schema.SchemaConstants#SYSTEM_KEYSPACE_NAME} keyspaces,
      * the only reason it is not final is for commitlog unit tests. It should 
only be changed for testing purposes.
      */
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/Keyspaces.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Keyspaces.java 
b/src/java/org/apache/cassandra/schema/Keyspaces.java
index 1692f88..3a5d9a3 100644
--- a/src/java/org/apache/cassandra/schema/Keyspaces.java
+++ b/src/java/org/apache/cassandra/schema/Keyspaces.java
@@ -18,18 +18,21 @@
 package org.apache.cassandra.schema;
 
 import java.util.Iterator;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 
 import javax.annotation.Nullable;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
+import com.google.common.collect.*;
+
+import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff;
 
 public final class Keyspaces implements Iterable<KeyspaceMetadata>
 {
+    private static final Keyspaces NONE = builder().build();
+
     private final ImmutableMap<String, KeyspaceMetadata> keyspaces;
     private final ImmutableMap<TableId, TableMetadata> tables;
 
@@ -46,7 +49,7 @@ public final class Keyspaces implements 
Iterable<KeyspaceMetadata>
 
     public static Keyspaces none()
     {
-        return builder().build();
+        return NONE;
     }
 
     public static Keyspaces of(KeyspaceMetadata... keyspaces)
@@ -69,18 +72,39 @@ public final class Keyspaces implements 
Iterable<KeyspaceMetadata>
         return keyspaces.keySet();
     }
 
+    /**
+     * Get the keyspace with the specified name
+     *
+     * @param name a non-qualified keyspace name
+     * @return an empty {@link Optional} if the keyspace name is not found; a 
non-empty optional of {@link KeyspaceMetadata} otherwise
+     */
+    public Optional<KeyspaceMetadata> get(String name)
+    {
+        return Optional.ofNullable(keyspaces.get(name));
+    }
+
     @Nullable
     public KeyspaceMetadata getNullable(String name)
     {
         return keyspaces.get(name);
     }
 
+    public boolean containsKeyspace(String name)
+    {
+        return keyspaces.containsKey(name);
+    }
+
     @Nullable
     public TableMetadata getTableOrViewNullable(TableId id)
     {
         return tables.get(id);
     }
 
+    public boolean isEmpty()
+    {
+        return keyspaces.isEmpty();
+    }
+
     public Keyspaces filter(Predicate<KeyspaceMetadata> predicate)
     {
         Builder builder = builder();
@@ -97,19 +121,19 @@ public final class Keyspaces implements 
Iterable<KeyspaceMetadata>
         if (keyspace == null)
             throw new IllegalStateException(String.format("Keyspace %s doesn't 
exists", name));
 
-        return builder().add(filter(k -> k != keyspace)).build();
+        return filter(k -> k != keyspace);
     }
 
     public Keyspaces withAddedOrUpdated(KeyspaceMetadata keyspace)
     {
-        return builder().add(filter(k -> !k.name.equals(keyspace.name)))
+        return builder().add(Iterables.filter(this, k -> 
!k.name.equals(keyspace.name)))
                         .add(keyspace)
                         .build();
     }
 
-    MapDifference<String, KeyspaceMetadata> diff(Keyspaces other)
+    public void validate()
     {
-        return Maps.difference(keyspaces, other.keyspaces);
+        keyspaces.values().forEach(KeyspaceMetadata::validate);
     }
 
     @Override
@@ -167,4 +191,63 @@ public final class Keyspaces implements 
Iterable<KeyspaceMetadata>
             return this;
         }
     }
+
+    /**
+     * Calculates the difference between two schemas.
+     *
+     * Has two modes of operation:
+     *
+     * 1. Mode.ON_DISK - compares schemas as they would be serialized, e.g. 
ignores differences in UserType-s for tables,
+     *    and only takes into account the names of UDTs, as we only store type 
names in system_schema.tables
+     * 2. Mode.IN_MEMORY - compares metadata objects thoroughly, accounting 
for every object in the graph. e.g. two TableMetadata
+     *    objects would be considered different if a UDT they refer to has 
added a new field
+     *
+     * @param before schema before the changes
+     * @param after schema after the changes
+     * @param mode the comparison mode to use - in-memory or on-disk representations
+     */
+    static KeyspacesDiff diff(Keyspaces before, Keyspaces after, Diff.Mode 
mode)
+    {
+        return KeyspacesDiff.diff(before, after, mode);
+    }
+
+    public static final class KeyspacesDiff
+    {
+        static final KeyspacesDiff NONE = new KeyspacesDiff(Keyspaces.none(), 
Keyspaces.none(), ImmutableList.of());
+
+        public final Keyspaces created;
+        public final Keyspaces dropped;
+        public final ImmutableList<KeyspaceDiff> altered;
+
+        private KeyspacesDiff(Keyspaces created, Keyspaces dropped, 
ImmutableList<KeyspaceDiff> altered)
+        {
+            this.created = created;
+            this.dropped = dropped;
+            this.altered = altered;
+        }
+
+        private static KeyspacesDiff diff(Keyspaces before, Keyspaces after, 
Diff.Mode mode)
+        {
+            if (before == after)
+                return NONE;
+
+            Keyspaces created = after.filter(k -> 
!before.containsKeyspace(k.name));
+            Keyspaces dropped = before.filter(k -> 
!after.containsKeyspace(k.name));
+
+            ImmutableList.Builder<KeyspaceDiff> altered = 
ImmutableList.builder();
+            before.forEach(keyspaceBefore ->
+            {
+                KeyspaceMetadata keyspaceAfter = 
after.getNullable(keyspaceBefore.name);
+                if (null != keyspaceAfter)
+                    KeyspaceMetadata.diff(keyspaceBefore, keyspaceAfter, 
mode).ifPresent(altered::add);
+            });
+
+            return new KeyspacesDiff(created, dropped, altered.build());
+        }
+
+        public boolean isEmpty()
+        {
+            return created.isEmpty() && dropped.isEmpty() && altered.isEmpty();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aaddbd49/src/java/org/apache/cassandra/schema/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java 
b/src/java/org/apache/cassandra/schema/MigrationManager.java
index 7ad8cad..a9f69f4 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -24,8 +24,10 @@ import java.util.concurrent.*;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 
+import com.google.common.util.concurrent.Futures;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
@@ -34,6 +36,7 @@ import org.apache.cassandra.cql3.functions.UDFunction;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.exceptions.AlreadyExistsException;
+import org.apache.cassandra.exceptions.CassandraException;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -41,6 +44,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -155,6 +159,85 @@ public class MigrationManager
         }
     }
 
+    public static KeyspacesDiff alterSchema(SchemaTransformation 
transformation, long timestamp, boolean locally)
+    {
+        Future<AlterSchemaCallable.Result> diff =
+            StageManager.getStage(Stage.MIGRATION).submit(new 
AlterSchemaCallable(transformation, timestamp, locally));
+
+        AlterSchemaCallable.Result result = Futures.getUnchecked(diff);
+
+        if (null != result.exception)
+            throw result.exception;
+        else
+            return result.diff;
+    }
+
+    private static final class AlterSchemaCallable implements 
Callable<AlterSchemaCallable.Result>
+    {
+        private final SchemaTransformation transformation;
+        private final long timestamp;
+        private final boolean locally;
+
+        private AlterSchemaCallable(SchemaTransformation transformation, long 
timestamp, boolean locally)
+        {
+            this.transformation = transformation;
+            this.timestamp = timestamp;
+            this.locally = locally;
+        }
+
+        public Result call()
+        {
+            KeyspacesDiff diff;
+            try
+            {
+                diff = Schema.instance.apply(transformation);
+            }
+            catch (CassandraException e)
+            {
+                return new Result(e);
+            }
+
+            if (!diff.isEmpty())
+            {
+                Collection<Mutation> schemaMutations = 
SchemaKeyspace.convertSchemaDiffToMutations(diff, timestamp);
+
+                schemaMutations.forEach(Mutation::apply);
+
+                if (!locally)
+                {
+                    Schema.instance.updateVersionAndAnnounce();
+
+                    for (InetAddress endpoint : 
Gossiper.instance.getLiveMembers())
+                    {
+                        // only push schema to nodes with known and equal 
versions
+                        if (!endpoint.equals(FBUtilities.getBroadcastAddress())
+                            && 
MessagingService.instance().knowsVersion(endpoint)
+                            && 
MessagingService.instance().getRawVersion(endpoint) == 
MessagingService.current_version)
+                            pushSchemaMutation(endpoint, schemaMutations);
+                    }
+                }
+            }
+
+            return new Result(diff);
+        }
+
+        static final class Result
+        {
+            KeyspacesDiff diff;
+            CassandraException exception;
+
+            private Result(KeyspacesDiff diff)
+            {
+                this.diff = diff;
+            }
+
+            private Result(CassandraException exception)
+            {
+                this.exception = exception;
+            }
+        }
+    }
+
     public static void announceNewKeyspace(KeyspaceMetadata ksm) throws 
ConfigurationException
     {
         announceNewKeyspace(ksm, false);
@@ -176,14 +259,9 @@ public class MigrationManager
         announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm, timestamp), 
announceLocally);
     }
 
-    public static void announceNewTable(TableMetadata cfm) throws 
ConfigurationException
+    public static void announceNewTable(TableMetadata cfm)
     {
-        announceNewTable(cfm, false);
-    }
-
-    public static void announceNewTable(TableMetadata cfm, boolean 
announceLocally)
-    {
-        announceNewTable(cfm, announceLocally, true);
+        announceNewTable(cfm, true, FBUtilities.timestampMicros());
     }
 
     /**
@@ -198,15 +276,10 @@ public class MigrationManager
      */
     public static void forceAnnounceNewTable(TableMetadata cfm)
     {
-        announceNewTable(cfm, false, false, 0);
-    }
-
-    private static void announceNewTable(TableMetadata cfm, boolean 
announceLocally, boolean throwOnDuplicate)
-    {
-        announceNewTable(cfm, announceLocally, throwOnDuplicate, 
FBUtilities.timestampMicros());
+        announceNewTable(cfm, false, 0);
     }
 
-    private static void announceNewTable(TableMetadata cfm, boolean 
announceLocally, boolean throwOnDuplicate, long timestamp)
+    private static void announceNewTable(TableMetadata cfm, boolean 
throwOnDuplicate, long timestamp)
     {
         cfm.validate();
 
@@ -218,21 +291,7 @@ public class MigrationManager
             throw new AlreadyExistsException(cfm.keyspace, cfm.name);
 
         logger.info("Create new table: {}", cfm);
-        announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, timestamp), 
announceLocally);
-    }
-
-    public static void announceNewView(ViewMetadata view, boolean 
announceLocally) throws ConfigurationException
-    {
-        view.metadata.validate();
-
-        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(view.keyspace);
-        if (ksm == null)
-            throw new ConfigurationException(String.format("Cannot add table 
'%s' to non existing keyspace '%s'.", view.name, view.keyspace));
-        else if (ksm.getTableOrViewNullable(view.name) != null)
-            throw new AlreadyExistsException(view.keyspace, view.name);
-
-        logger.info("Create new view: {}", view);
-        announce(SchemaKeyspace.makeCreateViewMutation(ksm, view, 
FBUtilities.timestampMicros()), announceLocally);
+        announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, timestamp), 
false);
     }
 
     public static void announceNewType(UserType newType, boolean 
announceLocally)
@@ -255,12 +314,7 @@ public class MigrationManager
         announce(SchemaKeyspace.makeCreateAggregateMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
     }
 
-    public static void announceKeyspaceUpdate(KeyspaceMetadata ksm) throws 
ConfigurationException
-    {
-        announceKeyspaceUpdate(ksm, false);
-    }
-
-    public static void announceKeyspaceUpdate(KeyspaceMetadata ksm, boolean 
announceLocally) throws ConfigurationException
+    static void announceKeyspaceUpdate(KeyspaceMetadata ksm)
     {
         ksm.validate();
 
@@ -269,7 +323,7 @@ public class MigrationManager
             throw new ConfigurationException(String.format("Cannot update non 
existing keyspace '%s'.", ksm.name));
 
         logger.info("Update Keyspace '{}' From {} To {}", ksm.name, oldKsm, 
ksm);
-        announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, 
ksm.params, FBUtilities.timestampMicros()), announceLocally);
+        announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, 
ksm.params, FBUtilities.timestampMicros()), false);
     }
 
     public static void announceTableUpdate(TableMetadata tm) throws 
ConfigurationException
@@ -296,14 +350,14 @@ public class MigrationManager
     {
         view.metadata.validate();
 
-        ViewMetadata oldView = Schema.instance.getView(view.keyspace, 
view.name);
+        ViewMetadata oldView = Schema.instance.getView(view.keyspace(), 
view.name());
         if (oldView == null)
-            throw new ConfigurationException(String.format("Cannot update non 
existing materialized view '%s' in keyspace '%s'.", view.name, view.keyspace));
-        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(view.keyspace);
+            throw new ConfigurationException(String.format("Cannot update non 
existing materialized view '%s' in keyspace '%s'.", view.name(), 
view.keyspace()));
+        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(view.keyspace());
 
         oldView.metadata.validateCompatibility(view.metadata);
 
-        logger.info("Update view '{}/{}' From {} To {}", view.keyspace, 
view.name, oldView, view);
+        logger.info("Update view '{}/{}' From {} To {}", view.keyspace(), 
view.name(), oldView, view);
         announce(SchemaKeyspace.makeUpdateViewMutation(ksm, oldView, view, 
FBUtilities.timestampMicros()), announceLocally);
     }
 
@@ -313,27 +367,17 @@ public class MigrationManager
         announceNewType(updatedType, announceLocally);
     }
 
-    public static void announceKeyspaceDrop(String ksName) throws 
ConfigurationException
-    {
-        announceKeyspaceDrop(ksName, false);
-    }
-
-    public static void announceKeyspaceDrop(String ksName, boolean 
announceLocally) throws ConfigurationException
+    static void announceKeyspaceDrop(String ksName)
     {
         KeyspaceMetadata oldKsm = Schema.instance.getKeyspaceMetadata(ksName);
         if (oldKsm == null)
             throw new ConfigurationException(String.format("Cannot drop non 
existing keyspace '%s'.", ksName));
 
         logger.info("Drop Keyspace '{}'", oldKsm.name);
-        announce(SchemaKeyspace.makeDropKeyspaceMutation(oldKsm, 
FBUtilities.timestampMicros()), announceLocally);
+        announce(SchemaKeyspace.makeDropKeyspaceMutation(oldKsm, 
FBUtilities.timestampMicros()), false);
     }
 
-    public static void announceTableDrop(String ksName, String cfName) throws 
ConfigurationException
-    {
-        announceTableDrop(ksName, cfName, false);
-    }
-
-    public static void announceTableDrop(String ksName, String cfName, boolean 
announceLocally) throws ConfigurationException
+    static void announceTableDrop(String ksName, String cfName)
     {
         TableMetadata tm = Schema.instance.getTableMetadata(ksName, cfName);
         if (tm == null)
@@ -341,38 +385,7 @@ public class MigrationManager
         KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName);
 
         logger.info("Drop table '{}/{}'", tm.keyspace, tm.name);
-        announce(SchemaKeyspace.makeDropTableMutation(ksm, tm, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceViewDrop(String ksName, String viewName, 
boolean announceLocally) throws ConfigurationException
-    {
-        ViewMetadata view = Schema.instance.getView(ksName, viewName);
-        if (view == null)
-            throw new ConfigurationException(String.format("Cannot drop non 
existing materialized view '%s' in keyspace '%s'.", viewName, ksName));
-        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName);
-
-        logger.info("Drop table '{}/{}'", view.keyspace, view.name);
-        announce(SchemaKeyspace.makeDropViewMutation(ksm, view, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceTypeDrop(UserType droppedType, boolean 
announceLocally)
-    {
-        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(droppedType.keyspace);
-        announce(SchemaKeyspace.dropTypeFromSchemaMutation(ksm, droppedType, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceFunctionDrop(UDFunction udf, boolean 
announceLocally)
-    {
-        logger.info("Drop scalar function overload '{}' args '{}'", 
udf.name(), udf.argTypes());
-        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(udf.name().keyspace);
-        announce(SchemaKeyspace.makeDropFunctionMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
-    public static void announceAggregateDrop(UDAggregate udf, boolean 
announceLocally)
-    {
-        logger.info("Drop aggregate function overload '{}' args '{}'", 
udf.name(), udf.argTypes());
-        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(udf.name().keyspace);
-        announce(SchemaKeyspace.makeDropAggregateMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
+        announce(SchemaKeyspace.makeDropTableMutation(ksm, tm, 
FBUtilities.timestampMicros()), false);
     }
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to