Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 f4af15463 -> 340df43fb
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/Types.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/Types.java b/src/java/org/apache/cassandra/schema/Types.java index 0a7bb4f..0d6e36d 100644 --- a/src/java/org/apache/cassandra/schema/Types.java +++ b/src/java/org/apache/cassandra/schema/Types.java @@ -18,37 +18,61 @@ package org.apache.cassandra.schema; import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.Optional; +import java.util.*; import javax.annotation.Nullable; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; +import org.apache.cassandra.cql3.CQL3Type; +import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.jgrapht.graph.DefaultDirectedGraph; +import org.jgrapht.graph.DefaultEdge; +import org.jgrapht.traverse.TopologicalOrderIterator; import static com.google.common.collect.Iterables.filter; +import static java.util.stream.Collectors.toList; +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; /** * An immutable container for a keyspace's UDTs. */ public final class Types implements Iterable<UserType> { - private final ImmutableMap<ByteBuffer, UserType> types; + private static final Types NONE = new Types(ImmutableMap.of()); + + private final Map<ByteBuffer, UserType> types; private Types(Builder builder) { types = builder.types.build(); } + /* + * For use in RawBuilder::build only. + */ + private Types(Map<ByteBuffer, UserType> types) + { + this.types = types; + } + public static Builder builder() { return new Builder(); } + public static RawBuilder rawBuilder(String keyspace) + { + return new RawBuilder(keyspace); + } + public static Types none() { - return builder().build(); + return NONE; } public static Types of(UserType... types) @@ -106,6 +130,11 @@ public final class Types implements Iterable<UserType> return builder().add(filter(this, t -> t != type)).build(); } + MapDifference<ByteBuffer, UserType> diff(Types other) + { + return Maps.difference(types, other.types); + } + @Override public boolean equals(Object o) { @@ -126,7 +155,7 @@ public final class Types implements Iterable<UserType> public static final class Builder { - final ImmutableMap.Builder<ByteBuffer, UserType> types = new ImmutableMap.Builder<>(); + final ImmutableMap.Builder<ByteBuffer, UserType> types = ImmutableMap.builder(); private Builder() { @@ -156,4 +185,100 @@ public final class Types implements Iterable<UserType> return this; } } + + public static final class RawBuilder + { + final String keyspace; + final List<RawUDT> definitions; + + private RawBuilder(String keyspace) + { + this.keyspace = keyspace; + this.definitions = new ArrayList<>(); + } + + /** + * Build a Types instance from Raw definitions. + * + * Constructs a DAG of graph dependencies and resolves them 1 by 1 in topological order. + */ + public Types build() + { + if (definitions.isEmpty()) + return Types.none(); + + /* + * build a DAG of UDT dependencies + */ + DefaultDirectedGraph<RawUDT, DefaultEdge> graph = new DefaultDirectedGraph<>(DefaultEdge.class); + + definitions.forEach(graph::addVertex); + + for (RawUDT udt1: definitions) + for (RawUDT udt2 : definitions) + if (udt1 != udt2 && udt1.referencesUserType(udt2.name)) + graph.addEdge(udt2, udt1); + + /* + * iterate in topological order, + */ + Types types = new Types(new HashMap<>()); + + TopologicalOrderIterator<RawUDT, DefaultEdge> iterator = new TopologicalOrderIterator<>(graph); + while (iterator.hasNext()) + { + UserType udt = iterator.next().prepare(keyspace, types); // will throw InvalidRequestException if meets an unknown type + types.types.put(udt.name, udt); + } + + /* + * return an immutable copy + */ + return Types.builder().add(types).build(); + } + + void add(String name, List<String> fieldNames, List<String> fieldTypes) + { + List<CQL3Type.Raw> rawFieldTypes = + fieldTypes.stream() + .map(CQLTypeParser::parseRaw) + .collect(toList()); + + definitions.add(new RawUDT(name, fieldNames, rawFieldTypes)); + } + + private static final class RawUDT + { + final String name; + final List<String> fieldNames; + final List<CQL3Type.Raw> fieldTypes; + + RawUDT(String name, List<String> fieldNames, List<CQL3Type.Raw> fieldTypes) + { + this.name = name; + this.fieldNames = fieldNames; + this.fieldTypes = fieldTypes; + } + + boolean referencesUserType(String typeName) + { + return fieldTypes.stream().anyMatch(t -> t.referencesUserType(typeName)); + } + + UserType prepare(String keyspace, Types types) + { + List<ByteBuffer> preparedFieldNames = + fieldNames.stream() + .map(ByteBufferUtil::bytes) + .collect(toList()); + + List<AbstractType<?>> preparedFieldTypes = + fieldTypes.stream() + .map(t -> t.prepare(keyspace, types).getType()) + .collect(toList()); + + return new UserType(keyspace, bytes(name), preparedFieldNames, preparedFieldTypes); + } + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/Views.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/Views.java b/src/java/org/apache/cassandra/schema/Views.java index 5888b9d..b8fdd4b 100644 --- a/src/java/org/apache/cassandra/schema/Views.java +++ b/src/java/org/apache/cassandra/schema/Views.java @@ -26,6 +26,8 @@ import javax.annotation.Nullable; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ViewDefinition; @@ -124,6 +126,11 @@ public final class Views implements Iterable<ViewDefinition> return without(view.viewName).with(view); } + MapDifference<String, ViewDefinition> diff(Views other) + { + return Maps.difference(views, other.views); + } + @Override public boolean equals(Object o) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index de6c7f7..6a21f91 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -468,20 +468,9 @@ public class MigrationManager private static void announce(Mutation schema, boolean announceLocally) { if (announceLocally) - { - try - { - SchemaKeyspace.mergeSchema(Collections.singletonList(schema)); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } + SchemaKeyspace.mergeSchema(Collections.singletonList(schema)); else - { FBUtilities.waitOnFuture(announce(Collections.singletonList(schema))); - } } private static void pushSchemaMutation(InetAddress endpoint, Collection<Mutation> schema) @@ -497,7 +486,7 @@ public class MigrationManager { Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable() { - protected void runMayThrow() throws IOException, ConfigurationException + protected void runMayThrow() throws ConfigurationException { SchemaKeyspace.mergeSchemaAndAnnounceVersion(schema); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/service/MigrationTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java index 4e3fac3..8a1b858 100644 --- a/src/java/org/apache/cassandra/service/MigrationTask.java +++ b/src/java/org/apache/cassandra/service/MigrationTask.java @@ -74,10 +74,6 @@ class MigrationTask extends WrappedRunnable { SchemaKeyspace.mergeSchemaAndAnnounceVersion(message.payload); } - catch (IOException e) - { - logger.error("IOException merging remote schema", e); - } catch (ConfigurationException e) { logger.error("Configuration exception merging remote schema", e); http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/config/CFMetaDataTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java index 3deb5fa..9d91df3 100644 --- a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java +++ b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java @@ -21,18 +21,23 @@ package org.apache.cassandra.config; import java.util.*; import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.db.*; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UTF8Type; -import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.SchemaKeyspace; -import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.schema.TableParams; +import org.apache.cassandra.schema.Types; import org.apache.cassandra.thrift.CfDef; import org.apache.cassandra.thrift.ColumnDef; import org.apache.cassandra.thrift.IndexType; @@ -154,10 +159,18 @@ public class CFMetaDataTest PartitionUpdate cfU = rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, SchemaKeyspace.TABLES)); PartitionUpdate cdU = rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, SchemaKeyspace.COLUMNS)); - CFMetaData newCfm = SchemaKeyspace.createTableFromTablePartitionAndColumnsPartition( - UnfilteredRowIterators.filter(cfU.unfilteredIterator(), FBUtilities.nowInSeconds()), - UnfilteredRowIterators.filter(cdU.unfilteredIterator(), FBUtilities.nowInSeconds()) - ); - assert cfm.equals(newCfm) : String.format("%n%s%n!=%n%s", cfm, newCfm); + UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaKeyspace.NAME, SchemaKeyspace.TABLES), + UnfilteredRowIterators.filter(cfU.unfilteredIterator(), FBUtilities.nowInSeconds())) + .one(); + TableParams params = SchemaKeyspace.createTableParamsFromRow(tableRow); + + UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaKeyspace.NAME, SchemaKeyspace.COLUMNS), + UnfilteredRowIterators.filter(cdU.unfilteredIterator(), FBUtilities.nowInSeconds())); + Set<ColumnDefinition> columns = new HashSet<>(); + for (UntypedResultSet.Row row : columnsRows) + columns.add(SchemaKeyspace.createColumnFromRow(row, Types.none())); + + assertEquals(cfm.params, params); + assertEquals(new HashSet<>(cfm.allColumns()), columns); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java index 0e7084f..9e3c51d 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java @@ -110,7 +110,7 @@ public class TupleTypeTest extends CQLTester row(0, 4, tuple(null, "1")) ); - assertInvalidMessage("Invalid tuple literal: too many elements. Type tuple<int, text> expects 2 but got 3", + assertInvalidMessage("Invalid tuple literal: too many elements. Type frozen<tuple<int, text>> expects 2 but got 3", "INSERT INTO %s(k, t) VALUES (1,'1:2:3')"); } @@ -121,7 +121,7 @@ public class TupleTypeTest extends CQLTester assertInvalidSyntax("INSERT INTO %s (k, t) VALUES (0, ())"); - assertInvalidMessage("Invalid tuple literal for t: too many elements. Type tuple<int, text, double> expects 3 but got 4", + assertInvalidMessage("Invalid tuple literal for t: too many elements. Type frozen<tuple<int, text, double>> expects 3 but got 4", "INSERT INTO %s (k, t) VALUES (0, (2, 'foo', 3.1, 'bar'))"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java index bf3d33f..78e85dc 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java @@ -797,7 +797,7 @@ public class UFTest extends CQLTester @Test public void testFunctionNonExistingKeyspace() throws Throwable { - assertInvalidMessage("to non existing keyspace", + assertInvalidMessage("Keyspace this_ks_does_not_exist doesn't exist", "CREATE OR REPLACE FUNCTION this_ks_does_not_exist.jnft(val double) " + "RETURNS NULL ON NULL INPUT " + "RETURNS double " + @@ -810,7 +810,7 @@ public class UFTest extends CQLTester { dropPerTestKeyspace(); - assertInvalidMessage("to non existing keyspace", + assertInvalidMessage("Keyspace " + KEYSPACE_PER_TEST + " doesn't exist", "CREATE OR REPLACE FUNCTION " + KEYSPACE_PER_TEST + ".jnft(val double) " + "RETURNS NULL ON NULL INPUT " + "RETURNS double " + http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java index d069d56..bc382fb 100644 --- a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java +++ b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java @@ -63,8 +63,7 @@ public class LegacySchemaMigratorTest { CQLTester.cleanupAndLeaveDirs(); - List<KeyspaceMetadata> expected = keyspaceToMigrate(); - expected.sort((k1, k2) -> k1.name.compareTo(k2.name)); + Keyspaces expected = keyspacesToMigrate(); // write the keyspaces into the legacy tables expected.forEach(LegacySchemaMigratorTest::legacySerializeKeyspace); @@ -73,8 +72,7 @@ public class LegacySchemaMigratorTest LegacySchemaMigrator.migrate(); // read back all the metadata from the new schema tables - List<KeyspaceMetadata> actual = SchemaKeyspace.readSchemaFromSystemTables(); - actual.sort((k1, k2) -> k1.name.compareTo(k2.name)); + Keyspaces actual = SchemaKeyspace.fetchNonSystemKeyspaces(); // need to load back CFMetaData of those tables (CFS instances will still be loaded) loadLegacySchemaTables(); @@ -104,9 +102,9 @@ public class LegacySchemaMigratorTest Schema.instance.setKeyspaceMetadata(systemKeyspace.withSwapped(systemTables)); } - private static List<KeyspaceMetadata> keyspaceToMigrate() + private static Keyspaces keyspacesToMigrate() { - List<KeyspaceMetadata> keyspaces = new ArrayList<>(); + Keyspaces.Builder keyspaces = Keyspaces.builder(); // A whole bucket of shorthand String ks1 = KEYSPACE_PREFIX + "Keyspace1"; @@ -255,7 +253,7 @@ public class LegacySchemaMigratorTest keyspaces.add(keyspaceWithUDAs()); keyspaces.add(keyspaceWithUDAsAndUDTs()); - return keyspaces; + return keyspaces.build(); } private static KeyspaceMetadata keyspaceWithDroppedCollections() http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java index 3eb4faf..a1b7ad3 100644 --- a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Set; import com.google.common.collect.ImmutableMap; @@ -32,11 +33,13 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.partitions.PartitionUpdate; @@ -158,14 +161,14 @@ public class SchemaKeyspaceTest assertEquals(extensions, metadata.params.extensions); } - private static void updateTable(String keyspace, CFMetaData oldTable, CFMetaData newTable) throws IOException + private static void updateTable(String keyspace, CFMetaData oldTable, CFMetaData newTable) { KeyspaceMetadata ksm = Schema.instance.getKeyspaceInstance(keyspace).getMetadata(); Mutation mutation = SchemaKeyspace.makeUpdateTableMutation(ksm, oldTable, newTable, FBUtilities.timestampMicros(), false); SchemaKeyspace.mergeSchema(Collections.singleton(mutation)); } - private static void createTable(String keyspace, String cql) throws IOException + private static void createTable(String keyspace, String cql) { CFMetaData table = CFMetaData.compile(cql, keyspace); @@ -185,10 +188,21 @@ public class SchemaKeyspaceTest // Test schema conversion Mutation rm = SchemaKeyspace.makeCreateTableMutation(keyspace, cfm, FBUtilities.timestampMicros()); - PartitionUpdate serializedCf = rm.getPartitionUpdate(Schema.instance.getId(SystemKeyspace.NAME, SchemaKeyspace.TABLES)); - PartitionUpdate serializedCD = rm.getPartitionUpdate(Schema.instance.getId(SystemKeyspace.NAME, SchemaKeyspace.COLUMNS)); - CFMetaData newCfm = SchemaKeyspace.createTableFromTablePartitionAndColumnsPartition(UnfilteredRowIterators.filter(serializedCf.unfilteredIterator(), FBUtilities.nowInSeconds()), - UnfilteredRowIterators.filter(serializedCD.unfilteredIterator(), FBUtilities.nowInSeconds())); - assert cfm.equals(newCfm) : String.format("%n%s%n!=%n%s", cfm, newCfm); + PartitionUpdate serializedCf = rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, SchemaKeyspace.TABLES)); + PartitionUpdate serializedCD = rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, SchemaKeyspace.COLUMNS)); + + UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaKeyspace.NAME, SchemaKeyspace.TABLES), + UnfilteredRowIterators.filter(serializedCf.unfilteredIterator(), FBUtilities.nowInSeconds())) + .one(); + TableParams params = SchemaKeyspace.createTableParamsFromRow(tableRow); + + UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaKeyspace.NAME, SchemaKeyspace.COLUMNS), + UnfilteredRowIterators.filter(serializedCD.unfilteredIterator(), FBUtilities.nowInSeconds())); + Set<ColumnDefinition> columns = new HashSet<>(); + for (UntypedResultSet.Row row : columnsRows) + columns.add(SchemaKeyspace.createColumnFromRow(row, Types.none())); + + assertEquals(cfm.params, params); + assertEquals(new HashSet<>(cfm.allColumns()), columns); } }