maedhroz commented on code in PR #3689:
URL: https://github.com/apache/cassandra/pull/3689#discussion_r1846949322


##########
test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITestBase.java:
##########
@@ -18,302 +18,192 @@
 
 package org.apache.cassandra.fuzz.sai;
 
+
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
+import java.util.function.Consumer;
 
+import com.google.common.collect.Streams;
+import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.fuzz.harry.integration.model.IntegrationTestBase;
-import org.apache.cassandra.harry.ddl.ColumnSpec;
-import org.apache.cassandra.harry.ddl.SchemaSpec;
-import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
-import org.apache.cassandra.harry.gen.DataGenerators;
-import org.apache.cassandra.harry.gen.EntropySource;
-import org.apache.cassandra.harry.gen.rng.JdkRandomEntropySource;
-import org.apache.cassandra.harry.model.QuiescentChecker;
-import org.apache.cassandra.harry.model.SelectHelper;
-import org.apache.cassandra.harry.model.reconciler.PartitionState;
-import org.apache.cassandra.harry.model.reconciler.Reconciler;
-import org.apache.cassandra.harry.operations.FilteringQuery;
-import org.apache.cassandra.harry.operations.Query;
-import org.apache.cassandra.harry.operations.Relation;
-import org.apache.cassandra.harry.sut.SystemUnderTest;
-import org.apache.cassandra.harry.sut.TokenPlacementModel;
-import org.apache.cassandra.harry.sut.injvm.InJvmSut;
-import org.apache.cassandra.harry.sut.injvm.InJvmSutBase;
-import org.apache.cassandra.harry.tracker.DataTracker;
-import org.apache.cassandra.harry.tracker.DefaultDataTracker;
-import org.apache.cassandra.service.consensus.TransactionalMode;
-
-import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
-import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-
-public abstract class SingleNodeSAITestBase extends IntegrationTestBase
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.harry.SchemaSpec;
+import org.apache.cassandra.harry.dsl.HistoryBuilder;
+import org.apache.cassandra.harry.dsl.HistoryBuilderHelper;
+import org.apache.cassandra.harry.execution.InJvmDTestVisitExecutor;
+import org.apache.cassandra.harry.gen.Generator;
+import org.apache.cassandra.harry.gen.Generators;
+import org.apache.cassandra.harry.gen.SchemaGenerators;
+
+import static org.apache.cassandra.harry.checker.TestHelper.withRandom;
+import static 
org.apache.cassandra.harry.dsl.SingleOperationBuilder.IdxRelation;
+
+// TODO: "WITH OPTIONS = {'case_sensitive': 'false', 'normalize': 'true', 
'ascii': 'true'};",
+public abstract class SingleNodeSAITestBase extends TestBaseImpl
 {
-    private static final int RUNS = 1;
-
     private static final int OPERATIONS_PER_RUN = 30_000;
     private static final int REPAIR_SKIP = OPERATIONS_PER_RUN / 2;
     private static final int FLUSH_SKIP = OPERATIONS_PER_RUN / 7;
-    private static final int VALIDATION_SKIP = OPERATIONS_PER_RUN / 100;
 
     private static final int NUM_PARTITIONS = OPERATIONS_PER_RUN / 1000;
     protected static final int MAX_PARTITION_SIZE = 10_000;
     private static final int UNIQUE_CELL_VALUES = 5;
 
-    long seed = 1;
+    protected static final Logger logger = 
LoggerFactory.getLogger(SingleNodeSAITest.class);
+    protected static Cluster cluster;
 
-    protected boolean withAccord;
+    protected final boolean withAccord;
+
+    protected SingleNodeSAITestBase(boolean withAccord)
+    {
+        this.withAccord = withAccord;
+    }
 
     @BeforeClass
     public static void before() throws Throwable
+    {
+        init(1, defaultConfig());
+    }
+
+    protected static void init(int nodes, Consumer<IInstanceConfig> cfg) 
throws Throwable
     {
         cluster = Cluster.build()
-                         .withNodes(1)
-                         // At lower fetch sizes, queries w/ hundreds or 
thousands of matches can take a very long time.
-                         .withConfig(InJvmSutBase.defaultConfig().andThen(c -> 
c.set("range_request_timeout", "180s")
-                                                                               
 .set("read_request_timeout", "180s")
-                                                                               
 .set("transaction_timeout", "180s")
-                                                                               
 .set("write_request_timeout", "180s")
-                                                                               
 .set("native_transport_timeout", "180s")
-                                                                               
 .set("slow_query_log_timeout", "180s")
-                                                                               
 .with(GOSSIP).with(NETWORK)))
+                         .withNodes(nodes)
+                         .withConfig(cfg)
                          .createWithoutStarting();
         cluster.setUncaughtExceptionsFilter(t -> {
             logger.error("Caught exception, reporting during shutdown. 
Ignoring.", t);
             return true;
         });
         cluster.startup();
         cluster = init(cluster);
-        sut = new InJvmSut(cluster);
+    }
+    @AfterClass
+    public static void afterClass()
+    {
+        cluster.close();
     }
 
-    public SingleNodeSAITestBase(boolean withAccord)
+    @Before
+    public void beforeEach()
     {
-        this.withAccord = withAccord;
+        cluster.schemaChange("DROP KEYSPACE IF EXISTS harry");
+        cluster.schemaChange("CREATE KEYSPACE harry WITH replication = 
{'class': 'SimpleStrategy', 'replication_factor': 1};");
     }
 
     @Test
     public void basicSaiTest()
     {
-        CassandraRelevantProperties.SAI_INTERSECTION_CLAUSE_LIMIT.setInt(6);
-        SchemaSpec schema = new SchemaSpec(KEYSPACE, "tbl1",
-                                           Arrays.asList(ColumnSpec.ck("pk1", 
ColumnSpec.int64Type),
-                                                         ColumnSpec.ck("pk2", 
ColumnSpec.asciiType(4, 100)),
-                                                         ColumnSpec.ck("pk3", 
ColumnSpec.int64Type)),
-                                           Arrays.asList(ColumnSpec.ck("ck1", 
ColumnSpec.asciiType(4, 100)),
-                                                         ColumnSpec.ck("ck2", 
ColumnSpec.asciiType, true),
-                                                         ColumnSpec.ck("ck3", 
ColumnSpec.int64Type)),
-                                           
Arrays.asList(ColumnSpec.regularColumn("v1", ColumnSpec.asciiType(40, 100)),
-                                                         
ColumnSpec.regularColumn("v2", ColumnSpec.int64Type),
-                                                         
ColumnSpec.regularColumn("v3", ColumnSpec.int64Type)),
-                                           
List.of(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType(40, 100))),
-                                           withAccord ? 
Optional.of(TransactionalMode.full) : Optional.empty())
-                            .withWriteTimeFromAccord(false) // use the harry 
timestamp
-                            
.withCompactionStrategy("LeveledCompactionStrategy");
-
-        sut.schemaChange(schema.compile().cql());
-        sut.schemaChange(schema.cloneWithName(schema.keyspace, schema.table + 
"_debug").compile().cql());
-        sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) 
USING 'sai' ",
-                                       schema.regularColumns.get(0).name,
-                                       schema.keyspace,
-                                       schema.table,
-                                       schema.regularColumns.get(0).name));
-        sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) 
USING 'sai';",
-                                       schema.regularColumns.get(1).name,
-                                       schema.keyspace,
-                                       schema.table,
-                                       schema.regularColumns.get(1).name));
-        sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) 
USING 'sai';",
-                                       schema.regularColumns.get(2).name,
-                                       schema.keyspace,
-                                       schema.table,
-                                       schema.regularColumns.get(2).name));
-        sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) 
USING 'sai';",
-                                       schema.staticColumns.get(0).name,
-                                       schema.keyspace,
-                                       schema.table,
-                                       schema.staticColumns.get(0).name));
-
-        waitForIndexesQueryable(schema);
-
-        DataTracker tracker = new DefaultDataTracker();
-        TokenPlacementModel.ReplicationFactor rf = new 
TokenPlacementModel.SimpleReplicationFactor(cluster.size());
-        ReplayingHistoryBuilder history = new ReplayingHistoryBuilder(seed,
-                                                                      
MAX_PARTITION_SIZE,
-                                                                      
MAX_PARTITION_SIZE,
-                                                                      tracker,
-                                                                      sut,
-                                                                      schema,
-                                                                      rf,
-                                                                      
SystemUnderTest.ConsistencyLevel.QUORUM);
-
-        for (int run = 0; run < RUNS; run++)
-        {
-            logger.info("Starting run {}/{}...", run + 1, RUNS);
-            EntropySource random = new JdkRandomEntropySource(run);
-
-            // Populate the array of possible values for all operations in the 
run:
-            long[] values = new long[UNIQUE_CELL_VALUES];
-            for (int i = 0; i < values.length; i++)
-                values[i] = random.next();
+        Generator<SchemaSpec> schemaGen;
+        if (withAccord)
+            schemaGen = SchemaGenerators.schemaSpecGen(KEYSPACE, "basic_sai", 
MAX_PARTITION_SIZE, SchemaSpec.Options.TRANSACTIONAL_MODE, "full");
+        else
+            schemaGen = SchemaGenerators.schemaSpecGen(KEYSPACE, "basic_sai", 
MAX_PARTITION_SIZE);
+
+        Set<Integer> usedPartitions = new HashSet<>();
+
+        Generator<Integer> globalPkGen = Generators.pick(0, NUM_PARTITIONS);
+        Generator<Integer> ckGen = Generators.int32(0, MAX_PARTITION_SIZE);
+
+        withRandom(rng -> {
+            SchemaSpec schema = schemaGen.generate(rng);
+            
CassandraRelevantProperties.SAI_INTERSECTION_CLAUSE_LIMIT.setInt(100);
+            beforeEach();
+            cluster.get(1).nodetool("disableautocompaction");
+            cluster.schemaChange(schema.compile());
+            Streams.concat(schema.clusteringKeys.stream(),
+                           schema.regularColumns.stream(),
+                           schema.staticColumns.stream())
+                   .forEach(column -> {
+                       cluster.schemaChange(String.format("CREATE INDEX 
%s_sai_idx ON %s.%s (%s) USING 'sai' ",
+                                                          column.name,
+                                                          schema.keyspace,
+                                                          schema.table,
+                                                          column.name));
+                   });
+
+            waitForIndexesQueryable(schema);
+
+            HistoryBuilder history = new 
HistoryBuilder(schema.valueGenerators);
+            List<Integer> partitions = new ArrayList<>();
+            for (int j = 0; j < 5; j++)
+            {
+                int picked = globalPkGen.generate(rng);
+                if (usedPartitions.contains(picked))
+                    continue;
+                partitions.add(picked);
+            }
+            usedPartitions.addAll(partitions);
+            if (partitions.isEmpty())
+                return;
 
+            Generator<Integer> pkGen = Generators.pick(partitions);
             for (int i = 0; i < OPERATIONS_PER_RUN; i++)
             {
-                int partitionIndex = random.nextInt(0, NUM_PARTITIONS);
-
-                history.visitPartition(partitionIndex)
-                       .insert(random.nextInt(MAX_PARTITION_SIZE),
-                               new long[] { random.nextBoolean() ? 
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)],
-                                            random.nextBoolean() ? 
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)],
-                                            random.nextBoolean() ? 
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)] },
-                               new long[] { random.nextBoolean() ? 
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)] });
+                int partitionIndex = pkGen.generate(rng);
+                HistoryBuilderHelper.insertRandomData(schema, partitionIndex, 
ckGen.generate(rng), rng,0.5d, history);
 
-                if (random.nextFloat() > 0.99f)
-                {
-                    int row1 = random.nextInt(MAX_PARTITION_SIZE);
-                    int row2 = random.nextInt(MAX_PARTITION_SIZE);
-                    
history.visitPartition(partitionIndex).deleteRowRange(Math.min(row1, row2), 
Math.max(row1, row2),
-                                                                          
random.nextBoolean(), random.nextBoolean());
-                }
-                else if (random.nextFloat() > 0.999f)
+                if (rng.nextFloat() > 0.99f)
                 {
-                    history.visitPartition(partitionIndex).deleteRowSlice();
+                    int row1 = ckGen.generate(rng);
+                    int row2 = ckGen.generate(rng);
+                    history.deleteRowRange(partitionIndex,
+                                           Math.min(row1, row2),
+                                           Math.max(row1, row2),
+                                           
rng.nextInt(schema.clusteringKeys.size()),
+                                           rng.nextBoolean(),
+                                           rng.nextBoolean());
                 }
 
-                if (random.nextFloat() > 0.995f)
-                {
-                    history.visitPartition(partitionIndex).deleteColumns();
-                }
+                if (rng.nextFloat() > 0.995f)
+                    HistoryBuilderHelper.deleteRandomColumns(schema, 
partitionIndex, ckGen.generate(rng), rng, history);
 
-                if (random.nextFloat() > 0.9995f)
-                {
-                    history.visitPartition(partitionIndex).deletePartition();
-                }
+                if (rng.nextFloat() > 0.9995f)
+                    history.deletePartition(partitionIndex);
 
                 if (i % REPAIR_SKIP == 0)
-                {
-                    logger.debug("Repairing/flushing after operation {}...", 
i);
-                    repair(schema);
-                }
+                    history.custom(() -> repair(schema), "Repair");
                 else if (i % FLUSH_SKIP == 0)
-                {
-                    logger.debug("Flushing after operation {}...", i);
-                    flush(schema);
-                }
-
-                if (i % VALIDATION_SKIP != 0)
-                    continue;
+                    history.custom(() -> flush(schema), "Flush");
 
-                logger.debug("Validating partition at index {} after operation 
{} in run {}...", partitionIndex, i, run + 1);
-
-                for (int j = 0; j < 10; j++)
+                if (i > 0 && i % 1000 == 0)
                 {
-                    List<Relation> relations = new ArrayList<>();
-
-                    // For one text column and 2 numeric columns, we can use 
between 1 and 5 total relations.
-                    int num = random.nextInt(1, 5);
-
-                    List<List<Relation.RelationKind>> pick = new ArrayList<>();
-                    //noinspection ArraysAsListWithZeroOrOneArgument
-                    pick.add(new 
ArrayList<>(Arrays.asList(Relation.RelationKind.EQ))); // text column supports 
only EQ
-                    pick.add(new 
ArrayList<>(Arrays.asList(Relation.RelationKind.EQ, Relation.RelationKind.GT, 
Relation.RelationKind.LT)));
-                    pick.add(new 
ArrayList<>(Arrays.asList(Relation.RelationKind.EQ, Relation.RelationKind.GT, 
Relation.RelationKind.LT)));
-
-                    if (random.nextFloat() > 0.75f)
-                    {
-                        relations.addAll(Query.clusteringSliceQuery(schema,
-                                                                    
partitionIndex,
-                                                                    
random.next(),
-                                                                    
random.next(),
-                                                                    
random.nextBoolean(),
-                                                                    
random.nextBoolean(),
-                                                                    
false).relations);
-                    }
-
-                    for (int k = 0; k < num; k++)
+                    for (int j = 0; j < 5; j++)
                     {
-                        int column = 
random.nextInt(schema.regularColumns.size());
-                        Relation.RelationKind relationKind = pickKind(random, 
pick, column);
-
-                        if (relationKind != null)
-                            relations.add(Relation.relation(relationKind,
-                                                            
schema.regularColumns.get(column),
-                                                            
values[random.nextInt(values.length)]));
-                    }
-
-                    if (random.nextFloat() > 0.7f)
-                    {
-                        
relations.add(Relation.relation(Relation.RelationKind.EQ,
-                                                        
schema.staticColumns.get(0),
-                                                        
values[random.nextInt(values.length)]));
-                    }
-
-                    long pd = 
history.pdSelector().pdAtPosition(partitionIndex);
-                    FilteringQuery query = new FilteringQuery(pd, false, 
relations, schema);
-                    Reconciler reconciler = new 
Reconciler(history.pdSelector(), schema, history::visitor);
-                    Set<ColumnSpec<?>> columns = new 
HashSet<>(schema.allColumns);
-
-                    PartitionState modelState = 
reconciler.inflatePartitionState(pd, tracker, query).filter(query);
-
-                    if (modelState.rows().size() > 0)
-                        logger.debug("Model contains {} matching rows for 
query {}.", modelState.rows().size(), query);
-
-                    try
-                    {
-                        QuiescentChecker.validate(schema,
-                                                  tracker,
-                                                  columns,
-                                                  modelState,
-                                                  SelectHelper.execute(sut, 
history.clock(), query),
-                                                  query);
-
-                        // Run the query again to see if the first execution 
caused an issue via read-repair:
-                        QuiescentChecker.validate(schema,
-                                                  tracker,
-                                                  columns,
-                                                  modelState,
-                                                  SelectHelper.execute(sut, 
history.clock(), query),
-                                                  query);
-                    }
-                    catch (Throwable t)
-                    {
-                        logger.debug("Partition index = {}, run = {}, j = {}, 
i = {}", partitionIndex, run, j, i);
-
-                        Query partitionQuery = Query.selectAllColumns(schema, 
pd, false);
-                        QuiescentChecker.validate(schema,
-                                                  tracker,
-                                                  columns,
-                                                  
reconciler.inflatePartitionState(pd, tracker, partitionQuery),
-                                                  SelectHelper.execute(sut, 
history.clock(), partitionQuery),
-                                                  partitionQuery);
-                        logger.debug("Partition state agrees. Throwing 
original error...");
-
-                        throw t;
+                        List<IdxRelation> regularRelations = 
HistoryBuilderHelper.generateValueRelations(rng, schema.regularColumns.size(),
+                                                                               
                          column -> 
Math.min(schema.valueGenerators.regularPopulation(column), UNIQUE_CELL_VALUES));
+                        List<IdxRelation> staticRelations = 
HistoryBuilderHelper.generateValueRelations(rng, schema.staticColumns.size(),
+                                                                               
                         column -> 
Math.min(schema.valueGenerators.staticPopulation(column), UNIQUE_CELL_VALUES));
+                        history.select(pkGen.generate(rng),
+                                       
HistoryBuilderHelper.generateClusteringRelations(rng, 
schema.clusteringKeys.size(), ckGen).toArray(new IdxRelation[0]),
+                                       regularRelations.toArray(new 
IdxRelation[regularRelations.size()]),
+                                       staticRelations.toArray(new 
IdxRelation[staticRelations.size()]));
                     }
                 }
             }
 
-            if (run + 1 < RUNS)
-            {
-                logger.debug("Forcing compaction at the end of run {}...", run 
+ 1);
-                compact(schema);
-            }
-        }
+            InJvmDTestVisitExecutor.replay(InJvmDTestVisitExecutor.builder()
+                                                                  
.pageSizeSelector(pageSizeSelector())
+                                                                  
.build(schema, history, cluster),
+                                           history);
+        });
     }
 
     protected void flush(SchemaSpec schema)
     {
         cluster.get(1).nodetool("flush", schema.keyspace, schema.table);
     }
-    
+
     protected void compact(SchemaSpec schema)

Review Comment:
   `compact()` is now unused. We might need to compact every once in a while to 
keep the number of SSTables per partition under 32, so that we avoid hitting 
`sai_sstable_indexes_per_query_warn_threshold`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to