ifesdjeen commented on code in PR #3689:
URL: https://github.com/apache/cassandra/pull/3689#discussion_r1847860002


##########
test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITestBase.java:
##########
@@ -18,302 +18,198 @@
 
 package org.apache.cassandra.fuzz.sai;
 
+
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
+import java.util.function.Consumer;
 
+import com.google.common.collect.Streams;
+import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.fuzz.harry.integration.model.IntegrationTestBase;
-import org.apache.cassandra.harry.ddl.ColumnSpec;
-import org.apache.cassandra.harry.ddl.SchemaSpec;
-import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
-import org.apache.cassandra.harry.gen.DataGenerators;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.harry.SchemaSpec;
+import org.apache.cassandra.harry.dsl.HistoryBuilder;
+import org.apache.cassandra.harry.dsl.HistoryBuilderHelper;
+import org.apache.cassandra.harry.execution.InJvmDTestVisitExecutor;
 import org.apache.cassandra.harry.gen.EntropySource;
-import org.apache.cassandra.harry.gen.rng.JdkRandomEntropySource;
-import org.apache.cassandra.harry.model.QuiescentChecker;
-import org.apache.cassandra.harry.model.SelectHelper;
-import org.apache.cassandra.harry.model.reconciler.PartitionState;
-import org.apache.cassandra.harry.model.reconciler.Reconciler;
-import org.apache.cassandra.harry.operations.FilteringQuery;
-import org.apache.cassandra.harry.operations.Query;
-import org.apache.cassandra.harry.operations.Relation;
-import org.apache.cassandra.harry.sut.SystemUnderTest;
-import org.apache.cassandra.harry.sut.TokenPlacementModel;
-import org.apache.cassandra.harry.sut.injvm.InJvmSut;
-import org.apache.cassandra.harry.sut.injvm.InJvmSutBase;
-import org.apache.cassandra.harry.tracker.DataTracker;
-import org.apache.cassandra.harry.tracker.DefaultDataTracker;
-import org.apache.cassandra.service.consensus.TransactionalMode;
-
-import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
-import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-
-public abstract class SingleNodeSAITestBase extends IntegrationTestBase
-{
-    private static final int RUNS = 1;
+import org.apache.cassandra.harry.gen.Generator;
+import org.apache.cassandra.harry.gen.Generators;
+import org.apache.cassandra.harry.gen.SchemaGenerators;
 
+import static org.apache.cassandra.harry.checker.TestHelper.withRandom;
+import static 
org.apache.cassandra.harry.dsl.SingleOperationBuilder.IdxRelation;
+
+// TODO: "WITH OPTIONS = {'case_sensitive': 'false', 'normalize': 'true', 
'ascii': 'true'};",
+public abstract class SingleNodeSAITestBase extends TestBaseImpl
+{
     private static final int OPERATIONS_PER_RUN = 30_000;
     private static final int REPAIR_SKIP = OPERATIONS_PER_RUN / 2;
     private static final int FLUSH_SKIP = OPERATIONS_PER_RUN / 7;
-    private static final int VALIDATION_SKIP = OPERATIONS_PER_RUN / 100;
+    private static final int COMPACTION_SKIP = OPERATIONS_PER_RUN / 10;
 
     private static final int NUM_PARTITIONS = OPERATIONS_PER_RUN / 1000;
     protected static final int MAX_PARTITION_SIZE = 10_000;
     private static final int UNIQUE_CELL_VALUES = 5;
 
-    long seed = 1;
+    protected static final Logger logger = 
LoggerFactory.getLogger(SingleNodeSAITest.class);
+    protected static Cluster cluster;
 
-    protected boolean withAccord;
+    protected final boolean withAccord;
+
+    protected SingleNodeSAITestBase(boolean withAccord)
+    {
+        this.withAccord = withAccord;
+    }
 
     @BeforeClass
     public static void before() throws Throwable
+    {
+        init(1, defaultConfig());
+    }
+
+    protected static void init(int nodes, Consumer<IInstanceConfig> cfg) 
throws Throwable
     {
         cluster = Cluster.build()
-                         .withNodes(1)
-                         // At lower fetch sizes, queries w/ hundreds or 
thousands of matches can take a very long time.
-                         .withConfig(InJvmSutBase.defaultConfig().andThen(c -> 
c.set("range_request_timeout", "180s")
-                                                                               
 .set("read_request_timeout", "180s")
-                                                                               
 .set("transaction_timeout", "180s")
-                                                                               
 .set("write_request_timeout", "180s")
-                                                                               
 .set("native_transport_timeout", "180s")
-                                                                               
 .set("slow_query_log_timeout", "180s")
-                                                                               
 .with(GOSSIP).with(NETWORK)))
+                         .withNodes(nodes)
+                         .withConfig(cfg)
                          .createWithoutStarting();
         cluster.setUncaughtExceptionsFilter(t -> {
             logger.error("Caught exception, reporting during shutdown. 
Ignoring.", t);
             return true;
         });
         cluster.startup();
         cluster = init(cluster);
-        sut = new InJvmSut(cluster);
+    }
+    @AfterClass
+    public static void afterClass()
+    {
+        cluster.close();
     }
 
-    public SingleNodeSAITestBase(boolean withAccord)
+    @Before
+    public void beforeEach()
     {
-        this.withAccord = withAccord;
+        cluster.schemaChange("DROP KEYSPACE IF EXISTS harry");
+        cluster.schemaChange("CREATE KEYSPACE harry WITH replication = 
{'class': 'SimpleStrategy', 'replication_factor': 1};");
     }
 
     @Test
     public void basicSaiTest()
     {
-        CassandraRelevantProperties.SAI_INTERSECTION_CLAUSE_LIMIT.setInt(6);
-        SchemaSpec schema = new SchemaSpec(KEYSPACE, "tbl1",
-                                           Arrays.asList(ColumnSpec.ck("pk1", 
ColumnSpec.int64Type),
-                                                         ColumnSpec.ck("pk2", 
ColumnSpec.asciiType(4, 100)),
-                                                         ColumnSpec.ck("pk3", 
ColumnSpec.int64Type)),
-                                           Arrays.asList(ColumnSpec.ck("ck1", 
ColumnSpec.asciiType(4, 100)),
-                                                         ColumnSpec.ck("ck2", 
ColumnSpec.asciiType, true),
-                                                         ColumnSpec.ck("ck3", 
ColumnSpec.int64Type)),
-                                           
Arrays.asList(ColumnSpec.regularColumn("v1", ColumnSpec.asciiType(40, 100)),
-                                                         
ColumnSpec.regularColumn("v2", ColumnSpec.int64Type),
-                                                         
ColumnSpec.regularColumn("v3", ColumnSpec.int64Type)),
-                                           
List.of(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType(40, 100))),
-                                           withAccord ? 
Optional.of(TransactionalMode.full) : Optional.empty())
-                            .withWriteTimeFromAccord(false) // use the harry 
timestamp
-                            
.withCompactionStrategy("LeveledCompactionStrategy");
-
-        sut.schemaChange(schema.compile().cql());
-        sut.schemaChange(schema.cloneWithName(schema.keyspace, schema.table + 
"_debug").compile().cql());
-        sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) 
USING 'sai' ",
-                                       schema.regularColumns.get(0).name,
-                                       schema.keyspace,
-                                       schema.table,
-                                       schema.regularColumns.get(0).name));
-        sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) 
USING 'sai';",
-                                       schema.regularColumns.get(1).name,
-                                       schema.keyspace,
-                                       schema.table,
-                                       schema.regularColumns.get(1).name));
-        sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) 
USING 'sai';",
-                                       schema.regularColumns.get(2).name,
-                                       schema.keyspace,
-                                       schema.table,
-                                       schema.regularColumns.get(2).name));
-        sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) 
USING 'sai';",
-                                       schema.staticColumns.get(0).name,
-                                       schema.keyspace,
-                                       schema.table,
-                                       schema.staticColumns.get(0).name));
-
-        waitForIndexesQueryable(schema);
-
-        DataTracker tracker = new DefaultDataTracker();
-        TokenPlacementModel.ReplicationFactor rf = new 
TokenPlacementModel.SimpleReplicationFactor(cluster.size());
-        ReplayingHistoryBuilder history = new ReplayingHistoryBuilder(seed,
-                                                                      
MAX_PARTITION_SIZE,
-                                                                      
MAX_PARTITION_SIZE,
-                                                                      tracker,
-                                                                      sut,
-                                                                      schema,
-                                                                      rf,
-                                                                      
SystemUnderTest.ConsistencyLevel.QUORUM);
-
-        for (int run = 0; run < RUNS; run++)
-        {
-            logger.info("Starting run {}/{}...", run + 1, RUNS);
-            EntropySource random = new JdkRandomEntropySource(run);
-
-            // Populate the array of possible values for all operations in the 
run:
-            long[] values = new long[UNIQUE_CELL_VALUES];
-            for (int i = 0; i < values.length; i++)
-                values[i] = random.next();
+        Generator<SchemaSpec> schemaGen;
+        if (withAccord)
+            schemaGen = SchemaGenerators.schemaSpecGen(KEYSPACE, "basic_sai", 
MAX_PARTITION_SIZE, SchemaSpec.Options.TRANSACTIONAL_MODE, "full");
+        else
+            schemaGen = SchemaGenerators.schemaSpecGen(KEYSPACE, "basic_sai", 
MAX_PARTITION_SIZE);
+
+        Set<Integer> usedPartitions = new HashSet<>();
+
+        withRandom(rng -> {
+            SchemaSpec schema = schemaGen.generate(rng);
+
+            Generator<Integer> globalPkGen = Generators.int32(0, 
Math.min(NUM_PARTITIONS, schema.valueGenerators.pkPopulation()));
+            Generator<Integer> ckGen = Generators.int32(0, 
schema.valueGenerators.ckPopulation());
+
+            
CassandraRelevantProperties.SAI_INTERSECTION_CLAUSE_LIMIT.setInt(100);
+            beforeEach();
+            cluster.forEach(i -> i.nodetool("disableautocompaction"));
+
+            cluster.schemaChange(schema.compile());
+            Streams.concat(schema.clusteringKeys.stream(),
+                           schema.regularColumns.stream(),
+                           schema.staticColumns.stream())
+                   .forEach(column -> {
+                       cluster.schemaChange(String.format("CREATE INDEX 
%s_sai_idx ON %s.%s (%s) USING 'sai' ",
+                                                          column.name,
+                                                          schema.keyspace,
+                                                          schema.table,
+                                                          column.name));
+                   });
+
+            waitForIndexesQueryable(schema);
+
+            HistoryBuilder history = new 
HistoryBuilder(schema.valueGenerators);
+            List<Integer> partitions = new ArrayList<>();
+            for (int j = 0; j < 5; j++)
+            {
+                int picked = globalPkGen.generate(rng);
+                if (usedPartitions.contains(picked))
+                    continue;
+                partitions.add(picked);
+            }
+            usedPartitions.addAll(partitions);
+            if (partitions.isEmpty())
+                return;
 
+            Generator<Integer> pkGen = Generators.pick(partitions);
             for (int i = 0; i < OPERATIONS_PER_RUN; i++)
             {
-                int partitionIndex = random.nextInt(0, NUM_PARTITIONS);
+                int partitionIndex = pkGen.generate(rng);
+                HistoryBuilderHelper.insertRandomData(schema, partitionIndex, 
ckGen.generate(rng), rng,0.5d, history);
 
-                history.visitPartition(partitionIndex)
-                       .insert(random.nextInt(MAX_PARTITION_SIZE),
-                               new long[] { random.nextBoolean() ? 
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)],
-                                            random.nextBoolean() ? 
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)],
-                                            random.nextBoolean() ? 
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)] },
-                               new long[] { random.nextBoolean() ? 
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)] });
-
-                if (random.nextFloat() > 0.99f)
+                if (rng.nextFloat() > 0.99f)
                 {
-                    int row1 = random.nextInt(MAX_PARTITION_SIZE);
-                    int row2 = random.nextInt(MAX_PARTITION_SIZE);
-                    
history.visitPartition(partitionIndex).deleteRowRange(Math.min(row1, row2), 
Math.max(row1, row2),
-                                                                          
random.nextBoolean(), random.nextBoolean());
-                }
-                else if (random.nextFloat() > 0.999f)
-                {
-                    history.visitPartition(partitionIndex).deleteRowSlice();
+                    int row1 = ckGen.generate(rng);
+                    int row2 = ckGen.generate(rng);
+                    history.deleteRowRange(partitionIndex,
+                                           Math.min(row1, row2),
+                                           Math.max(row1, row2),
+                                           
rng.nextInt(schema.clusteringKeys.size()),
+                                           rng.nextBoolean(),
+                                           rng.nextBoolean());
                 }
 
-                if (random.nextFloat() > 0.995f)
-                {
-                    history.visitPartition(partitionIndex).deleteColumns();
-                }
+                if (rng.nextFloat() > 0.995f)
+                    HistoryBuilderHelper.deleteRandomColumns(schema, 
partitionIndex, ckGen.generate(rng), rng, history);
 
-                if (random.nextFloat() > 0.9995f)
-                {
-                    history.visitPartition(partitionIndex).deletePartition();
-                }
+                if (rng.nextFloat() > 0.9995f)
+                    history.deletePartition(partitionIndex);
 
                 if (i % REPAIR_SKIP == 0)
-                {
-                    logger.debug("Repairing/flushing after operation {}...", 
i);
-                    repair(schema);
-                }
+                    history.custom(() -> repair(schema), "Repair");
                 else if (i % FLUSH_SKIP == 0)
-                {
-                    logger.debug("Flushing after operation {}...", i);
-                    flush(schema);
-                }
+                    history.custom(() -> flush(schema), "Flush");
+                else if (i % COMPACTION_SKIP == 0)
+                    history.custom(() -> flush(schema), "Flush");

Review Comment:
   Yes, thank you for spotting! 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to