http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 4ff8a23..e4e50a0 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -17,14 +17,12 @@
  */
 package org.apache.cassandra.schema;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableList;
@@ -47,9 +45,14 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.Pair;
 
+import static java.lang.String.format;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
+import static org.apache.cassandra.schema.CQLTypeParser.parse;
 
 /**
  * system_schema.* tables and methods for manipulating them.
@@ -77,7 +80,6 @@ public final class SchemaKeyspace
     public static final String AGGREGATES = "aggregates";
     public static final String INDEXES = "indexes";
 
-
     public static final List<String> ALL =
         ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
 
@@ -205,14 +207,13 @@ public final class SchemaKeyspace
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
                 + "function_name text,"
-                + "signature frozen<list<text>>,"
-                + "argument_names frozen<list<text>>,"
                 + "argument_types frozen<list<text>>,"
+                + "argument_names frozen<list<text>>,"
                 + "body text,"
                 + "language text,"
                 + "return_type text,"
                 + "called_on_null_input boolean,"
-                + "PRIMARY KEY ((keyspace_name), function_name, signature))");
+                + "PRIMARY KEY ((keyspace_name), function_name, 
argument_types))");
 
     private static final CFMetaData Aggregates =
         compile(AGGREGATES,
@@ -220,14 +221,13 @@ public final class SchemaKeyspace
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
                 + "aggregate_name text,"
-                + "signature frozen<list<text>>,"
                 + "argument_types frozen<list<text>>,"
                 + "final_func text,"
                 + "initcond blob,"
                 + "return_type text,"
                 + "state_func text,"
                 + "state_type text,"
-                + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))");
+                + "PRIMARY KEY ((keyspace_name), aggregate_name, 
argument_types))");
 
     public static final List<CFMetaData> ALL_TABLE_METADATA =
         ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, Views, Types, Functions, Aggregates, Indexes);
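
The two hunks above drop the frozen "signature" column and instead cluster functions and aggregates by "argument_types". A minimal sketch of how argument types map to the CQL type strings stored in that clustering column, mirroring the functionArgumentsList helper added further down in this patch; illustration only, not part of the commit:

import java.util.List;
import java.util.stream.Collectors;

import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.db.marshal.AbstractType;

final class ArgumentTypesSketch
{
    // Render argument types as CQL type strings; e.g. a UDF over
    // (int, list<text>) would be keyed as ["int", "list<text>"].
    static List<String> toCqlTypeStrings(List<AbstractType<?>> argTypes)
    {
        return argTypes.stream()
                       .map(AbstractType::asCQL3Type)
                       .map(CQL3Type::toString)
                       .collect(Collectors.toList());
    }
}

Using CQL type names keeps the clustering values human-readable in the schema tables.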
@@ -267,35 +267,6 @@ public final class SchemaKeyspace
         makeCreateKeyspaceMutation(schema, timestamp + 1).apply();
     }
 
-    public static List<KeyspaceMetadata> readSchemaFromSystemTables()
-    {
-        ReadCommand cmd = getReadCommandForTableSchema(KEYSPACES);
-        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); PartitionIterator schema = cmd.executeInternal(orderGroup))
-        {
-            List<KeyspaceMetadata> keyspaces = new ArrayList<>();
-
-            while (schema.hasNext())
-            {
-                try (RowIterator partition = schema.next())
-                {
-                    if (isSystemKeyspaceSchemaPartition(partition.partitionKey()))
-                        continue;
-
-                    DecoratedKey key = partition.partitionKey();
-
-                    readSchemaPartitionForKeyspaceAndApply(TYPES, key,
-                        types -> readSchemaPartitionForKeyspaceAndApply(TABLES, key,
-                        tables -> readSchemaPartitionForKeyspaceAndApply(VIEWS, key,
-                        views -> readSchemaPartitionForKeyspaceAndApply(FUNCTIONS, key,
-                        functions -> readSchemaPartitionForKeyspaceAndApply(AGGREGATES, key,
-                        aggregates -> keyspaces.add(createKeyspaceFromSchemaPartitions(partition, tables, views, types, functions, aggregates))))))
-                    );
-                }
-            }
-            return keyspaces;
-        }
-    }
-
     public static void truncate()
     {
         ALL.forEach(table -> getSchemaCFS(table).truncateBlocking());
@@ -397,336 +368,18 @@ public final class SchemaKeyspace
         }
     }
 
-    private static Map<DecoratedKey, FilteredPartition> readSchemaForKeyspaces(String schemaTableName, Set<String> keyspaceNames)
-    {
-        Map<DecoratedKey, FilteredPartition> schema = new HashMap<>();
-
-        for (String keyspaceName : keyspaceNames)
-        {
-            // We don't to return the RowIterator directly because we should guarantee that this iterator
-            // will be closed, and putting it in a Map make that harder/more awkward.
-            readSchemaPartitionForKeyspaceAndApply(schemaTableName, keyspaceName,
-                partition -> {
-                    if (!partition.isEmpty())
-                        schema.put(partition.partitionKey(), FilteredPartition.create(partition));
-                    return null;
-                }
-            );
-        }
-
-        return schema;
-    }
-
     private static ByteBuffer getSchemaKSKey(String ksName)
     {
         return AsciiType.instance.fromString(ksName);
     }
 
-    private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, String keyspaceName, Function<RowIterator, T> fct)
-    {
-        return readSchemaPartitionForKeyspaceAndApply(schemaTableName, getSchemaKSKey(keyspaceName), fct);
-    }
-
-    private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, ByteBuffer keyspaceKey, Function<RowIterator, T> fct)
-    {
-        ColumnFamilyStore store = getSchemaCFS(schemaTableName);
-        return readSchemaPartitionForKeyspaceAndApply(store, store.decorateKey(keyspaceKey), fct);
-    }
-
-    private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, DecoratedKey keyspaceKey, Function<RowIterator, T> fct)
-    {
-        return readSchemaPartitionForKeyspaceAndApply(getSchemaCFS(schemaTableName), keyspaceKey, fct);
-    }
-
-    private static <T> T readSchemaPartitionForKeyspaceAndApply(ColumnFamilyStore store, DecoratedKey keyspaceKey, Function<RowIterator, T> fct)
-    {
-        int nowInSec = FBUtilities.nowInSeconds();
-        try (OpOrder.Group op = store.readOrdering.start();
-             RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(store.metadata, nowInSec, keyspaceKey)
-                                                                                             .queryMemtableAndDisk(store, op), nowInSec))
-        {
-            return fct.apply(partition);
-        }
-    }
-
-    private static <T> T readSchemaPartitionForTableAndApply(String schemaTableName, String keyspaceName, String tableName, Function<RowIterator, T> fct)
-    {
-        ColumnFamilyStore store = getSchemaCFS(schemaTableName);
-
-        ClusteringComparator comparator = store.metadata.comparator;
-        Slices slices = Slices.with(comparator, Slice.make(comparator, tableName));
-        int nowInSec = FBUtilities.nowInSeconds();
-        try (OpOrder.Group op = store.readOrdering.start();
-             RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionReadCommand.create(store.metadata, nowInSec, getSchemaKSKey(keyspaceName), slices)
-                                                                                             .queryMemtableAndDisk(store, op), nowInSec))
-        {
-            return fct.apply(partition);
-        }
-    }
-
     private static boolean isSystemKeyspaceSchemaPartition(DecoratedKey partitionKey)
     {
         return Schema.isSystemKeyspace(UTF8Type.instance.compose(partitionKey.getKey()));
     }
 
-    /**
-     * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
-     * (which also involves fs operations on add/drop ks/cf)
-     *
-     * @param mutations the schema changes to apply
-     *
-     * @throws ConfigurationException If one of metadata attributes has invalid value
-     * @throws IOException If data was corrupted during transportation or failed to apply fs operations
-     */
-    public static synchronized void mergeSchemaAndAnnounceVersion(Collection<Mutation> mutations) throws ConfigurationException, IOException
-    {
-        mergeSchema(mutations);
-        Schema.instance.updateVersionAndAnnounce();
-    }
-
-    public static synchronized void mergeSchema(Collection<Mutation> mutations) throws IOException
-    {
-        // compare before/after schemas of the affected keyspaces only
-        Set<String> keyspaces = new HashSet<>(mutations.size());
-        for (Mutation mutation : mutations)
-            keyspaces.add(ByteBufferUtil.string(mutation.key().getKey()));
-
-        // current state of the schema
-        Map<DecoratedKey, FilteredPartition> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
-        Map<DecoratedKey, FilteredPartition> oldColumnFamilies = readSchemaForKeyspaces(TABLES, keyspaces);
-        Map<DecoratedKey, FilteredPartition> oldViews = readSchemaForKeyspaces(VIEWS, keyspaces);
-        Map<DecoratedKey, FilteredPartition> oldTypes = readSchemaForKeyspaces(TYPES, keyspaces);
-        Map<DecoratedKey, FilteredPartition> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
-        Map<DecoratedKey, FilteredPartition> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
-
-        mutations.forEach(Mutation::apply);
-
-        if (FLUSH_SCHEMA_TABLES)
-            flush();
-
-        // with new data applied
-        Map<DecoratedKey, FilteredPartition> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
-        Map<DecoratedKey, FilteredPartition> newColumnFamilies = readSchemaForKeyspaces(TABLES, keyspaces);
-        Map<DecoratedKey, FilteredPartition> newViews = readSchemaForKeyspaces(VIEWS, keyspaces);
-        Map<DecoratedKey, FilteredPartition> newTypes = readSchemaForKeyspaces(TYPES, keyspaces);
-        Map<DecoratedKey, FilteredPartition> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
-        Map<DecoratedKey, FilteredPartition> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
-
-        Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
-        mergeTables(oldColumnFamilies, newColumnFamilies);
-        mergeViews(oldViews, newViews);
-        mergeTypes(oldTypes, newTypes);
-        mergeFunctions(oldFunctions, newFunctions);
-        mergeAggregates(oldAggregates, newAggregates);
-
-        // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
-        keyspacesToDrop.forEach(Schema.instance::dropKeyspace);
-    }
-
-    private static Set<String> mergeKeyspaces(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
-    {
-        for (FilteredPartition newPartition : after.values())
-        {
-            String name = AsciiType.instance.compose(newPartition.partitionKey().getKey());
-            KeyspaceParams params = createKeyspaceParamsFromSchemaPartition(newPartition.rowIterator());
-
-            FilteredPartition oldPartition = before.remove(newPartition.partitionKey());
-            if (oldPartition == null || oldPartition.isEmpty())
-                Schema.instance.addKeyspace(KeyspaceMetadata.create(name, params));
-            else
-                Schema.instance.updateKeyspace(name, params);
-        }
-
-        // What's remain in old is those keyspace that are not in updated, i.e. the dropped ones.
-        return asKeyspaceNamesSet(before.keySet());
-    }
-
-    private static Set<String> asKeyspaceNamesSet(Set<DecoratedKey> keys)
-    {
-        Set<String> names = new HashSet<>(keys.size());
-        for (DecoratedKey key : keys)
-            names.add(AsciiType.instance.compose(key.getKey()));
-        return names;
-    }
-
-    private static void mergeTables(Map<DecoratedKey, FilteredPartition> 
before, Map<DecoratedKey, FilteredPartition> after)
-    {
-        diffSchema(before, after, new Differ()
-        {
-            public void onDropped(UntypedResultSet.Row oldRow)
-            {
-                Schema.instance.dropTable(oldRow.getString("keyspace_name"), 
oldRow.getString("table_name"));
-            }
-
-            public void onAdded(UntypedResultSet.Row newRow)
-            {
-                Schema.instance.addTable(createTableFromTableRow(newRow));
-            }
-
-            public void onUpdated(UntypedResultSet.Row oldRow, 
UntypedResultSet.Row newRow)
-            {
-                Schema.instance.updateTable(newRow.getString("keyspace_name"), 
newRow.getString("table_name"));
-            }
-        });
-    }
-
-    private static void mergeViews(Map<DecoratedKey, FilteredPartition> 
before, Map<DecoratedKey, FilteredPartition> after)
-    {
-        diffSchema(before, after, new Differ()
-        {
-            public void onDropped(UntypedResultSet.Row oldRow)
-            {
-                Schema.instance.dropView(oldRow.getString("keyspace_name"), 
oldRow.getString("view_name"));
-            }
-
-            public void onAdded(UntypedResultSet.Row newRow)
-            {
-                Schema.instance.addView(createViewFromViewRow(newRow));
-            }
-
-            public void onUpdated(UntypedResultSet.Row oldRow, 
UntypedResultSet.Row newRow)
-            {
-                Schema.instance.updateView(newRow.getString("keyspace_name"), 
newRow.getString("view_name"));
-            }
-        });
-    }
-
-    private static void mergeTypes(Map<DecoratedKey, FilteredPartition> 
before, Map<DecoratedKey, FilteredPartition> after)
-    {
-        diffSchema(before, after, new Differ()
-        {
-            public void onDropped(UntypedResultSet.Row oldRow)
-            {
-                Schema.instance.dropType(createTypeFromRow(oldRow));
-            }
-
-            public void onAdded(UntypedResultSet.Row newRow)
-            {
-                Schema.instance.addType(createTypeFromRow(newRow));
-            }
-
-            public void onUpdated(UntypedResultSet.Row oldRow, 
UntypedResultSet.Row newRow)
-            {
-                Schema.instance.updateType(createTypeFromRow(newRow));
-            }
-        });
-    }
-
-    private static void mergeFunctions(Map<DecoratedKey, FilteredPartition> 
before, Map<DecoratedKey, FilteredPartition> after)
-    {
-        diffSchema(before, after, new Differ()
-        {
-            public void onDropped(UntypedResultSet.Row oldRow)
-            {
-                
Schema.instance.dropFunction(createFunctionFromFunctionRow(oldRow));
-            }
-
-            public void onAdded(UntypedResultSet.Row newRow)
-            {
-                
Schema.instance.addFunction(createFunctionFromFunctionRow(newRow));
-            }
-
-            public void onUpdated(UntypedResultSet.Row oldRow, 
UntypedResultSet.Row newRow)
-            {
-                
Schema.instance.updateFunction(createFunctionFromFunctionRow(newRow));
-            }
-        });
-    }
-
-    private static void mergeAggregates(Map<DecoratedKey, FilteredPartition> 
before, Map<DecoratedKey, FilteredPartition> after)
-    {
-        diffSchema(before, after, new Differ()
-        {
-            public void onDropped(UntypedResultSet.Row oldRow)
-            {
-                
Schema.instance.dropAggregate(createAggregateFromAggregateRow(oldRow));
-            }
-
-            public void onAdded(UntypedResultSet.Row newRow)
-            {
-                
Schema.instance.addAggregate(createAggregateFromAggregateRow(newRow));
-            }
-
-            public void onUpdated(UntypedResultSet.Row oldRow, 
UntypedResultSet.Row newRow)
-            {
-                
Schema.instance.updateAggregate(createAggregateFromAggregateRow(newRow));
-            }
-        });
-    }
-
-    public interface Differ
-    {
-        void onDropped(UntypedResultSet.Row oldRow);
-        void onAdded(UntypedResultSet.Row newRow);
-        void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow);
-    }
-
-    private static void diffSchema(Map<DecoratedKey, FilteredPartition> 
before, Map<DecoratedKey, FilteredPartition> after, Differ differ)
-    {
-        for (FilteredPartition newPartition : after.values())
-        {
-            CFMetaData metadata = newPartition.metadata();
-            DecoratedKey key = newPartition.partitionKey();
-
-            FilteredPartition oldPartition = before.remove(key);
-
-            if (oldPartition == null || oldPartition.isEmpty())
-            {
-                // Means everything is to be added
-                for (Row row : newPartition)
-                    
differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, row));
-                continue;
-            }
-
-            Iterator<Row> oldIter = oldPartition.iterator();
-            Iterator<Row> newIter = newPartition.iterator();
-
-            Row oldRow = oldIter.hasNext() ? oldIter.next() : null;
-            Row newRow = newIter.hasNext() ? newIter.next() : null;
-            while (oldRow != null && newRow != null)
-            {
-                int cmp = metadata.comparator.compare(oldRow.clustering(), 
newRow.clustering());
-                if (cmp < 0)
-                {
-                    
differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow));
-                    oldRow = oldIter.hasNext() ? oldIter.next() : null;
-                }
-                else if (cmp > 0)
-                {
-
-                    
differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
-                    newRow = newIter.hasNext() ? newIter.next() : null;
-                }
-                else
-                {
-                    if (!oldRow.equals(newRow))
-                        
differ.onUpdated(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow), 
UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
-
-                    oldRow = oldIter.hasNext() ? oldIter.next() : null;
-                    newRow = newIter.hasNext() ? newIter.next() : null;
-                }
-            }
-
-            while (oldRow != null)
-            {
-                
differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow));
-                oldRow = oldIter.hasNext() ? oldIter.next() : null;
-            }
-            while (newRow != null)
-            {
-                differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, 
key, newRow));
-                newRow = newIter.hasNext() ? newIter.next() : null;
-            }
-        }
-
-        // What remains is those keys that were only in before.
-        for (FilteredPartition partition : before.values())
-            for (Row row : partition)
-                
differ.onDropped(UntypedResultSet.Row.fromInternalRow(partition.metadata(), 
partition.partitionKey(), row));
-    }
-
     /*
-     * Keyspace metadata serialization/deserialization.
+     * Schema entities to mutations
      */
 
     public static Mutation makeCreateKeyspaceMutation(String name, KeyspaceParams params, long timestamp)
@@ -761,46 +414,6 @@ public final class SchemaKeyspace
         return mutation;
     }
 
-    private static KeyspaceMetadata 
createKeyspaceFromSchemaPartitions(RowIterator serializedParams,
-                                                                       
RowIterator serializedTables,
-                                                                       
RowIterator serializedViews,
-                                                                       
RowIterator serializedTypes,
-                                                                       
RowIterator serializedFunctions,
-                                                                       
RowIterator serializedAggregates)
-    {
-        String name = 
AsciiType.instance.compose(serializedParams.partitionKey().getKey());
-
-        KeyspaceParams params = 
createKeyspaceParamsFromSchemaPartition(serializedParams);
-        Tables tables = createTablesFromTablesPartition(serializedTables);
-        Views views = createViewsFromViewsPartition(serializedViews);
-        Types types = createTypesFromPartition(serializedTypes);
-
-        Collection<UDFunction> udfs = 
createFunctionsFromFunctionsPartition(serializedFunctions);
-        Functions functions = 
org.apache.cassandra.schema.Functions.builder().add(udfs).build();
-        functions = createAggregatesFromAggregatesPartition(functions, 
serializedAggregates);
-
-        return KeyspaceMetadata.create(name, params, tables, views, types, 
functions);
-    }
-
-    /**
-     * Deserialize only Keyspace attributes without nested tables or types
-     *
-     * @param partition Keyspace attributes in serialized form
-     */
-
-    private static KeyspaceParams 
createKeyspaceParamsFromSchemaPartition(RowIterator partition)
-    {
-        String query = String.format("SELECT * FROM %s.%s", NAME, KEYSPACES);
-        UntypedResultSet.Row row = QueryProcessor.resultify(query, 
partition).one();
-
-        return 
KeyspaceParams.create(row.getBoolean(KeyspaceParams.Option.DURABLE_WRITES.toString()),
-                                     
row.getFrozenTextMap(KeyspaceParams.Option.REPLICATION.toString()));
-    }
-
-    /*
-     * User type metadata serialization/deserialization.
-     */
-
     public static Mutation makeCreateTypeMutation(KeyspaceMetadata keyspace, UserType type, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
@@ -813,8 +426,8 @@ public final class SchemaKeyspace
     {
         RowUpdateBuilder adder = new RowUpdateBuilder(Types, timestamp, mutation)
                                  .clustering(type.getNameAsString())
-                                 .frozenList("field_names", type.fieldNames().stream().map(SchemaKeyspace::bbToString).collect(Collectors.toList()))
-                                 .frozenList("field_types", type.fieldTypes().stream().map(AbstractType::toString).collect(Collectors.toList()));
+                                 .frozenList("field_names", type.fieldNames().stream().map(SchemaKeyspace::bbToString).collect(toList()))
+                                 .frozenList("field_types", type.fieldTypes().stream().map(AbstractType::asCQL3Type).map(CQL3Type::toString).collect(toList()));
 
         adder.build();
     }
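
The field_types change above replaces AbstractType's internal representation with CQL type names. A small sketch of the difference for a simple type; assumed illustration, not part of the commit:

import org.apache.cassandra.db.marshal.UTF8Type;

final class TypeNameSketch
{
    public static void main(String[] args)
    {
        // Internal marshal representation, e.g. "org.apache.cassandra.db.marshal.UTF8Type"
        System.out.println(UTF8Type.instance.toString());

        // CQL-level name written by the new code: "text"
        System.out.println(UTF8Type.instance.asCQL3Type().toString());
    }
}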
@@ -838,36 +451,6 @@ public final class SchemaKeyspace
         return RowUpdateBuilder.deleteRow(Types, timestamp, mutation, type.name);
     }
 
-    private static Types createTypesFromPartition(RowIterator partition)
-    {
-        String query = String.format("SELECT * FROM %s.%s", NAME, TYPES);
-        Types.Builder types = org.apache.cassandra.schema.Types.builder();
-        QueryProcessor.resultify(query, partition).forEach(row -> 
types.add(createTypeFromRow(row)));
-        return types.build();
-    }
-
-    private static UserType createTypeFromRow(UntypedResultSet.Row row)
-    {
-        String keyspace = row.getString("keyspace_name");
-        ByteBuffer name = ByteBufferUtil.bytes(row.getString("type_name"));
-        List<String> rawColumns = row.getFrozenList("field_names", 
UTF8Type.instance);
-        List<String> rawTypes = row.getFrozenList("field_types", 
UTF8Type.instance);
-
-        List<ByteBuffer> columns = new ArrayList<>(rawColumns.size());
-        for (String rawColumn : rawColumns)
-            columns.add(ByteBufferUtil.bytes(rawColumn));
-
-        List<AbstractType<?>> types = new ArrayList<>(rawTypes.size());
-        for (String rawType : rawTypes)
-            types.add(parseType(rawType));
-
-        return new UserType(keyspace, name, columns, types);
-    }
-
-    /*
-     * Table metadata serialization/deserialization.
-     */
-
     public static Mutation makeCreateTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
@@ -990,9 +573,7 @@ public final class SchemaKeyspace
 
         // updated indexes need to be updated
         for (MapDifference.ValueDifference<IndexMetadata> diff : indexesDiff.entriesDiffering().values())
-        {
             addUpdatedIndexToSchemaMutation(newTable, diff.rightValue(), timestamp, mutation);
-        }
 
         return mutation;
     }
@@ -1038,274 +619,59 @@ public final class SchemaKeyspace
         return mutation;
     }
 
-    public static CFMetaData createTableFromName(String keyspace, String table)
+    private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
     {
-        return readSchemaPartitionForTableAndApply(TABLES, keyspace, table, partition ->
-        {
-            if (partition.isEmpty())
-                throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, table));
+        RowUpdateBuilder adder = new RowUpdateBuilder(Columns, timestamp, mutation).clustering(table.cfName, column.name.toString());
+
+        AbstractType<?> type = column.type;
+        if (type instanceof ReversedType)
+            type = ((ReversedType) type).baseType;
 
-            return createTableFromTablePartition(partition);
-        });
+        adder.add("column_name_bytes", column.name.bytes)
+             .add("kind", column.kind.toString().toLowerCase())
+             .add("position", column.isOnAllComponents() ? ColumnDefinition.NO_POSITION : column.position())
+             .add("clustering_order", column.clusteringOrder().toString().toLowerCase())
+             .add("type", type.asCQL3Type().toString())
+             .build();
     }
 
-    /**
-     * Deserialize tables from low-level schema representation, all of them belong to the same keyspace
-     */
-    private static Tables createTablesFromTablesPartition(RowIterator partition)
+    private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
     {
-        String query = String.format("SELECT * FROM %s.%s", NAME, TABLES);
-        Tables.Builder tables = org.apache.cassandra.schema.Tables.builder();
-        QueryProcessor.resultify(query, partition).forEach(row -> tables.add(createTableFromTableRow(row)));
-        return tables.build();
+        // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
+        RowUpdateBuilder.deleteRow(Columns, timestamp, mutation, table.cfName, column.name.toString());
     }
 
-    private static List<ColumnDefinition> createColumnsFromColumnsPartition(RowIterator serializedColumns)
+    private static void addDroppedColumnToSchemaMutation(CFMetaData table, CFMetaData.DroppedColumn column, long timestamp, Mutation mutation)
     {
-        String query = String.format("SELECT * FROM %s.%s", NAME, COLUMNS);
-        return createColumnsFromColumnRows(QueryProcessor.resultify(query, serializedColumns));
+        RowUpdateBuilder adder = new RowUpdateBuilder(DroppedColumns, timestamp, mutation).clustering(table.cfName, column.name);
+
+        adder.add("dropped_time", new Date(TimeUnit.MICROSECONDS.toMillis(column.droppedTime)))
+             .add("type", expandUserTypes(column.type).asCQL3Type().toString())
+             .build();
     }
 
-    private static CFMetaData createTableFromTablePartition(RowIterator 
partition)
+    private static void addTriggerToSchemaMutation(CFMetaData table, 
TriggerMetadata trigger, long timestamp, Mutation mutation)
     {
-        String query = String.format("SELECT * FROM %s.%s", NAME, TABLES);
-        return createTableFromTableRow(QueryProcessor.resultify(query, 
partition).one());
+        new RowUpdateBuilder(Triggers, timestamp, mutation)
+            .clustering(table.cfName, trigger.name)
+            .frozenMap("options", Collections.singletonMap("class", 
trigger.classOption))
+            .build();
     }
 
-    public static CFMetaData 
createTableFromTablePartitionAndColumnsPartition(RowIterator tablePartition,
-                                                                              
RowIterator columnsPartition)
+    private static void dropTriggerFromSchemaMutation(CFMetaData table, 
TriggerMetadata trigger, long timestamp, Mutation mutation)
     {
-        List<ColumnDefinition> columns = 
createColumnsFromColumnsPartition(columnsPartition);
-        String query = String.format("SELECT * FROM %s.%s", NAME, TABLES);
-        return 
createTableFromTableRowAndColumns(QueryProcessor.resultify(query, 
tablePartition).one(), columns);
+        RowUpdateBuilder.deleteRow(Triggers, timestamp, mutation, 
table.cfName, trigger.name);
     }
 
-    /**
-     * Deserialize table metadata from low-level representation
-     *
-     * @return Metadata deserialized from schema
-     */
-    private static CFMetaData createTableFromTableRow(UntypedResultSet.Row row)
+    public static Mutation makeCreateViewMutation(KeyspaceMetadata keyspace, 
ViewDefinition view, long timestamp)
     {
-        String keyspace = row.getString("keyspace_name");
-        String table = row.getString("table_name");
-
-        List<ColumnDefinition> columns =
-            readSchemaPartitionForTableAndApply(COLUMNS, keyspace, table, 
SchemaKeyspace::createColumnsFromColumnsPartition);
-
-        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns =
-            readSchemaPartitionForTableAndApply(DROPPED_COLUMNS, keyspace, 
table, SchemaKeyspace::createDroppedColumnsFromDroppedColumnsPartition);
-
-        Triggers triggers =
-            readSchemaPartitionForTableAndApply(TRIGGERS, keyspace, table, 
SchemaKeyspace::createTriggersFromTriggersPartition);
-
-        CFMetaData cfm = createTableFromTableRowAndColumns(row, 
columns).droppedColumns(droppedColumns)
-                                                                        
.triggers(triggers);
-
-        // the CFMetaData itself is required to build the collection of 
indexes as
-        // the column definitions are needed because we store only the name 
each
-        // index's target columns and this is not enough to reconstruct a 
ColumnIdentifier
-        org.apache.cassandra.schema.Indexes indexes =
-            readSchemaPartitionForTableAndApply(INDEXES, keyspace, table, 
rowIterator -> createIndexesFromIndexesPartition(cfm, rowIterator));
-        cfm.indexes(indexes);
-
-        return cfm;
+        // Include the serialized keyspace in case the target node missed a 
CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, 
keyspace.params, timestamp);
+        addViewToSchemaMutation(view, timestamp, true, mutation);
+        return mutation;
     }
 
-    public static CFMetaData 
createTableFromTableRowAndColumns(UntypedResultSet.Row row, 
List<ColumnDefinition> columns)
-    {
-        String keyspace = row.getString("keyspace_name");
-        String table = row.getString("table_name");
-        UUID id = row.getUUID("id");
-
-        Set<CFMetaData.Flag> flags = row.has("flags")
-                                   ? 
CFMetaData.flagsFromStrings(row.getFrozenSet("flags", UTF8Type.instance))
-                                   : Collections.emptySet();
-
-        boolean isSuper = flags.contains(CFMetaData.Flag.SUPER);
-        boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
-        boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
-        boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
-
-        return CFMetaData.create(keyspace,
-                                 table,
-                                 id,
-                                 isDense,
-                                 isCompound,
-                                 isSuper,
-                                 isCounter,
-                                 false,
-                                 columns,
-                                 DatabaseDescriptor.getPartitioner())
-                         .params(createTableParamsFromRow(row));
-    }
-
-    private static TableParams createTableParamsFromRow(UntypedResultSet.Row 
row)
-    {
-        TableParams.Builder builder = TableParams.builder();
-
-        builder.bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance"))
-               .caching(CachingParams.fromMap(row.getFrozenTextMap("caching")))
-               .comment(row.getString("comment"))
-               
.compaction(CompactionParams.fromMap(row.getFrozenTextMap("compaction")))
-               
.compression(CompressionParams.fromMap(row.getFrozenTextMap("compression")))
-               
.dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance"))
-               .defaultTimeToLive(row.getInt("default_time_to_live"))
-               .gcGraceSeconds(row.getInt("gc_grace_seconds"))
-               .maxIndexInterval(row.getInt("max_index_interval"))
-               
.memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms"))
-               .minIndexInterval(row.getInt("min_index_interval"))
-               .readRepairChance(row.getDouble("read_repair_chance"))
-               .crcCheckChance(row.getDouble("crc_check_chance"))
-               
.speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")));
-
-        if (row.has("extensions"))
-            builder.extensions(row.getFrozenMap("extensions", 
UTF8Type.instance, BytesType.instance));
-
-        return builder.build();
-    }
-
-    /*
-     * Column metadata serialization/deserialization.
-     */
-
-    private static void addColumnToSchemaMutation(CFMetaData table, 
ColumnDefinition column, long timestamp, Mutation mutation)
-    {
-        RowUpdateBuilder adder = new RowUpdateBuilder(Columns, timestamp, 
mutation).clustering(table.cfName, column.name.toString());
-
-        AbstractType<?> type = column.type;
-        if (type instanceof ReversedType)
-            type = ((ReversedType) type).baseType;
-
-        adder.add("column_name_bytes", column.name.bytes)
-             .add("kind", column.kind.toString().toLowerCase())
-             .add("position", column.isOnAllComponents() ? 
ColumnDefinition.NO_POSITION : column.position())
-             .add("clustering_order", 
column.clusteringOrder().toString().toLowerCase())
-             .add("type", type.toString())
-             .build();
-    }
-
-    private static void dropColumnFromSchemaMutation(CFMetaData table, 
ColumnDefinition column, long timestamp, Mutation mutation)
-    {
-        // Note: we do want to use name.toString(), not name.bytes directly 
for backward compatibility (For CQL3, this won't make a difference).
-        RowUpdateBuilder.deleteRow(Columns, timestamp, mutation, table.cfName, 
column.name.toString());
-    }
-
-    private static List<ColumnDefinition> 
createColumnsFromColumnRows(UntypedResultSet rows)
-{
-        List<ColumnDefinition> columns = new ArrayList<>(rows.size());
-        rows.forEach(row -> columns.add(createColumnFromColumnRow(row)));
-        return columns;
-    }
-
-    private static ColumnDefinition 
createColumnFromColumnRow(UntypedResultSet.Row row)
-    {
-        String keyspace = row.getString("keyspace_name");
-        String table = row.getString("table_name");
-
-        ColumnIdentifier name = 
ColumnIdentifier.getInterned(row.getBytes("column_name_bytes"), 
row.getString("column_name"));
-
-        ColumnDefinition.Kind kind = 
ColumnDefinition.Kind.valueOf(row.getString("kind").toUpperCase());
-
-        int position = row.getInt("position");
-        ClusteringOrder order = 
ClusteringOrder.valueOf(row.getString("clustering_order").toUpperCase());
-
-        AbstractType<?> type = parseType(row.getString("type"));
-        if (order == ClusteringOrder.DESC)
-            type = ReversedType.getInstance(type);
-
-        return new ColumnDefinition(keyspace, table, name, type, position, 
kind);
-    }
-
-    /*
-     * Dropped column metadata serialization/deserialization.
-     */
-
-    private static void addDroppedColumnToSchemaMutation(CFMetaData table, 
CFMetaData.DroppedColumn column, long timestamp, Mutation mutation)
-    {
-        RowUpdateBuilder adder = new RowUpdateBuilder(DroppedColumns, 
timestamp, mutation).clustering(table.cfName, column.name);
-
-        adder.add("dropped_time", new 
Date(TimeUnit.MICROSECONDS.toMillis(column.droppedTime)))
-             .add("type", column.type.toString())
-             .build();
-    }
-
-    private static Map<ByteBuffer, CFMetaData.DroppedColumn> 
createDroppedColumnsFromDroppedColumnsPartition(RowIterator serializedColumns)
-    {
-        String query = String.format("SELECT * FROM %s.%s", NAME, 
DROPPED_COLUMNS);
-        Map<ByteBuffer, CFMetaData.DroppedColumn> columns = new HashMap<>();
-        for (CFMetaData.DroppedColumn column : 
createDroppedColumnsFromDroppedColumnRows(QueryProcessor.resultify(query, 
serializedColumns)))
-            columns.put(UTF8Type.instance.decompose(column.name), column);
-        return columns;
-    }
-
-    private static List<CFMetaData.DroppedColumn> 
createDroppedColumnsFromDroppedColumnRows(UntypedResultSet rows)
-    {
-        List<CFMetaData.DroppedColumn> columns = new ArrayList<>(rows.size());
-        rows.forEach(row -> 
columns.add(createDroppedColumnFromDroppedColumnRow(row)));
-        return columns;
-    }
-
-    private static CFMetaData.DroppedColumn 
createDroppedColumnFromDroppedColumnRow(UntypedResultSet.Row row)
-    {
-        String name = row.getString("column_name");
-        AbstractType<?> type = TypeParser.parse(row.getString("type"));
-        long droppedTime = 
TimeUnit.MILLISECONDS.toMicros(row.getLong("dropped_time"));
-
-        return new CFMetaData.DroppedColumn(name, type, droppedTime);
-    }
-
-    /*
-     * Trigger metadata serialization/deserialization.
-     */
-
-    private static void addTriggerToSchemaMutation(CFMetaData table, 
TriggerMetadata trigger, long timestamp, Mutation mutation)
-    {
-        new RowUpdateBuilder(Triggers, timestamp, mutation)
-            .clustering(table.cfName, trigger.name)
-            .frozenMap("options", Collections.singletonMap("class", 
trigger.classOption))
-            .build();
-    }
-
-    private static void dropTriggerFromSchemaMutation(CFMetaData table, 
TriggerMetadata trigger, long timestamp, Mutation mutation)
-    {
-        RowUpdateBuilder.deleteRow(Triggers, timestamp, mutation, 
table.cfName, trigger.name);
-    }
-
-    /**
-     * Deserialize triggers from storage-level representation.
-     *
-     * @param partition storage-level partition containing the trigger 
definitions
-     * @return the list of processed TriggerDefinitions
-     */
-    private static Triggers createTriggersFromTriggersPartition(RowIterator 
partition)
-    {
-        Triggers.Builder triggers = 
org.apache.cassandra.schema.Triggers.builder();
-        String query = String.format("SELECT * FROM %s.%s", NAME, TRIGGERS);
-        QueryProcessor.resultify(query, partition).forEach(row -> 
triggers.add(createTriggerFromTriggerRow(row)));
-        return triggers.build();
-    }
-
-    private static TriggerMetadata 
createTriggerFromTriggerRow(UntypedResultSet.Row row)
-    {
-        String name = row.getString("trigger_name");
-        String classOption = row.getFrozenTextMap("options").get("class");
-        return new TriggerMetadata(name, classOption);
-    }
-
-    /*
-     * View metadata serialization/deserialization.
-     */
-
-    public static Mutation makeCreateViewMutation(KeyspaceMetadata keyspace, 
ViewDefinition view, long timestamp)
-    {
-        // Include the serialized keyspace in case the target node missed a 
CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, 
keyspace.params, timestamp);
-        addViewToSchemaMutation(view, timestamp, true, mutation);
-        return mutation;
-    }
-
-    private static void addViewToSchemaMutation(ViewDefinition view, long 
timestamp, boolean includeColumns, Mutation mutation)
+    private static void addViewToSchemaMutation(ViewDefinition view, long 
timestamp, boolean includeColumns, Mutation mutation)
     {
         RowUpdateBuilder builder = new RowUpdateBuilder(Views, timestamp, 
mutation)
             .clustering(view.viewName);
@@ -1363,9 +729,7 @@ public final class SchemaKeyspace
 
         // columns that are no longer needed
         for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values())
-        {
             dropColumnFromSchemaMutation(oldView.metadata, column, timestamp, 
mutation);
-        }
 
         // newly added columns
         for (ColumnDefinition column : 
columnDiff.entriesOnlyOnRight().values())
@@ -1390,122 +754,303 @@ public final class SchemaKeyspace
         return mutation;
     }
 
-    public static ViewDefinition createViewFromName(String keyspace, String 
view)
+    private static void addIndexToSchemaMutation(CFMetaData table,
+                                                 IndexMetadata index,
+                                                 long timestamp,
+                                                 Mutation mutation)
     {
-        return readSchemaPartitionForTableAndApply(VIEWS, keyspace, view, 
partition ->
-        {
-            if (partition.isEmpty())
-                throw new RuntimeException(String.format("%s:%s not found in 
the schema definitions keyspace.", keyspace, view));
+        RowUpdateBuilder builder = new RowUpdateBuilder(Indexes, timestamp, 
mutation).clustering(table.cfName, index.name);
 
-            return createViewFromViewPartition(partition);
-        });
+        builder.add("kind", index.kind.toString());
+        builder.frozenMap("options", index.options);
+        builder.build();
+    }
+
+    private static void dropIndexFromSchemaMutation(CFMetaData table,
+                                                    IndexMetadata index,
+                                                    long timestamp,
+                                                    Mutation mutation)
+    {
+        RowUpdateBuilder.deleteRow(Indexes, timestamp, mutation, table.cfName, 
index.name);
     }
 
-    private static ViewDefinition createViewFromViewPartition(RowIterator 
partition)
+    private static void addUpdatedIndexToSchemaMutation(CFMetaData table,
+                                                        IndexMetadata index,
+                                                        long timestamp,
+                                                        Mutation mutation)
     {
-        String query = String.format("SELECT * FROM %s.%s", NAME, VIEWS);
-        return createViewFromViewRow(QueryProcessor.resultify(query, 
partition).one());
+        addIndexToSchemaMutation(table, index, timestamp, mutation);
     }
 
-    /**
-     * Deserialize views from storage-level representation.
-     *
-     * @param partition storage-level partition containing the view definitions
-     * @return the list of processed ViewDefinitions
+    public static Mutation makeCreateFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addFunctionToSchemaMutation(function, timestamp, mutation);
+        return mutation;
+    }
+
+    static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation)
+    {
+        RowUpdateBuilder adder =
+            new RowUpdateBuilder(Functions, timestamp, mutation).clustering(function.name().name, functionArgumentsList(function));
+
+        adder.add("body", function.body())
+             .add("language", function.language())
+             .add("return_type", function.returnType().asCQL3Type().toString())
+             .add("called_on_null_input", function.isCalledOnNullInput())
+             .frozenList("argument_names", function.argNames().stream().map((c) -> bbToString(c.bytes)).collect(toList()));
+
+        adder.build();
+    }
+
+    private static List<String> functionArgumentsList(AbstractFunction fun)
+    {
+        return fun.argTypes()
+                  .stream()
+                  .map(AbstractType::asCQL3Type)
+                  .map(CQL3Type::toString)
+                  .collect(toList());
+    }
+
+    public static Mutation makeDropFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        return RowUpdateBuilder.deleteRow(Functions, timestamp, mutation, function.name().name, functionArgumentsList(function));
+    }
+
+    public static Mutation makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addAggregateToSchemaMutation(aggregate, timestamp, mutation);
+        return mutation;
+    }
+
+    static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation)
+    {
+        RowUpdateBuilder adder =
+            new RowUpdateBuilder(Aggregates, timestamp, mutation).clustering(aggregate.name().name, functionArgumentsList(aggregate));
+
+        adder.add("return_type", aggregate.returnType().asCQL3Type().toString())
+             .add("state_func", aggregate.stateFunction().name().name)
+             .add("state_type", aggregate.stateType() != null ? aggregate.stateType().asCQL3Type().toString() : null)
+             .add("final_func", aggregate.finalFunction() != null ? aggregate.finalFunction().name().name : null)
+             .add("initcond", aggregate.initialCondition())
+             .build();
+    }
+
+    public static Mutation makeDropAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        return RowUpdateBuilder.deleteRow(Aggregates, timestamp, mutation, aggregate.name().name, functionArgumentsList(aggregate));
+    }
+
+    /*
+     * Fetching schema
      */
-    private static Views createViewsFromViewsPartition(RowIterator partition)
+
+    public static Keyspaces fetchNonSystemKeyspaces()
     {
-        Views.Builder views = org.apache.cassandra.schema.Views.builder();
-        String query = String.format("SELECT * FROM %s.%s", NAME, VIEWS);
-        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, 
partition))
+        return fetchKeyspacesWithout(Schema.SYSTEM_KEYSPACE_NAMES);
+    }
+
+    private static Keyspaces fetchKeyspacesWithout(Set<String> 
excludedKeyspaceNames)
+    {
+        String query = format("SELECT keyspace_name FROM %s.%s", NAME, 
KEYSPACES);
+
+        Keyspaces.Builder keyspaces = 
org.apache.cassandra.schema.Keyspaces.builder();
+        for (UntypedResultSet.Row row : query(query))
         {
-            ViewDefinition view = createViewFromViewRow(row);
-            views.add(view);
+            String keyspaceName = row.getString("keyspace_name");
+            if (!excludedKeyspaceNames.contains(keyspaceName))
+                keyspaces.add(fetchKeyspace(keyspaceName));
         }
-        return views.build();
+        return keyspaces.build();
     }
 
-    private static ViewDefinition createViewFromViewRow(UntypedResultSet.Row 
row)
+    private static Keyspaces fetchKeyspacesOnly(Set<String> 
includedKeyspaceNames)
     {
-        String keyspace = row.getString("keyspace_name");
-        String view = row.getString("view_name");
-        UUID id = row.getUUID("id");
-        UUID baseTableId = row.getUUID("base_table_id");
-        String baseTableName = row.getString("base_table_name");
-        boolean includeAll = row.getBoolean("include_all_columns");
-        String whereClause = row.getString("where_clause");
+        /*
+         * We know the keyspace names we are going to query, but we still want 
to run the SELECT IN
+         * query, to filter out the keyspaces that had been dropped by the 
applied mutation set.
+         */
+        String query = format("SELECT keyspace_name FROM %s.%s WHERE 
keyspace_name IN ?", NAME, KEYSPACES);
 
-        List<ColumnDefinition> columns =
-            readSchemaPartitionForTableAndApply(COLUMNS, keyspace, view, 
SchemaKeyspace::createColumnsFromColumnsPartition);
+        Keyspaces.Builder keyspaces = 
org.apache.cassandra.schema.Keyspaces.builder();
+        for (UntypedResultSet.Row row : query(query, new 
ArrayList<>(includedKeyspaceNames)))
+            keyspaces.add(fetchKeyspace(row.getString("keyspace_name")));
+        return keyspaces.build();
+    }
 
-        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns =
-            readSchemaPartitionForTableAndApply(DROPPED_COLUMNS, keyspace, 
view, SchemaKeyspace::createDroppedColumnsFromDroppedColumnsPartition);
+    private static KeyspaceMetadata fetchKeyspace(String keyspaceName)
+    {
+        KeyspaceParams params = fetchKeyspaceParams(keyspaceName);
+        Types types = fetchTypes(keyspaceName);
+        Tables tables = fetchTables(keyspaceName, types);
+        Views views = fetchViews(keyspaceName, types);
+        Functions functions = fetchFunctions(keyspaceName, types);
+        return KeyspaceMetadata.create(keyspaceName, params, tables, views, 
types, functions);
+    }
 
-        CFMetaData cfm = CFMetaData.create(keyspace,
-                                           view,
-                                           id,
-                                           false,
-                                           true,
-                                           false,
-                                           false,
-                                           true,
-                                           columns,
-                                           DatabaseDescriptor.getPartitioner())
-                                   .params(createTableParamsFromRow(row))
-                                   .droppedColumns(droppedColumns);
+    private static KeyspaceParams fetchKeyspaceParams(String keyspaceName)
+    {
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", 
NAME, KEYSPACES);
+
+        UntypedResultSet.Row row = query(query, keyspaceName).one();
+        boolean durableWrites = 
row.getBoolean(KeyspaceParams.Option.DURABLE_WRITES.toString());
+        Map<String, String> replication = 
row.getFrozenTextMap(KeyspaceParams.Option.REPLICATION.toString());
+        return KeyspaceParams.create(durableWrites, replication);
+    }
 
-        String rawSelect = View.buildSelectStatement(baseTableName, columns, 
whereClause);
-        SelectStatement.RawStatement rawStatement = 
(SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect);
+    private static Types fetchTypes(String keyspaceName)
+    {
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", 
NAME, TYPES);
 
-        return new ViewDefinition(keyspace, view, baseTableId, baseTableName, 
includeAll, rawStatement, whereClause, cfm);
+        Types.RawBuilder types = 
org.apache.cassandra.schema.Types.rawBuilder(keyspaceName);
+        for (UntypedResultSet.Row row : query(query, keyspaceName))
+        {
+            String name = row.getString("type_name");
+            List<String> fieldNames = row.getFrozenList("field_names", 
UTF8Type.instance);
+            List<String> fieldTypes = row.getFrozenList("field_types", 
UTF8Type.instance);
+            types.add(name, fieldNames, fieldTypes);
+        }
+        return types.build();
     }
 
-    /*
-     * Secondary Index metadata serialization/deserialization.
-     */
+    private static Tables fetchTables(String keyspaceName, Types types)
+    {
+        String query = format("SELECT table_name FROM %s.%s WHERE 
keyspace_name = ?", NAME, TABLES);
 
-    private static void addIndexToSchemaMutation(CFMetaData table,
-                                                 IndexMetadata index,
-                                                 long timestamp,
-                                                 Mutation mutation)
+        Tables.Builder tables = org.apache.cassandra.schema.Tables.builder();
+        for (UntypedResultSet.Row row : query(query, keyspaceName))
+            tables.add(fetchTable(keyspaceName, row.getString("table_name"), 
types));
+        return tables.build();
+    }
+
+    private static CFMetaData fetchTable(String keyspaceName, String 
tableName, Types types)
     {
-        RowUpdateBuilder builder = new RowUpdateBuilder(Indexes, timestamp, 
mutation).clustering(table.cfName, index.name);
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name 
= ? AND table_name = ?", NAME, TABLES);
+        UntypedResultSet rows = query(query, keyspaceName, tableName);
+        if (rows.isEmpty())
+            throw new RuntimeException(String.format("%s:%s not found in the 
schema definitions keyspace.", keyspaceName, tableName));
+        UntypedResultSet.Row row = rows.one();
 
-        builder.add("kind", index.kind.toString());
-        builder.frozenMap("options", index.options);
-        builder.build();
+        UUID id = row.getUUID("id");
+
+        Set<CFMetaData.Flag> flags = 
CFMetaData.flagsFromStrings(row.getFrozenSet("flags", UTF8Type.instance));
+
+        boolean isSuper = flags.contains(CFMetaData.Flag.SUPER);
+        boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
+        boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
+        boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
+
+        List<ColumnDefinition> columns = fetchColumns(keyspaceName, tableName, 
types);
+        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = 
fetchDroppedColumns(keyspaceName, tableName);
+        Indexes indexes = fetchIndexes(keyspaceName, tableName);
+        Triggers triggers = fetchTriggers(keyspaceName, tableName);
+
+        return CFMetaData.create(keyspaceName,
+                                 tableName,
+                                 id,
+                                 isDense,
+                                 isCompound,
+                                 isSuper,
+                                 isCounter,
+                                 false,
+                                 columns,
+                                 DatabaseDescriptor.getPartitioner())
+                         .params(createTableParamsFromRow(row))
+                         .droppedColumns(droppedColumns)
+                         .indexes(indexes)
+                         .triggers(triggers);
+    }
+
+    public static TableParams createTableParamsFromRow(UntypedResultSet.Row row)
+    {
+        return TableParams.builder()
+                          .bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance"))
+                          .caching(CachingParams.fromMap(row.getFrozenTextMap("caching")))
+                          .comment(row.getString("comment"))
+                          .compaction(CompactionParams.fromMap(row.getFrozenTextMap("compaction")))
+                          .compression(CompressionParams.fromMap(row.getFrozenTextMap("compression")))
+                          .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance"))
+                          .defaultTimeToLive(row.getInt("default_time_to_live"))
+                          .extensions(row.getFrozenMap("extensions", UTF8Type.instance, BytesType.instance))
+                          .gcGraceSeconds(row.getInt("gc_grace_seconds"))
+                          .maxIndexInterval(row.getInt("max_index_interval"))
+                          .memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms"))
+                          .minIndexInterval(row.getInt("min_index_interval"))
+                          .readRepairChance(row.getDouble("read_repair_chance"))
+                          .crcCheckChance(row.getDouble("crc_check_chance"))
+                          .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")))
+                          .build();
+    }
+
+    private static List<ColumnDefinition> fetchColumns(String keyspace, String table, Types types)
+    {
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, COLUMNS);
+        List<ColumnDefinition> columns = new ArrayList<>();
+        query(query, keyspace, table).forEach(row -> columns.add(createColumnFromRow(row, types)));
+        return columns;
     }
 
-    private static void dropIndexFromSchemaMutation(CFMetaData table,
-                                                    IndexMetadata index,
-                                                    long timestamp,
-                                                    Mutation mutation)
+    public static ColumnDefinition createColumnFromRow(UntypedResultSet.Row row, Types types)
     {
-        RowUpdateBuilder.deleteRow(Indexes, timestamp, mutation, table.cfName, index.name);
+        String keyspace = row.getString("keyspace_name");
+        String table = row.getString("table_name");
+
+        ColumnIdentifier name = ColumnIdentifier.getInterned(row.getBytes("column_name_bytes"), row.getString("column_name"));
+
+        ColumnDefinition.Kind kind = ColumnDefinition.Kind.valueOf(row.getString("kind").toUpperCase());
+
+        int position = row.getInt("position");
+        ClusteringOrder order = ClusteringOrder.valueOf(row.getString("clustering_order").toUpperCase());
+
+        AbstractType<?> type = parse(keyspace, row.getString("type"), types);
+        if (order == ClusteringOrder.DESC)
+            type = ReversedType.getInstance(type);
+
+        return new ColumnDefinition(keyspace, table, name, type, position, kind);
     }
 
-    private static void addUpdatedIndexToSchemaMutation(CFMetaData table,
-                                                        IndexMetadata index,
-                                                        long timestamp,
-                                                        Mutation mutation)
+    private static Map<ByteBuffer, CFMetaData.DroppedColumn> fetchDroppedColumns(String keyspace, String table)
     {
-        addIndexToSchemaMutation(table, index, timestamp, mutation);
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, DROPPED_COLUMNS);
+        Map<ByteBuffer, CFMetaData.DroppedColumn> columns = new HashMap<>();
+        for (UntypedResultSet.Row row : query(query, keyspace, table))
+        {
+            CFMetaData.DroppedColumn column = createDroppedColumnFromRow(row);
+            columns.put(UTF8Type.instance.decompose(column.name), column);
+        }
+        return columns;
     }
-    /**
-     * Deserialize secondary indexes from storage-level representation.
-     *
-     * @param partition storage-level partition containing the index definitions
-     * @return the list of processed IndexMetadata
-     */
-    private static Indexes createIndexesFromIndexesPartition(CFMetaData cfm, RowIterator partition)
+
+    private static CFMetaData.DroppedColumn createDroppedColumnFromRow(UntypedResultSet.Row row)
+    {
+        String keyspace = row.getString("keyspace_name");
+        String name = row.getString("column_name");
+        /*
+         * we never store actual UDT names in dropped column types (so that we can safely drop types if nothing refers to
+         * them anymore), so before storing dropped columns in schema we expand UDTs to tuples. See expandUserTypes method.
+         * Because of that, we can safely pass Types.none() to parse()
+         */
+        AbstractType<?> type = parse(keyspace, row.getString("type"), org.apache.cassandra.schema.Types.none());
+        long droppedTime = TimeUnit.MILLISECONDS.toMicros(row.getLong("dropped_time"));
+        return new CFMetaData.DroppedColumn(name, type, droppedTime);
+    }
+
+    private static Indexes fetchIndexes(String keyspace, String table)
     {
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, INDEXES);
         Indexes.Builder indexes = org.apache.cassandra.schema.Indexes.builder();
-        String query = String.format("SELECT * FROM %s.%s", NAME, INDEXES);
-        QueryProcessor.resultify(query, partition).forEach(row -> indexes.add(createIndexMetadataFromIndexesRow(row)));
+        query(query, keyspace, table).forEach(row -> indexes.add(createIndexMetadataFromRow(row)));
         return indexes.build();
     }
 
-    private static IndexMetadata createIndexMetadataFromIndexesRow(UntypedResultSet.Row row)
+    private static IndexMetadata createIndexMetadataFromRow(UntypedResultSet.Row row)
     {
         String name = row.getString("index_name");
         IndexMetadata.Kind type = IndexMetadata.Kind.valueOf(row.getString("kind"));
@@ -1513,66 +1058,104 @@ public final class SchemaKeyspace
         return IndexMetadata.fromSchemaMetadata(name, type, options);
     }
 
-    /*
-     * UDF metadata serialization/deserialization.
-     */
+    private static Triggers fetchTriggers(String keyspace, String table)
+    {
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, TRIGGERS);
+        Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder();
+        query(query, keyspace, table).forEach(row -> triggers.add(createTriggerFromRow(row)));
+        return triggers.build();
+    }
 
-    public static Mutation makeCreateFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+    private static TriggerMetadata createTriggerFromRow(UntypedResultSet.Row row)
     {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        addFunctionToSchemaMutation(function, timestamp, mutation);
-        return mutation;
+        String name = row.getString("trigger_name");
+        String classOption = row.getFrozenTextMap("options").get("class");
+        return new TriggerMetadata(name, classOption);
     }
 
-    static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation)
+    private static Views fetchViews(String keyspaceName, Types types)
     {
-        RowUpdateBuilder adder = new RowUpdateBuilder(Functions, timestamp, mutation)
-                                 .clustering(function.name().name, functionSignatureWithTypes(function));
+        String query = format("SELECT view_name FROM %s.%s WHERE keyspace_name = ?", NAME, VIEWS);
 
-        adder.add("body", function.body())
-             .add("language", function.language())
-             .add("return_type", function.returnType().toString())
-             .add("called_on_null_input", function.isCalledOnNullInput())
-             .frozenList("argument_names", function.argNames().stream().map((c) -> bbToString(c.bytes)).collect(Collectors.toList()))
-             .frozenList("argument_types", function.argTypes().stream().map(AbstractType::toString).collect(Collectors.toList()));
+        Views.Builder views = org.apache.cassandra.schema.Views.builder();
+        for (UntypedResultSet.Row row : query(query, keyspaceName))
+            views.add(fetchView(keyspaceName, row.getString("view_name"), types));
+        return views.build();
+    }
 
-        adder.build();
+    private static ViewDefinition fetchView(String keyspaceName, String viewName, Types types)
+    {
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEWS);
+        UntypedResultSet rows = query(query, keyspaceName, viewName);
+        if (rows.isEmpty())
+            throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, viewName));
+        UntypedResultSet.Row row = rows.one();
+
+        UUID id = row.getUUID("id");
+        UUID baseTableId = row.getUUID("base_table_id");
+        String baseTableName = row.getString("base_table_name");
+        boolean includeAll = row.getBoolean("include_all_columns");
+        String whereClause = row.getString("where_clause");
+
+        List<ColumnDefinition> columns = fetchColumns(keyspaceName, viewName, types);
+
+        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = fetchDroppedColumns(keyspaceName, viewName);
+
+        CFMetaData cfm = CFMetaData.create(keyspaceName,
+                                           viewName,
+                                           id,
+                                           false,
+                                           true,
+                                           false,
+                                           false,
+                                           true,
+                                           columns,
+                                           DatabaseDescriptor.getPartitioner())
+                                   .params(createTableParamsFromRow(row))
+                                   .droppedColumns(droppedColumns);
+
+            String rawSelect = View.buildSelectStatement(baseTableName, columns, whereClause);
+            SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect);
+
+            return new ViewDefinition(keyspaceName, viewName, baseTableId, baseTableName, includeAll, rawStatement, whereClause, cfm);
     }
 
-    public static Mutation makeDropFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+    private static Functions fetchFunctions(String keyspaceName, Types types)
     {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        return RowUpdateBuilder.deleteRow(Functions, timestamp, mutation, function.name().name, functionSignatureWithTypes(function));
+        Functions udfs = fetchUDFs(keyspaceName, types);
+        Functions udas = fetchUDAs(keyspaceName, udfs, types);
+
+        return org.apache.cassandra.schema.Functions.builder()
+                                                    .add(udfs)
+                                                    .add(udas)
+                                                    .build();
     }
 
-    private static Collection<UDFunction> createFunctionsFromFunctionsPartition(RowIterator partition)
+    private static Functions fetchUDFs(String keyspaceName, Types types)
     {
-        List<UDFunction> functions = new ArrayList<>();
-        String query = String.format("SELECT * FROM %s.%s", NAME, FUNCTIONS);
-        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
-            functions.add(createFunctionFromFunctionRow(row));
-        return functions;
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, FUNCTIONS);
+
+        Functions.Builder functions = org.apache.cassandra.schema.Functions.builder();
+        for (UntypedResultSet.Row row : query(query, keyspaceName))
+            functions.add(createUDFFromRow(row, types));
+        return functions.build();
     }
 
-    private static UDFunction createFunctionFromFunctionRow(UntypedResultSet.Row row)
+    private static UDFunction createUDFFromRow(UntypedResultSet.Row row, Types types)
     {
         String ksName = row.getString("keyspace_name");
         String functionName = row.getString("function_name");
         FunctionName name = new FunctionName(ksName, functionName);
 
         List<ColumnIdentifier> argNames = new ArrayList<>();
-        if (row.has("argument_names"))
-            for (String arg : row.getFrozenList("argument_names", UTF8Type.instance))
-                argNames.add(new ColumnIdentifier(arg, true));
+        for (String arg : row.getFrozenList("argument_names", UTF8Type.instance))
+            argNames.add(new ColumnIdentifier(arg, true));
 
         List<AbstractType<?>> argTypes = new ArrayList<>();
-        if (row.has("argument_types"))
-            for (String type : row.getFrozenList("argument_types", UTF8Type.instance))
-                argTypes.add(parseType(type));
+        for (String type : row.getFrozenList("argument_types", UTF8Type.instance))
+            argTypes.add(parse(ksName, type, types));
 
-        AbstractType<?> returnType = parseType(row.getString("return_type"));
+        AbstractType<?> returnType = parse(ksName, row.getString("return_type"), types);
 
         String language = row.getString("language");
         String body = row.getString("body");
@@ -1609,70 +1192,33 @@ public final class SchemaKeyspace
         }
     }
 
-    /*
-     * Aggregate UDF metadata serialization/deserialization.
-     */
-
-    public static Mutation makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
-    {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        addAggregateToSchemaMutation(aggregate, timestamp, mutation);
-        return mutation;
-    }
-
-    static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation)
-    {
-        RowUpdateBuilder adder = new RowUpdateBuilder(Aggregates, timestamp, mutation)
-                                 .clustering(aggregate.name().name, functionSignatureWithTypes(aggregate));
-
-        adder.add("return_type", aggregate.returnType().toString())
-             .add("state_func", aggregate.stateFunction().name().name)
-             .add("state_type", aggregate.stateType() != null ? aggregate.stateType().toString() : null)
-             .add("final_func", aggregate.finalFunction() != null ? aggregate.finalFunction().name().name : null)
-             .add("initcond", aggregate.initialCondition())
-             .frozenList("argument_types", aggregate.argTypes().stream().map(AbstractType::toString).collect(Collectors.toList()))
-             .build();
-    }
-
-    private static Functions createAggregatesFromAggregatesPartition(Functions functions, RowIterator partition)
+    private static Functions fetchUDAs(String keyspaceName, Functions udfs, Types types)
     {
-        String query = String.format("SELECT * FROM %s.%s", NAME, AGGREGATES);
-        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
-            functions = functions.with(createAggregateFromAggregateRow(functions, row));
-        return functions;
-    }
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, AGGREGATES);
 
-    private static UDAggregate createAggregateFromAggregateRow(UntypedResultSet.Row row)
-    {
-        return createAggregateFromAggregateRow(Schema.instance.getKSMetaData(row.getString("keyspace_name")).functions, row);
+        Functions.Builder aggregates = org.apache.cassandra.schema.Functions.builder();
+        for (UntypedResultSet.Row row : query(query, keyspaceName))
+            aggregates.add(createUDAFromRow(row, udfs, types));
+        return aggregates.build();
     }
 
-    private static UDAggregate createAggregateFromAggregateRow(Functions functions, UntypedResultSet.Row row)
+    private static UDAggregate createUDAFromRow(UntypedResultSet.Row row, Functions functions, Types types)
     {
         String ksName = row.getString("keyspace_name");
         String functionName = row.getString("aggregate_name");
         FunctionName name = new FunctionName(ksName, functionName);
 
-        List<String> types = row.getFrozenList("argument_types", UTF8Type.instance);
+        List<AbstractType<?>> argTypes =
+            row.getFrozenList("argument_types", UTF8Type.instance)
+               .stream()
+               .map(t -> parse(ksName, t, types))
+               .collect(toList());
 
-        List<AbstractType<?>> argTypes;
-        if (types == null)
-        {
-            argTypes = Collections.emptyList();
-        }
-        else
-        {
-            argTypes = new ArrayList<>(types.size());
-            for (String type : types)
-                argTypes.add(parseType(type));
-        }
-
-        AbstractType<?> returnType = parseType(row.getString("return_type"));
+        AbstractType<?> returnType = parse(ksName, row.getString("return_type"), types);
 
         FunctionName stateFunc = new FunctionName(ksName, (row.getString("state_func")));
         FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null;
-        AbstractType<?> stateType = row.has("state_type") ? parseType(row.getString("state_type")) : null;
+        AbstractType<?> stateType = row.has("state_type") ? parse(ksName, row.getString("state_type"), types) : null;
         ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null;
 
         try
@@ -1685,30 +1231,171 @@ public final class SchemaKeyspace
         }
     }
 
-    public static Mutation makeDropAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
+    private static UntypedResultSet query(String query, Object... variables)
     {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        return RowUpdateBuilder.deleteRow(Aggregates, timestamp, mutation, aggregate.name().name, functionSignatureWithTypes(aggregate));
+        return executeInternal(query, variables);
+    }
+
+    /*
+     * Merging schema
+     */
+
+    /**
+     * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
+     * (which also involves fs operations on add/drop ks/cf)
+     *
+     * @param mutations the schema changes to apply
+     *
+     * @throws ConfigurationException If one of metadata attributes has invalid value
+     */
+    public static synchronized void mergeSchemaAndAnnounceVersion(Collection<Mutation> mutations) throws ConfigurationException
+    {
+        mergeSchema(mutations);
+        Schema.instance.updateVersionAndAnnounce();
+    }
+
+    public static synchronized void mergeSchema(Collection<Mutation> mutations)
+    {
+        // only compare the keyspaces affected by this set of schema mutations
+        Set<String> affectedKeyspaces =
+        mutations.stream()
+                 .map(m -> UTF8Type.instance.compose(m.key().getKey()))
+                 .collect(Collectors.toSet());
+
+        // fetch the current state of schema for the affected keyspaces only
+        Keyspaces before = Schema.instance.getKeyspaces(affectedKeyspaces);
+
+        // apply the schema mutations and flush
+        mutations.forEach(Mutation::apply);
+        if (FLUSH_SCHEMA_TABLES)
+            flush();
+
+        // fetch the new state of schema from schema tables (not applied to Schema.instance yet)
+        Keyspaces after = fetchKeyspacesOnly(affectedKeyspaces);
+
+        // deal with the diff
+        MapDifference<String, KeyspaceMetadata> keyspacesDiff = before.diff(after);
+
+        // dropped keyspaces
+        for (KeyspaceMetadata keyspace : keyspacesDiff.entriesOnlyOnLeft().values())
+        {
+            keyspace.functions.udas().forEach(Schema.instance::dropAggregate);
+            keyspace.functions.udfs().forEach(Schema.instance::dropFunction);
+            keyspace.views.forEach(v -> Schema.instance.dropView(v.ksName, v.viewName));
+            keyspace.tables.forEach(t -> Schema.instance.dropTable(t.ksName, t.cfName));
+            keyspace.types.forEach(Schema.instance::dropType);
+            Schema.instance.dropKeyspace(keyspace.name);
+        }
+
+        // new keyspaces
+        for (KeyspaceMetadata keyspace : keyspacesDiff.entriesOnlyOnRight().values())
+        {
+            Schema.instance.addKeyspace(KeyspaceMetadata.create(keyspace.name, keyspace.params));
+            keyspace.types.forEach(Schema.instance::addType);
+            keyspace.tables.forEach(Schema.instance::addTable);
+            keyspace.views.forEach(Schema.instance::addView);
+            keyspace.functions.udfs().forEach(Schema.instance::addFunction);
+            keyspace.functions.udas().forEach(Schema.instance::addAggregate);
+        }
+
+        // updated keyspaces
+        for (Map.Entry<String, MapDifference.ValueDifference<KeyspaceMetadata>> diff : keyspacesDiff.entriesDiffering().entrySet())
+            updateKeyspace(diff.getKey(), diff.getValue().leftValue(), diff.getValue().rightValue());
+    }
+
+    private static void updateKeyspace(String keyspaceName, KeyspaceMetadata keyspaceBefore, KeyspaceMetadata keyspaceAfter)
+    {
+        // calculate the deltas
+        MapDifference<String, CFMetaData> tablesDiff = keyspaceBefore.tables.diff(keyspaceAfter.tables);
+        MapDifference<String, ViewDefinition> viewsDiff = keyspaceBefore.views.diff(keyspaceAfter.views);
+        MapDifference<ByteBuffer, UserType> typesDiff = keyspaceBefore.types.diff(keyspaceAfter.types);
+
+        Map<Pair<FunctionName, List<String>>, UDFunction> udfsBefore = new HashMap<>();
+        keyspaceBefore.functions.udfs().forEach(f -> udfsBefore.put(Pair.create(f.name(), functionArgumentsList(f)), f));
+        Map<Pair<FunctionName, List<String>>, UDFunction> udfsAfter = new HashMap<>();
+        keyspaceAfter.functions.udfs().forEach(f -> udfsAfter.put(Pair.create(f.name(), functionArgumentsList(f)), f));
+        MapDifference<Pair<FunctionName, List<String>>, UDFunction> udfsDiff = Maps.difference(udfsBefore, udfsAfter);
+
+        Map<Pair<FunctionName, List<String>>, UDAggregate> udasBefore = new HashMap<>();
+        keyspaceBefore.functions.udas().forEach(f -> udasBefore.put(Pair.create(f.name(), functionArgumentsList(f)), f));
+        Map<Pair<FunctionName, List<String>>, UDAggregate> udasAfter = new HashMap<>();
+        keyspaceAfter.functions.udas().forEach(f -> udasAfter.put(Pair.create(f.name(), functionArgumentsList(f)), f));
+        MapDifference<Pair<FunctionName, List<String>>, UDAggregate> udasDiff = Maps.difference(udasBefore, udasAfter);
+
+        // update keyspace params, if changed
+        if (!keyspaceBefore.params.equals(keyspaceAfter.params))
+            Schema.instance.updateKeyspace(keyspaceName, keyspaceAfter.params);
+
+        // drop everything removed
+        udasDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropAggregate);
+        udfsDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropFunction);
+        viewsDiff.entriesOnlyOnLeft().values().forEach(v -> Schema.instance.dropView(v.ksName, v.viewName));
+        tablesDiff.entriesOnlyOnLeft().values().forEach(t -> Schema.instance.dropTable(t.ksName, t.cfName));
+        typesDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropType);
+
+        // add everything created
+        typesDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addType);
+        tablesDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addTable);
+        viewsDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addView);
+        udfsDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addFunction);
+        udasDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addAggregate);
+
+        // update everything altered
+        for (MapDifference.ValueDifference<UserType> diff : typesDiff.entriesDiffering().values())
+            Schema.instance.updateType(diff.rightValue());
+        for (MapDifference.ValueDifference<CFMetaData> diff : tablesDiff.entriesDiffering().values())
+            Schema.instance.updateTable(diff.rightValue());
+        for (MapDifference.ValueDifference<ViewDefinition> diff : viewsDiff.entriesDiffering().values())
+            Schema.instance.updateView(diff.rightValue());
+        for (MapDifference.ValueDifference<UDFunction> diff : udfsDiff.entriesDiffering().values())
+            Schema.instance.updateFunction(diff.rightValue());
+        for (MapDifference.ValueDifference<UDAggregate> diff : udasDiff.entriesDiffering().values())
+            Schema.instance.updateAggregate(diff.rightValue());
     }
 
-    private static AbstractType<?> parseType(String str)
+    /*
+     * Type parsing and transformation
+     */
+
+    /*
+     * Recursively replaces any instances of UserType with an equivalent TupleType.
+     * We do it for dropped_columns, to allow safely dropping unused user types without retaining any references
+     * in dropped_columns.
+     */
+    private static AbstractType<?> expandUserTypes(AbstractType<?> original)
     {
-        return TypeParser.parse(str);
+        if (original instanceof UserType)
+            return new TupleType(expandUserTypes(((UserType) original).fieldTypes()));
+
+        if (original instanceof TupleType)
+            return new TupleType(expandUserTypes(((TupleType) original).allTypes()));
+
+        if (original instanceof ListType<?>)
+            return ListType.getInstance(expandUserTypes(((ListType<?>) original).getElementsType()), original.isMultiCell());
+
+        if (original instanceof MapType<?,?>)
+        {
+            MapType<?, ?> mt = (MapType<?, ?>) original;
+            return MapType.getInstance(expandUserTypes(mt.getKeysType()), expandUserTypes(mt.getValuesType()), mt.isMultiCell());
+        }
+
+        if (original instanceof SetType<?>)
+            return SetType.getInstance(expandUserTypes(((SetType<?>) original).getElementsType()), original.isMultiCell());
+
+        // this is very unlikely to ever happen, but it's better to be safe than sorry
+        if (original instanceof ReversedType<?>)
+            return ReversedType.getInstance(expandUserTypes(((ReversedType) original).baseType));
+
+        if (original instanceof CompositeType)
+            return CompositeType.getInstance(expandUserTypes(original.getComponents()));
+
+        return original;
     }
 
-    // We allow method overloads, so a function is not uniquely identified by its name only, but
-    // also by its argument types. To distinguish overloads of given function name in the schema
-    // we use a "signature" which is just a list of it's CQL argument types (we could replace that by
-    // using a "signature" UDT that would be comprised of the function name and argument types,
-    // which we could then use as clustering column. But as we haven't yet used UDT in system tables,
-    // We'll leave that decision to #6717).
-    public static ByteBuffer functionSignatureWithTypes(AbstractFunction fun)
+    private static List<AbstractType<?>> expandUserTypes(List<AbstractType<?>> types)
     {
-        ListType<String> list = ListType.getInstance(UTF8Type.instance, false);
-        List<String> strList = new ArrayList<>(fun.argTypes().size());
-        for (AbstractType<?> argType : fun.argTypes())
-            strList.add(argType.asCQL3Type().toString());
-        return list.decompose(strList);
+        return types.stream()
+                    .map(SchemaKeyspace::expandUserTypes)
+                    .collect(toList());
     }
 }

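Since UDFs and UDAs can be overloaded, the updateKeyspace() code above keys its before/after maps on the function name paired with the list of argument-type strings (via functionArgumentsList) rather than on the old serialized signature. A minimal stand-alone sketch of that keying, with plain strings standing in for FunctionName and UDFunction (the function name, type strings and bodies below are invented purely for illustration):

    import com.google.common.collect.ImmutableList;
    import org.apache.cassandra.utils.Pair;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class OverloadKeySketch
    {
        public static void main(String[] args)
        {
            // Keyed on (function name, argument types); Pair gives value equality,
            // so the two overloads of the invented "fadd" stay distinct entries.
            Map<Pair<String, List<String>>, String> udfs = new HashMap<>();
            udfs.put(Pair.create("fadd", ImmutableList.of("int", "int")), "return a + b;");
            udfs.put(Pair.create("fadd", ImmutableList.of("double", "double")), "return a + b;");

            System.out.println(udfs.size()); // prints 2
        }
    }

Pair plays the same value-equality-key role here that it plays in the udfsBefore/udfsAfter and udasBefore/udasAfter maps fed to Maps.difference above.
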
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/Tables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Tables.java b/src/java/org/apache/cassandra/schema/Tables.java
index 151697d..4f728d4 100644
--- a/src/java/org/apache/cassandra/schema/Tables.java
+++ b/src/java/org/apache/cassandra/schema/Tables.java
@@ -23,6 +23,8 @@ import java.util.Optional;
 import javax.annotation.Nullable;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
 
 import org.apache.cassandra.config.CFMetaData;
 
@@ -115,6 +117,11 @@ public final class Tables implements Iterable<CFMetaData>
         return builder().add(filter(this, t -> t != table)).build();
     }
 
+    MapDifference<String, CFMetaData> diff(Tables other)
+    {
+        return Maps.difference(tables, other.tables);
+    }
+
     @Override
     public boolean equals(Object o)
     {

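The new Tables.diff() above, like the views/types diffs and the keyspace-level diff in mergeSchema(), delegates to Guava's Maps.difference, which splits a before/after snapshot into entries only on the left (dropped), only on the right (created), and differing (altered); entriesDiffering() exposes both sides of an altered entry via leftValue()/rightValue(). A self-contained sketch of that partitioning, using plain string maps rather than the real metadata maps:

    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.MapDifference;
    import com.google.common.collect.Maps;

    import java.util.Map;

    public class SchemaDiffSketch
    {
        public static void main(String[] args)
        {
            // "before" and "after" snapshots: t1 is dropped, t4 is created, t3 is altered.
            Map<String, String> before = ImmutableMap.of("t1", "a", "t2", "b", "t3", "c");
            Map<String, String> after = ImmutableMap.of("t2", "b", "t3", "c2", "t4", "d");

            MapDifference<String, String> diff = Maps.difference(before, after);

            diff.entriesOnlyOnLeft().keySet().forEach(t -> System.out.println("dropped: " + t));
            diff.entriesOnlyOnRight().keySet().forEach(t -> System.out.println("created: " + t));
            diff.entriesDiffering().forEach((t, v) ->
                System.out.println("altered: " + t + " (" + v.leftValue() + " -> " + v.rightValue() + ")"));
        }
    }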