This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 001f70367e Harry model and in-JVM tests for partition-restricted 2i queries 001f70367e is described below commit 001f70367e32bd44dc03c30d5533e549bbaea67e Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Thu Jan 11 23:13:56 2024 -0600 Harry model and in-JVM tests for partition-restricted 2i queries patch by Caleb Rackliffe; reviewed by Alex Petrov for CASSANDRA-18275 Co-authored-by: Caleb Rackliffe <calebrackli...@gmail.com> Co-authored-by: Alex Petrov <oleksandr.pet...@gmail.com> --- CHANGES.txt | 1 + .../cassandra/fuzz/sai/MultiNodeSAITest.java | 102 +++++++ .../cassandra/fuzz/sai/SingleNodeSAITest.java | 310 +++++++++++++++++++++ .../cassandra/fuzz/sai/StaticsTortureTest.java | 264 ++++++++++++++++++ .../org/apache/cassandra/harry/ddl/SchemaSpec.java | 35 ++- .../cassandra/harry/model/AgainstSutChecker.java | 32 ++- .../apache/cassandra/harry/model/SelectHelper.java | 39 +-- .../harry/model/reconciler/PartitionState.java | 1 - .../harry/operations/CompiledStatement.java | 7 + .../cassandra/harry/operations/FilteringQuery.java | 6 + .../cassandra/harry/sut/injvm/InJvmSutBase.java | 11 +- 11 files changed, 770 insertions(+), 38 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index fbbc57216b..f3c3096705 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Harry model and in-JVM tests for partition-restricted 2i queries (CASSANDRA-18275) * Refactor cqlshmain global constants (CASSANDRA-19201) * Remove native_transport_port_ssl (CASSANDRA-19397) * Make nodetool reconfigurecms sync by default and add --cancel to be able to cancel ongoing reconfigurations (CASSANDRA-19216) diff --git a/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java b/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java new file mode 100644 index 0000000000..b45b6f782b --- /dev/null +++ b/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fuzz.sai; + +import org.junit.Before; +import org.junit.BeforeClass; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.test.sai.SAIUtil; +import org.apache.cassandra.harry.ddl.SchemaSpec; +import org.apache.cassandra.harry.sut.injvm.InJvmSut; +import org.apache.cassandra.harry.sut.injvm.InJvmSutBase; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + +public class MultiNodeSAITest extends SingleNodeSAITest +{ + /** + * Chosing a fetch size has implications for how well this test will excercise paging, short-read protection, and + * other important parts of the distributed query apparatus. This should be set low enough to ensure a significant + * number of queries during validation page, but not too low that more expesive queries time out and fail the test. + */ + private static final int FETCH_SIZE = 10; + + @BeforeClass + public static void before() throws Throwable + { + cluster = Cluster.build() + .withNodes(2) + // At lower fetch sizes, queries w/ hundreds or thousands of matches can take a very long time. + .withConfig(InJvmSutBase.defaultConfig().andThen(c -> c.set("range_request_timeout", "180s").set("read_request_timeout", "180s") + .with(GOSSIP).with(NETWORK))) + .createWithoutStarting(); + cluster.setUncaughtExceptionsFilter(t -> { + logger.error("Caught exception, reporting during shutdown. Ignoring.", t); + return true; + }); + cluster.startup(); + cluster = init(cluster); + sut = new InJvmSut(cluster) { + @Override + public Object[][] execute(String cql, ConsistencyLevel cl, Object[] bindings) + { + // The goal here is to make replicas as out of date as possible, modulo the efforts of repair + // and read-repair in the test itself. + if (cql.contains("SELECT")) + return super.execute(cql, ConsistencyLevel.ALL, FETCH_SIZE, bindings); + return super.execute(cql, ConsistencyLevel.NODE_LOCAL, bindings); + } + }; + } + + @Before + public void beforeEach() + { + cluster.schemaChange("DROP KEYSPACE IF EXISTS harry"); + cluster.schemaChange("CREATE KEYSPACE harry WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};"); + } + + @Override + protected void flush(SchemaSpec schema) + { + cluster.get(1).nodetool("flush", schema.keyspace, schema.table); + cluster.get(2).nodetool("flush", schema.keyspace, schema.table); + } + + @Override + protected void repair(SchemaSpec schema) + { + cluster.get(1).nodetool("repair", schema.keyspace); + } + + @Override + protected void compact(SchemaSpec schema) + { + cluster.get(1).nodetool("compact", schema.keyspace); + cluster.get(2).nodetool("compact", schema.keyspace); + } + + @Override + protected void waitForIndexesQueryable(SchemaSpec schema) + { + SAIUtil.waitForIndexQueryable(cluster, schema.keyspace); + } +} \ No newline at end of file diff --git a/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java b/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java new file mode 100644 index 0000000000..f0fdf6c819 --- /dev/null +++ b/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fuzz.sai; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.junit.Test; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.fuzz.harry.integration.model.IntegrationTestBase; +import org.apache.cassandra.harry.ddl.ColumnSpec; +import org.apache.cassandra.harry.ddl.SchemaSpec; +import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder; +import org.apache.cassandra.harry.gen.DataGenerators; +import org.apache.cassandra.harry.gen.EntropySource; +import org.apache.cassandra.harry.gen.rng.JdkRandomEntropySource; +import org.apache.cassandra.harry.model.QuiescentChecker; +import org.apache.cassandra.harry.model.SelectHelper; +import org.apache.cassandra.harry.model.reconciler.PartitionState; +import org.apache.cassandra.harry.model.reconciler.Reconciler; +import org.apache.cassandra.harry.operations.FilteringQuery; +import org.apache.cassandra.harry.operations.Query; +import org.apache.cassandra.harry.operations.Relation; +import org.apache.cassandra.harry.sut.SystemUnderTest; +import org.apache.cassandra.harry.sut.TokenPlacementModel; +import org.apache.cassandra.harry.tracker.DataTracker; +import org.apache.cassandra.harry.tracker.DefaultDataTracker; + +public class SingleNodeSAITest extends IntegrationTestBase +{ + private static final int RUNS = 1; + + private static final int OPERATIONS_PER_RUN = 30_000; + private static final int REPAIR_SKIP = OPERATIONS_PER_RUN / 2; + private static final int FLUSH_SKIP = OPERATIONS_PER_RUN / 7; + private static final int VALIDATION_SKIP = OPERATIONS_PER_RUN / 100; + + private static final int NUM_PARTITIONS = OPERATIONS_PER_RUN / 1000; + protected static final int MAX_PARTITION_SIZE = 10_000; + private static final int UNIQUE_CELL_VALUES = 5; + + long seed = 1; + + @Test + public void basicSaiTest() + { + CassandraRelevantProperties.SAI_INTERSECTION_CLAUSE_LIMIT.setInt(6); + SchemaSpec schema = new SchemaSpec(KEYSPACE, "tbl1", + Arrays.asList(ColumnSpec.ck("pk1", ColumnSpec.int64Type), + ColumnSpec.ck("pk2", ColumnSpec.asciiType(4, 100)), + ColumnSpec.ck("pk3", ColumnSpec.int64Type)), + Arrays.asList(ColumnSpec.ck("ck1", ColumnSpec.asciiType(4, 100)), + ColumnSpec.ck("ck2", ColumnSpec.asciiType, true), + ColumnSpec.ck("ck3", ColumnSpec.int64Type)), + Arrays.asList(ColumnSpec.regularColumn("v1", ColumnSpec.asciiType(40, 100)), + ColumnSpec.regularColumn("v2", ColumnSpec.int64Type), + ColumnSpec.regularColumn("v3", ColumnSpec.int64Type)), + List.of(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType(40, 100))), + false, + false, + "LeveledCompactionStrategy", + false); + + sut.schemaChange(schema.compile().cql()); + sut.schemaChange(schema.cloneWithName(schema.keyspace, schema.table + "_debug").compile().cql()); + sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) USING 'sai' ", + schema.regularColumns.get(0).name, + schema.keyspace, + schema.table, + schema.regularColumns.get(0).name)); + sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) USING 'sai';", + schema.regularColumns.get(1).name, + schema.keyspace, + schema.table, + schema.regularColumns.get(1).name)); + sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) USING 'sai';", + schema.regularColumns.get(2).name, + schema.keyspace, + schema.table, + schema.regularColumns.get(2).name)); + sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) USING 'sai';", + schema.staticColumns.get(0).name, + schema.keyspace, + schema.table, + schema.staticColumns.get(0).name)); + + waitForIndexesQueryable(schema); + + DataTracker tracker = new DefaultDataTracker(); + TokenPlacementModel.ReplicationFactor rf = new TokenPlacementModel.SimpleReplicationFactor(cluster.size()); + ReplayingHistoryBuilder history = new ReplayingHistoryBuilder(seed, + MAX_PARTITION_SIZE, + MAX_PARTITION_SIZE, + tracker, + sut, + schema, + rf, + SystemUnderTest.ConsistencyLevel.QUORUM); + + for (int run = 0; run < RUNS; run++) + { + logger.info("Starting run {}/{}...", run + 1, RUNS); + EntropySource random = new JdkRandomEntropySource(run); + + // Populate the array of possible values for all operations in the run: + long[] values = new long[UNIQUE_CELL_VALUES]; + for (int i = 0; i < values.length; i++) + values[i] = random.next(); + + for (int i = 0; i < OPERATIONS_PER_RUN; i++) + { + int partitionIndex = random.nextInt(0, NUM_PARTITIONS); + + history.visitPartition(partitionIndex) + .insert(random.nextInt(MAX_PARTITION_SIZE), + new long[] { random.nextBoolean() ? DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)], + random.nextBoolean() ? DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)], + random.nextBoolean() ? DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)] }, + new long[] { random.nextBoolean() ? DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)] }); + + if (random.nextFloat() > 0.99f) + { + int row1 = random.nextInt(MAX_PARTITION_SIZE); + int row2 = random.nextInt(MAX_PARTITION_SIZE); + history.visitPartition(partitionIndex).deleteRowRange(Math.min(row1, row2), Math.max(row1, row2), + random.nextBoolean(), random.nextBoolean()); + } + else if (random.nextFloat() > 0.999f) + { + history.visitPartition(partitionIndex).deleteRowSlice(); + } + + if (random.nextFloat() > 0.995f) + { + history.visitPartition(partitionIndex).deleteColumns(); + } + + if (random.nextFloat() > 0.9995f) + { + history.visitPartition(partitionIndex).deletePartition(); + } + + if (i % REPAIR_SKIP == 0) + { + logger.debug("Repairing/flushing after operation {}...", i); + repair(schema); + } + else if (i % FLUSH_SKIP == 0) + { + logger.debug("Flushing after operation {}...", i); + flush(schema); + } + + if (i % VALIDATION_SKIP != 0) + continue; + + logger.debug("Validating partition at index {} after operation {} in run {}...", partitionIndex, i, run + 1); + + for (int j = 0; j < 10; j++) + { + List<Relation> relations = new ArrayList<>(); + + // For one text column and 2 numeric columns, we can use between 1 and 5 total relations. + int num = random.nextInt(1, 5); + + List<List<Relation.RelationKind>> pick = new ArrayList<>(); + //noinspection ArraysAsListWithZeroOrOneArgument + pick.add(new ArrayList<>(Arrays.asList(Relation.RelationKind.EQ))); // text column supports only EQ + pick.add(new ArrayList<>(Arrays.asList(Relation.RelationKind.EQ, Relation.RelationKind.GT, Relation.RelationKind.LT))); + pick.add(new ArrayList<>(Arrays.asList(Relation.RelationKind.EQ, Relation.RelationKind.GT, Relation.RelationKind.LT))); + + if (random.nextFloat() > 0.75f) + { + relations.addAll(Query.clusteringSliceQuery(schema, + partitionIndex, + random.next(), + random.next(), + random.nextBoolean(), + random.nextBoolean(), + false).relations); + } + + for (int k = 0; k < num; k++) + { + int column = random.nextInt(schema.regularColumns.size()); + Relation.RelationKind relationKind = pickKind(random, pick, column); + + if (relationKind != null) + relations.add(Relation.relation(relationKind, + schema.regularColumns.get(column), + values[random.nextInt(values.length)])); + } + + if (random.nextFloat() > 0.7f) + { + relations.add(Relation.relation(Relation.RelationKind.EQ, + schema.staticColumns.get(0), + values[random.nextInt(values.length)])); + } + + long pd = history.presetSelector.pdAtPosition(partitionIndex); + FilteringQuery query = new FilteringQuery(pd, false, relations, schema); + Reconciler reconciler = new Reconciler(history.presetSelector, schema, history::visitor); + Set<ColumnSpec<?>> columns = new HashSet<>(schema.allColumns); + + PartitionState modelState = reconciler.inflatePartitionState(pd, tracker, query).filter(query); + + if (modelState.rows().size() > 0) + logger.debug("Model contains {} matching rows for query {}.", modelState.rows().size(), query); + + try + { + QuiescentChecker.validate(schema, + tracker, + columns, + modelState, + SelectHelper.execute(sut, history.clock(), query), + query); + + // Run the query again to see if the first execution caused an issue via read-repair: + QuiescentChecker.validate(schema, + tracker, + columns, + modelState, + SelectHelper.execute(sut, history.clock(), query), + query); + } + catch (Throwable t) + { + logger.debug("Partition index = {}, run = {}, j = {}, i = {}", partitionIndex, run, j, i); + + Query partitionQuery = Query.selectPartition(schema, pd, false); + QuiescentChecker.validate(schema, + tracker, + columns, + reconciler.inflatePartitionState(pd, tracker, partitionQuery), + SelectHelper.execute(sut, history.clock(), partitionQuery), + partitionQuery); + logger.debug("Partition state agrees. Throwing original error..."); + + throw t; + } + } + } + + if (run + 1 < RUNS) + { + logger.debug("Forcing compaction at the end of run {}...", run + 1); + compact(schema); + } + } + } + + protected void flush(SchemaSpec schema) + { + cluster.get(1).nodetool("flush", schema.keyspace, schema.table); + } + + protected void compact(SchemaSpec schema) + { + cluster.get(1).nodetool("compact", schema.keyspace); + } + + protected void repair(SchemaSpec schema) + { + // Repair is nonsensical for a single node, but a repair does flush first, so do that at least. + cluster.get(1).nodetool("flush", schema.keyspace, schema.table); + } + + protected void waitForIndexesQueryable(SchemaSpec schema) {} + + private static Relation.RelationKind pickKind(EntropySource random, List<List<Relation.RelationKind>> options, int column) + { + Relation.RelationKind kind = null; + + if (!options.get(column).isEmpty()) + { + List<Relation.RelationKind> possible = options.get(column); + int chosen = random.nextInt(possible.size()); + kind = possible.remove(chosen); + + if (kind == Relation.RelationKind.EQ) + possible.clear(); // EQ precludes LT and GT + else + possible.remove(Relation.RelationKind.EQ); // LT GT preclude EQ + } + + return kind; + } +} \ No newline at end of file diff --git a/test/distributed/org/apache/cassandra/fuzz/sai/StaticsTortureTest.java b/test/distributed/org/apache/cassandra/fuzz/sai/StaticsTortureTest.java new file mode 100644 index 0000000000..6c24c2fff9 --- /dev/null +++ b/test/distributed/org/apache/cassandra/fuzz/sai/StaticsTortureTest.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fuzz.sai; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Stream; + +import org.junit.Test; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.fuzz.harry.integration.model.IntegrationTestBase; +import org.apache.cassandra.harry.data.ResultSetRow; +import org.apache.cassandra.harry.ddl.ColumnSpec; +import org.apache.cassandra.harry.ddl.SchemaSpec; +import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder; +import org.apache.cassandra.harry.gen.DataGenerators; +import org.apache.cassandra.harry.gen.EntropySource; +import org.apache.cassandra.harry.gen.rng.JdkRandomEntropySource; +import org.apache.cassandra.harry.model.AgainstSutChecker; +import org.apache.cassandra.harry.model.Model; +import org.apache.cassandra.harry.model.SelectHelper; +import org.apache.cassandra.harry.operations.CompiledStatement; +import org.apache.cassandra.harry.operations.FilteringQuery; +import org.apache.cassandra.harry.operations.Query; +import org.apache.cassandra.harry.operations.Relation; +import org.apache.cassandra.harry.sut.DoubleWritingSut; +import org.apache.cassandra.harry.sut.QueryModifyingSut; +import org.apache.cassandra.harry.sut.SystemUnderTest; +import org.apache.cassandra.harry.sut.TokenPlacementModel; +import org.apache.cassandra.harry.tracker.DataTracker; +import org.apache.cassandra.harry.tracker.DefaultDataTracker; + +public class StaticsTortureTest extends IntegrationTestBase +{ + private static final int MAX_PARTITION_SIZE = 10_000; + private static final int NUM_PARTITIONS = 100; + private static final int UNIQUE_CELL_VALUES = 5; + + long seed = 1; + + @Test + public void staticsTortureTest() + { + CassandraRelevantProperties.SAI_INTERSECTION_CLAUSE_LIMIT.setInt(6); + staticsTortureTest(Arrays.asList(ColumnSpec.ck("ck1", ColumnSpec.asciiType(4, 100)), + ColumnSpec.ck("ck2", ColumnSpec.asciiType), + ColumnSpec.ck("ck3", ColumnSpec.int64Type))); + + for (boolean b1 : new boolean[]{ true, false }) + for (boolean b2 : new boolean[]{ true, false }) + for (boolean b3 : new boolean[]{ true, false }) + { + staticsTortureTest(Arrays.asList(ColumnSpec.ck("ck1", ColumnSpec.asciiType(4, 100), b1), + ColumnSpec.ck("ck2", ColumnSpec.asciiType, b2), + ColumnSpec.ck("ck3", ColumnSpec.int64Type, b3))); + } + } + + public void staticsTortureTest(List<ColumnSpec<?>> cks) + { + SchemaSpec schema = new SchemaSpec(KEYSPACE, "tbl" + (seed++), + Arrays.asList(ColumnSpec.ck("pk1", ColumnSpec.int64Type), + ColumnSpec.ck("pk2", ColumnSpec.asciiType(4, 100)), + ColumnSpec.ck("pk3", ColumnSpec.int64Type)), + cks, + Arrays.asList(ColumnSpec.regularColumn("v1", ColumnSpec.asciiType(40, 100)), + ColumnSpec.regularColumn("v2", ColumnSpec.int64Type), + ColumnSpec.regularColumn("v3", ColumnSpec.int64Type)), + List.of(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType(40, 100)), + ColumnSpec.staticColumn("s2", ColumnSpec.int64Type), + ColumnSpec.staticColumn("s3", ColumnSpec.asciiType(40, 100)) + )); + + sut.schemaChange(schema.compile().cql()); + SchemaSpec debugSchema = schema.cloneWithName(schema.keyspace, schema.table + "_debug"); + sut.schemaChange(schema.cloneWithName(schema.keyspace, schema.table + "_debug").compile().cql()); + sut.schemaChange(String.format("CREATE INDEX %s_%s_sai_idx ON %s.%s (%s) USING 'sai' " + + "WITH OPTIONS = {'case_sensitive': 'false', 'normalize': 'true', 'ascii': 'true'};", + schema.table, + schema.regularColumns.get(0).name, + schema.keyspace, + schema.table, + schema.regularColumns.get(0).name)); + sut.schemaChange(String.format("CREATE INDEX %s_%s_sai_idx ON %s.%s (%s) USING 'sai';", + schema.table, + schema.regularColumns.get(1).name, + schema.keyspace, + schema.table, + schema.regularColumns.get(1).name)); + sut.schemaChange(String.format("CREATE INDEX %s_%s_sai_idx ON %s.%s (%s) USING 'sai';", + schema.table, + schema.regularColumns.get(2).name, + schema.keyspace, + schema.table, + schema.regularColumns.get(2).name)); + sut.schemaChange(String.format("CREATE INDEX %s_%s_sai_idx ON %s.%s (%s) USING 'sai';", + schema.table, + schema.staticColumns.get(0).name, + schema.keyspace, + schema.table, + schema.staticColumns.get(0).name)); + sut.schemaChange(String.format("CREATE INDEX %s_%s_sai_idx ON %s.%s (%s) USING 'sai';", + schema.table, + schema.staticColumns.get(1).name, + schema.keyspace, + schema.table, + schema.staticColumns.get(1).name)); + sut.schemaChange(String.format("CREATE INDEX %s_%s_sai_idx ON %s.%s (%s) USING 'sai';", + schema.table, + schema.staticColumns.get(2).name, + schema.keyspace, + schema.table, + schema.staticColumns.get(2).name)); + DataTracker tracker = new DefaultDataTracker(); + TokenPlacementModel.ReplicationFactor rf = new TokenPlacementModel.SimpleReplicationFactor(cluster.size()); + ReplayingHistoryBuilder history = new ReplayingHistoryBuilder(seed, + MAX_PARTITION_SIZE, + 100, + tracker, + new DoubleWritingSut(sut, + new QueryModifyingSut(sut, + schema.keyspace + "." + schema.table, + debugSchema.keyspace + "." + debugSchema.table)), + schema, + rf, + SystemUnderTest.ConsistencyLevel.QUORUM); + + EntropySource rng = new JdkRandomEntropySource(1l); + long[] values = new long[UNIQUE_CELL_VALUES]; + for (int i = 0; i < values.length; i++) + values[i] = rng.next(); + + for (int i = 0; i < NUM_PARTITIONS; i++) + { + history.visitPartition(i) + .insert(1, + new long[]{ rng.nextBoolean() ? DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)], + rng.nextBoolean() ? DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)], + rng.nextBoolean() ? DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)] + }, + new long[]{ rng.nextBoolean() ? DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)], + rng.nextBoolean() ? DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)], + rng.nextBoolean() ? DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)] + }); + history.visitPartition(i) + .insert(5, + new long[]{ rng.nextBoolean() ? DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)], + rng.nextBoolean() ? DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)], + rng.nextBoolean() ? DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)] + }, + new long[]{ rng.nextBoolean() ? DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)], + rng.nextBoolean() ? DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)], + rng.nextBoolean() ? DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)] + }); + + if (rng.nextFloat() > 0.9f) + { + history.visitPartition(i) + .deleteRowRange(rng.nextInt(5), rng.nextInt(5), rng.nextBoolean(), rng.nextBoolean()); + } + + if (rng.nextFloat() > 0.9f) + { + history.visitPartition(i) + .deleteColumns(); + } + + if (i % 50 == 0) + cluster.get(1).nodetool("flush", schema.keyspace, schema.table); + } + + Model model = new AgainstSutChecker(tracker, history.clock(), sut, schema, schema.cloneWithName(schema.keyspace, debugSchema.table)) { + @Override + protected List<ResultSetRow> executeOnDebugSchema(Query query) + { + CompiledStatement s2 = query.toSelectStatement(doubleWriteTable.allColumnsSet, true) + .withSchema(schema.keyspace, schema.table, doubleWriteTable.keyspace, doubleWriteTable.table) + .withFiltering(); + return SelectHelper.execute(sut, clock, s2, schema); + } + }; + + for (int pdx = 0; pdx < NUM_PARTITIONS; pdx++) + { + long pd = history.presetSelector.pdAtPosition(pdx); + history.presetSelector.pdAtPosition(1); + for (int i1 = 0; i1 < values.length; i1++) + for (int i2 = 0; i2 < values.length; i2++) + for (int i3 = 0; i3 < values.length; i3++) + { + long[] descriptors = new long[]{ values[i1], values[i2], values[i3], + values[i1], values[i2], values[i3] }; + List<Relation> relations = new ArrayList<>(); + Stream.concat(schema.regularColumns.stream(), + schema.staticColumns.stream()) + .forEach(new Consumer<>() + { + int counter = 0; + + @Override + public void accept(ColumnSpec<?> column) + { + if (rng.nextBoolean()) + return; + + if (column.type.toString().equals(ColumnSpec.int64Type.toString())) + { + if (rng.nextBoolean()) + { + relations.add(Relation.relation(Relation.RelationKind.EQ, + column, + descriptors[counter])); + } + else + { + Relation.relation(rng.nextBoolean() ? Relation.RelationKind.LT : Relation.RelationKind.GT, + column, + descriptors[counter]); + } + } + else + { + Relation.relation(Relation.RelationKind.EQ, + column, + descriptors[counter]); + } + + counter++; + } + }); + + // Without partition key + model.validate(new FilteringQuery(-1, false, relations, schema) + { + @Override + public CompiledStatement toSelectStatement() + { + return SelectHelper.select(schemaSpec, null, schemaSpec.allColumnsSet, relations, reverse, true); + } + }); + model.validate(new FilteringQuery(pd, false, relations, schema)); + } + } + } +} \ No newline at end of file diff --git a/test/harry/main/org/apache/cassandra/harry/ddl/SchemaSpec.java b/test/harry/main/org/apache/cassandra/harry/ddl/SchemaSpec.java index 2ee0c50add..4ad7f29369 100644 --- a/test/harry/main/org/apache/cassandra/harry/ddl/SchemaSpec.java +++ b/test/harry/main/org/apache/cassandra/harry/ddl/SchemaSpec.java @@ -31,13 +31,15 @@ public class SchemaSpec { public interface SchemaSpecFactory { - public SchemaSpec make(long seed, SystemUnderTest sut); + SchemaSpec make(long seed, SystemUnderTest sut); } public final DataGenerators.KeyGenerator pkGenerator; public final DataGenerators.KeyGenerator ckGenerator; private final boolean isCompactStorage; + private final boolean disableReadRepair; + private final String compactionStrategy; public final boolean trackLts; // These fields are immutable, and are safe as public @@ -65,23 +67,26 @@ public class SchemaSpec List<ColumnSpec<?>> regularColumns, List<ColumnSpec<?>> staticColumns) { - this(keyspace, table, partitionKeys, clusteringKeys, regularColumns, staticColumns, false, false); + this(keyspace, table, partitionKeys, clusteringKeys, regularColumns, staticColumns, false, false, null, false); } public SchemaSpec cloneWithName(String ks, String table) { - return new SchemaSpec(ks, table, partitionKeys, clusteringKeys, regularColumns, staticColumns, isCompactStorage, trackLts); + return new SchemaSpec(ks, table, partitionKeys, clusteringKeys, regularColumns, staticColumns, + isCompactStorage, disableReadRepair, compactionStrategy, trackLts); } public SchemaSpec trackLts() { - return new SchemaSpec(keyspace, table, partitionKeys, clusteringKeys, regularColumns, staticColumns, isCompactStorage, true); + return new SchemaSpec(keyspace, table, partitionKeys, clusteringKeys, regularColumns, staticColumns, + isCompactStorage, true, compactionStrategy, disableReadRepair); } public SchemaSpec withCompactStorage() { - return new SchemaSpec(keyspace, table, partitionKeys, clusteringKeys, regularColumns, staticColumns, true, trackLts); + return new SchemaSpec(keyspace, table, partitionKeys, clusteringKeys, regularColumns, + staticColumns, true, disableReadRepair, compactionStrategy, trackLts); } public SchemaSpec(String keyspace, @@ -91,6 +96,8 @@ public class SchemaSpec List<ColumnSpec<?>> regularColumns, List<ColumnSpec<?>> staticColumns, boolean isCompactStorage, + boolean disableReadRepair, + String compactionStrategy, boolean trackLts) { assert !isCompactStorage || clusteringKeys.size() == 0 || regularColumns.size() <= 1; @@ -98,6 +105,8 @@ public class SchemaSpec this.keyspace = keyspace; this.table = table; this.isCompactStorage = isCompactStorage; + this.disableReadRepair = disableReadRepair; + this.compactionStrategy = compactionStrategy; this.partitionKeys = Collections.unmodifiableList(new ArrayList<>(partitionKeys)); for (int i = 0; i < partitionKeys.size(); i++) @@ -296,12 +305,24 @@ public class SchemaSpec sb.append(')'); - Runnable appendWith = doOnce(() -> sb.append(" WITH ")); + Runnable appendWith = doOnce(() -> sb.append(" WITH")); if (isCompactStorage) { appendWith.run(); - sb.append("COMPACT STORAGE AND"); + sb.append(" COMPACT STORAGE AND"); + } + + if (disableReadRepair) + { + appendWith.run(); + sb.append(" read_repair = 'NONE' AND"); + } + + if (compactionStrategy != null) + { + appendWith.run(); + sb.append(" compaction = {'class': '").append(compactionStrategy).append("'} AND"); } if (clusteringKeys.size() > 0) diff --git a/test/harry/main/org/apache/cassandra/harry/model/AgainstSutChecker.java b/test/harry/main/org/apache/cassandra/harry/model/AgainstSutChecker.java index a9c35337df..e4bd009c8c 100644 --- a/test/harry/main/org/apache/cassandra/harry/model/AgainstSutChecker.java +++ b/test/harry/main/org/apache/cassandra/harry/model/AgainstSutChecker.java @@ -50,11 +50,11 @@ import org.apache.cassandra.harry.tracker.DataTracker; */ public class AgainstSutChecker implements Model { - private final OpSelectors.Clock clock; - private final SystemUnderTest sut; - private final SchemaSpec schema; - private final SchemaSpec doubleWriteTable; - private final DataTracker tracker; + protected final OpSelectors.Clock clock; + protected final SystemUnderTest sut; + protected final SchemaSpec schema; + protected final SchemaSpec doubleWriteTable; + protected final DataTracker tracker; public AgainstSutChecker(DataTracker tracker, OpSelectors.Clock clock, @@ -72,14 +72,12 @@ public class AgainstSutChecker implements Model public void validate(Query query) { tracker.beginValidation(query.pd); - CompiledStatement s1 = query.toSelectStatement(schema.allColumnsSet, true); - CompiledStatement s2 = s1.withSchema(schema.keyspace, schema.table, - doubleWriteTable.keyspace, doubleWriteTable.table); - List<ResultSetRow> rows1 = SelectHelper.execute(sut, clock, s1, schema); - List<ResultSetRow> rows2 = SelectHelper.execute(sut, clock, s2, doubleWriteTable); + + List<ResultSetRow> rows1 = executeOnMainSchema(query); + List<ResultSetRow> rows2 = executeOnDebugSchema(query); if (rows1.size() != rows2.size()) - throw new IllegalStateException(String.format("Sizes do not match %d %d", rows1.size(), rows2.size())); + throw new IllegalStateException(String.format("Sizes do not match %d %d\n%s\n%s\nQuery:%s\n", rows1.size(), rows2.size(), rows1, rows2, query.toSelectStatement())); for (int i = 0; i < rows1.size(); i++) { @@ -95,5 +93,17 @@ public class AgainstSutChecker implements Model tracker.endValidation(query.pd); } + protected final List<ResultSetRow> executeOnMainSchema(Query query) + { + CompiledStatement s1 = query.toSelectStatement(schema.allColumnsSet, true); + return SelectHelper.execute(sut, clock, s1, schema); + } + + protected List<ResultSetRow> executeOnDebugSchema(Query query) + { + CompiledStatement s2 = query.toSelectStatement(doubleWriteTable.allColumnsSet, true) + .withSchema(schema.keyspace, schema.table, doubleWriteTable.keyspace, doubleWriteTable.table); + return SelectHelper.execute(sut, clock, s2, schema); + } } diff --git a/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java b/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java index 36e4499c7d..8f2c6f5a4b 100644 --- a/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java +++ b/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java @@ -59,7 +59,7 @@ public class SelectHelper return select(schema, pd, null, relations, reverse, includeWriteTime); } - public static CompiledStatement select(SchemaSpec schema, long pd, Set<ColumnSpec<?>> columns, List<Relation> relations, boolean reverse, boolean includeWriteTime) + public static CompiledStatement select(SchemaSpec schema, Long pd, Set<ColumnSpec<?>> columns, List<Relation> relations, boolean reverse, boolean includeWriteTime) { boolean isWildcardQuery = columns == null; if (isWildcardQuery) @@ -126,21 +126,28 @@ public class SelectHelper List<Object> bindings = new ArrayList<>(); - schema.inflateRelations(pd, - relations, - new SchemaSpec.AddRelationCallback() - { - boolean isFirst = true; - public void accept(ColumnSpec<?> spec, Relation.RelationKind kind, Object value) - { - if (isFirst) - isFirst = false; - else - b.append(" AND "); - b.append(kind.getClause(spec)); - bindings.add(value); - } - }); + SchemaSpec.AddRelationCallback consumer = new SchemaSpec.AddRelationCallback() + { + boolean isFirst = true; + public void accept(ColumnSpec<?> spec, Relation.RelationKind kind, Object value) + { + if (isFirst) + isFirst = false; + else + b.append(" AND "); + b.append(kind.getClause(spec)); + bindings.add(value); + } + }; + if (pd != null) + { + Object[] pk = schema.inflatePartitionKey(pd); + for (int i = 0; i < pk.length; i++) + consumer.accept(schema.partitionKeys.get(i), Relation.RelationKind.EQ, pk[i]); + + } + schema.inflateRelations(relations, consumer); + addOrderBy(schema, b, reverse); b.append(";"); Object[] bindingsArr = bindings.toArray(new Object[bindings.size()]); diff --git a/test/harry/main/org/apache/cassandra/harry/model/reconciler/PartitionState.java b/test/harry/main/org/apache/cassandra/harry/model/reconciler/PartitionState.java index 9d789cdd8c..4792de638d 100644 --- a/test/harry/main/org/apache/cassandra/harry/model/reconciler/PartitionState.java +++ b/test/harry/main/org/apache/cassandra/harry/model/reconciler/PartitionState.java @@ -101,7 +101,6 @@ public class PartitionState implements Iterable<Reconciler.RowState> rows.put(cd, rowState); } PartitionState ps = new PartitionState(pd, debugCd, staticRow, rows, schema); - System.out.println("ps.rows.size() = " + ps.rows.size()); return ps; } diff --git a/test/harry/main/org/apache/cassandra/harry/operations/CompiledStatement.java b/test/harry/main/org/apache/cassandra/harry/operations/CompiledStatement.java index dc29bb1cbb..3d1428f782 100644 --- a/test/harry/main/org/apache/cassandra/harry/operations/CompiledStatement.java +++ b/test/harry/main/org/apache/cassandra/harry/operations/CompiledStatement.java @@ -41,6 +41,13 @@ public class CompiledStatement bindings); } + public CompiledStatement withFiltering() + { + return new CompiledStatement(cql.replace(";", + " ALLOW FILTERING;"), + bindings); + } + public Object[] bindings() { return bindings; diff --git a/test/harry/main/org/apache/cassandra/harry/operations/FilteringQuery.java b/test/harry/main/org/apache/cassandra/harry/operations/FilteringQuery.java index e698025901..1a39e50fa8 100644 --- a/test/harry/main/org/apache/cassandra/harry/operations/FilteringQuery.java +++ b/test/harry/main/org/apache/cassandra/harry/operations/FilteringQuery.java @@ -65,4 +65,10 @@ public class FilteringQuery extends Query { throw new IllegalStateException("not implemented for filtering query"); } + + @Override + public String toString() + { + return "FilteringQuery{pd=" + pd + ", reverse=" + reverse + ", relations=" + relations + '}'; + } } diff --git a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java index d024688253..3d142e6c7d 100644 --- a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java +++ b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java @@ -129,12 +129,17 @@ public class InJvmSutBase<NODE extends IInstance, CLUSTER extends ICluster<NODE> cluster.schemaChange(statement); } + public Object[][] execute(String statement, ConsistencyLevel cl, int pageSize, Object... bindings) + { + return execute(statement, cl, loadBalancingStrategy.get(), pageSize, bindings); + } + public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings) { - return execute(statement, cl, loadBalancingStrategy.get(), bindings); + return execute(statement, cl, loadBalancingStrategy.get(), 1, bindings); } - public Object[][] execute(String statement, ConsistencyLevel cl, int coordinator, Object... bindings) + public Object[][] execute(String statement, ConsistencyLevel cl, int coordinator, int pageSize, Object... bindings) { if (isShutdown.get()) throw new RuntimeException("Instance is shut down"); @@ -151,7 +156,7 @@ public class InJvmSutBase<NODE extends IInstance, CLUSTER extends ICluster<NODE> return Iterators.toArray(cluster // round-robin .coordinator(coordinator) - .executeWithPaging(statement, toApiCl(cl), 1, bindings), + .executeWithPaging(statement, toApiCl(cl), pageSize, bindings), Object[].class); } else --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org