dcapwell commented on code in PR #3108:
URL: https://github.com/apache/cassandra/pull/3108#discussion_r1491427286


##########
test/unit/org/apache/cassandra/service/accord/AccordCommandStoreFuzzTest.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+import java.util.function.Function;
+import java.util.function.ToLongFunction;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import accord.impl.SizeOfIntersectionSorter;
+import accord.impl.TestAgent;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandStores;
+import accord.local.Node;
+import accord.local.NodeTimeService;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
+import accord.local.SaveStatus;
+import accord.messages.PreAccept;
+import accord.messages.TxnRequest;
+import accord.primitives.FullKeyRoute;
+import accord.primitives.FullRoute;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.RoutableKey;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Topologies;
+import accord.topology.Topology;
+import accord.utils.AccordGens;
+import accord.utils.RandomSource;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
+import org.apache.cassandra.concurrent.SimulatedExecutorFactory;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.metrics.AccordStateCacheMetrics;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.transformations.AddAccordTable;
+import org.apache.cassandra.utils.Generators;
+import org.apache.cassandra.utils.Pair;
+import org.assertj.core.api.Assertions;
+
+import static accord.utils.Property.qt;
+import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn;
+import static org.apache.cassandra.service.accord.AccordTestUtils.wrapInTxn;
+import static org.apache.cassandra.utils.AccordGenerators.fromQT;
+
+public class AccordCommandStoreFuzzTest extends CQLTester
+{
+    static
+    {
+        // NOTE(review): presumably disabled because the store is driven from simulated executors
+        // rather than its own thread in this test — confirm
+        CassandraRelevantProperties.TEST_ACCORD_STORE_THREAD_CHECKS_ENABLED.setBoolean(false);
+        // Restarts are not covered in these tests and the range logic is getting rewritten, so disable loading
+        CassandraRelevantProperties.TEST_ACCORD_STORE_LOAD_RANGES_ENABLED.setBoolean(false);
+        // since this test does frequent truncates, the info table gets updated and forced flushed... which is 90% of the cost of this test...
+        // this flag disables that flush
+        CassandraRelevantProperties.UNSAFE_SYSTEM.setBoolean(true);
+    }
+
+    private static TableMetadata intTbl, reverseTokenTbl;
+    private static Node.Id nodeId;
+
+    /**
+     * One-time class setup: runs the {@link CQLTester} class-level setup first, then
+     * switches incremental backups off through {@link DatabaseDescriptor}.
+     */
+    @BeforeClass
+    public static void setUpClass()
+    {
+        CQLTester.setUpClass();
+        DatabaseDescriptor.setIncrementalBackupsEnabled(false);
+    }
+
+    @Before
+    public void init()
+    {
+        if (intTbl != null)
+            return;
+        createKeyspace("CREATE KEYSPACE test WITH replication={ 'class' : 
'SimpleStrategy', 'replication_factor' : 2 }");
+        createTable("test", "CREATE TABLE test.tbl1 (pk int PRIMARY KEY, value 
int)");
+        intTbl = Schema.instance.getTableMetadata("test", "tbl1");
+        AddAccordTable.addTable(intTbl.id);
+
+        createTable("test", "CREATE TABLE test.tbl2 (pk vector<bigint, 2> 
PRIMARY KEY, value int)");
+        reverseTokenTbl = Schema.instance.getTableMetadata("test", "tbl2");
+        AddAccordTable.addTable(reverseTokenTbl.id);
+
+        nodeId = 
AccordTopology.tcmIdToAccord(ClusterMetadata.current().myNodeId());
+
+        ServerTestUtils.markCMS();
+    }
+
+    /**
+     * Looks up randomly generated transaction ids in a freshly created store and checks that each
+     * resulting command reports {@link SaveStatus#Uninitialised}.
+     * <p>
+     * Each of the 10 property examples truncates the Accord system tables, builds a new
+     * {@code Instance} from the example's random source, and performs 100 lookups.
+     */
+    @Test
+    public void emptyTxns()
+    {
+        qt().withExamples(10).check(rs -> {
+            clearSystemTables();
+            try (var instance = new Instance(rs))
+            {
+                for (int i = 0, examples = 100; i < examples; i++)
+                {
+                    TxnId id = AccordGens.txnIds().next(rs);
+                    instance.process(PreLoadContext.contextFor(id), (safe) -> {
+                        var safeCommand = safe.get(id, id, Ranges.EMPTY);
+                        var command = safeCommand.current();
+                        Assertions.assertThat(command.saveStatus()).isEqualTo(SaveStatus.Uninitialised);
+                        return null;
+                    });
+                }
+            }
+
+        });
+    }
+
+    /**
+     * Repeatedly pre-accepts the same single-key INSERT and checks that every PreAccept reports
+     * exactly the previously issued transaction ids as key dependencies.
+     * <p>
+     * Each of the 10 property examples picks a random int partition key and randomly chooses one of
+     * two modes: sequential (each PreAccept is processed and verified before the next is issued)
+     * or concurrent (all PreAccepts are queued first, then processed and verified together).
+     */
+    @Test
+    public void keyConflicts()
+    {
+        TableMetadata tbl = intTbl;
+        int numSamples = 100;
+
+        qt().withExamples(10).check(rs -> {
+            clearSystemTables();
+            int key = rs.nextInt();
+            PartitionKey pk = new PartitionKey(tbl.id, tbl.partitioner.decorateKey(Int32Type.instance.decompose(key)));
+            Keys keys = Keys.of(pk);
+            FullKeyRoute route = keys.toRoute(pk.toUnseekable());
+            Txn txn = createTxn(wrapInTxn("INSERT INTO " + tbl + "(pk, value) VALUES (?, ?)"), Arrays.asList(key, 42));
+            try (var instance = new Instance(rs))
+            {
+                // txn ids issued so far, in issue order; these are the expected deps of the next txn
+                List<TxnId> conflicts = new ArrayList<>(numSamples);
+                boolean concurrent = rs.nextBoolean();
+                // only needed in concurrent mode, where results are awaited after the loop
+                List<AsyncChain<?>> asyncs = !concurrent ? null : new ArrayList<>(numSamples);
+                for (int i = 0; i < numSamples; i++)
+                {
+                    instance.maybeCacheEvict(keys, Ranges.EMPTY);
+                    if (concurrent)
+                    {
+                        var pair = assertPreAcceptAsync(instance, txn, route, conflicts, keys);
+                        conflicts.add(pair.left);
+                        asyncs.add(pair.right);
+                    }
+                    else
+                    {
+                        conflicts.add(assertPreAccept(instance, txn, route, conflicts, keys));
+                    }
+                }
+                if (concurrent)
+                {
+                    // run everything that was queued, then surface any assertion failure
+                    instance.processAll();
+                    for (var chain : asyncs)
+                        AsyncChains.getBlocking(chain);
+                }
+            }
+        });
+    }
+
+    /**
+     * Key-only convenience overload: delegates to the full {@code assertPreAccept} with an empty
+     * list of range conflicts and {@link Ranges#EMPTY}.
+     *
+     * @return the id of the transaction that was pre-accepted
+     */
+    private static TxnId assertPreAccept(Instance instance,
+                                         Txn txn, FullRoute<?> route,
+                                         List<TxnId> keyConflicts, Keys keys) throws ExecutionException, InterruptedException
+    {
+        return assertPreAccept(instance, txn, route, keyConflicts, keys, Collections.emptyList(), Ranges.EMPTY);
+    }
+
+    /**
+     * Blocking variant of {@code assertPreAcceptAsync}: submits the PreAccept, runs all work queued
+     * on the instance, and waits for the dependency assertions to finish.
+     *
+     * @return the id of the transaction that was pre-accepted
+     * @throws ExecutionException   if the pre-accept or its assertions failed
+     * @throws InterruptedException if interrupted while waiting for the result
+     */
+    private static TxnId assertPreAccept(Instance instance,
+                                         Txn txn, FullRoute<?> route,
+                                         List<TxnId> keyConflicts, Keys keys,
+                                         List<TxnId> rangeConflicts, Ranges ranges) throws ExecutionException, InterruptedException
+    {
+        var pair = assertPreAcceptAsync(instance, txn, route, keyConflicts, keys, rangeConflicts, ranges);
+        instance.processAll();
+        AsyncChains.getBlocking(pair.right);
+
+        return pair.left;
+    }
+
+    /**
+     * Key-only convenience overload: delegates to the full {@code assertPreAcceptAsync} with an
+     * empty list of range conflicts and {@link Ranges#EMPTY}.
+     *
+     * @return the new transaction id paired with the pending result of its dependency assertions
+     */
+    private static Pair<TxnId, AsyncResult<?>> assertPreAcceptAsync(Instance instance,
+                                                                      Txn txn, FullRoute<?> route,
+                                                                      List<TxnId> keyConflicts, Keys keys)
+    {
+        return assertPreAcceptAsync(instance, txn, route, keyConflicts, keys, Collections.emptyList(), Ranges.EMPTY);
+    }
+
+    /**
+     * Submits a PreAccept for {@code txn} under a freshly created {@link TxnId} and, once it has
+     * been applied, asserts that the reply is ok and that its deps match the expected conflicts.
+     * <p>
+     * Nothing is executed here beyond queueing the work; the caller is responsible for driving the
+     * instance and waiting on the returned result.
+     *
+     * @param keyConflicts   txn ids expected as key deps; only the entries present at call time are checked
+     * @param keys           keys the key deps are expected to cover
+     * @param rangeConflicts txn ids expected as range deps; only the entries present at call time are checked
+     * @param ranges         ranges the range deps are expected to cover
+     * @return the new transaction id paired with the pending result of the assertions
+     */
+    private static Pair<TxnId, AsyncResult<?>> assertPreAcceptAsync(Instance instance,
+                                                                      Txn txn, FullRoute<?> route,
+                                                                      List<TxnId> keyConflicts, Keys keys,
+                                                                      List<TxnId> rangeConflicts, Ranges ranges)
+    {
+        // capture the sizes now: callers may append to these lists before the deferred assertion runs
+        int kConflictSize = keyConflicts.size();
+        int rConflictSize = rangeConflicts.size();
+        TxnId txnId = new TxnId(instance.timeService.epoch(), instance.timeService.now(), txn.kind(), txn.keys().domain(), nodeId);
+        var preAccept = new PreAccept(instance.nodeId, new Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, instance.topology), txnId, txn, route);
+        return Pair.create(txnId, instance.processAsync(preAccept, safe -> {
+            var reply = preAccept.apply(safe);
+            Assertions.assertThat(reply.isOk()).isTrue();
+            return (PreAccept.PreAcceptOk) reply;
+        }).map(success -> {
+            assertDeps(keyConflicts.subList(0, kConflictSize), keys, rangeConflicts.subList(0, rConflictSize), ranges, success);
+            return null;
+        }).beginAsResult());
+    }
+
+    /**
+     * Asserts that the deps carried by a successful PreAccept reply match the expected conflicts.
+     * <p>
+     * When an expected conflict list is empty the corresponding deps must be empty. Otherwise the
+     * range deps must hold exactly {@code ranges}, each mapped to {@code rangeConflicts}, and the
+     * key deps must hold exactly {@code keys}, each mapped to {@code keyConflicts}.
+     *
+     * @param keyConflicts   txn ids expected for every key
+     * @param keys           keys expected in the key deps
+     * @param rangeConflicts txn ids expected for every range
+     * @param ranges         ranges expected in the range deps
+     * @param success        the reply whose deps are checked
+     */
+    private static void assertDeps(List<TxnId> keyConflicts, Keys keys,
+                                   List<TxnId> rangeConflicts, Ranges ranges,
+                                   PreAccept.PreAcceptOk success)
+    {
+        if (rangeConflicts.isEmpty())
+        {
+            Assertions.assertThat(success.deps.rangeDeps.isEmpty()).describedAs("rangeDeps was not empty").isTrue();
+        }
+        else
+        {
+            Assertions.assertThat(success.deps.rangeDeps.rangeCount()).describedAs("Expected ranges size").isEqualTo(ranges.size());
+            // ranges are compared position by position against the expected ranges
+            for (int i = 0; i < ranges.size(); i++)
+            {
+                var expected = Ranges.of(ranges.get(i));
+                var actual = success.deps.rangeDeps.ranges(i);
+                Assertions.assertThat(actual).isEqualTo(expected);
+                var conflict = success.deps.rangeDeps.txnIdsForRangeIndex(i);
+                Assertions.assertThat(conflict).describedAs("Expected range %s to have different conflicting txns", expected).isEqualTo(rangeConflicts);
+            }
+        }
+        if (keyConflicts.isEmpty())
+        {
+            Assertions.assertThat(success.deps.keyDeps.isEmpty()).describedAs("keyDeps was not empty").isTrue();
+        }
+        else
+        {
+            Assertions.assertThat(success.deps.keyDeps.keys()).describedAs("Keys").isEqualTo(keys);
+            for (var key : keys)
+                Assertions.assertThat(success.deps.keyDeps.txnIds(key)).describedAs("Txns for key %s", key).isEqualTo(keyConflicts);
+        }
+    }
+
+    /**
+     * Truncates, without taking a snapshot, every column family store of the Accord system keyspace.
+     */
+    private static void clearSystemTables()
+    {
+        for (var store : Keyspace.open(SchemaConstants.ACCORD_KEYSPACE_NAME).getColumnFamilyStores())
+            store.truncateBlockingWithoutSnapshot();
+    }
+
+    private static class Instance implements AutoCloseable
+    {
+        private final RandomSource rs;
+        private final List<Throwable> failures = new ArrayList<>();
+        private final SimulatedExecutorFactory globalExecutor;
+        private final ExecutorPlus orderedExecutor;
+        private final ScheduledExecutorPlus unorderedScheduled;
+        private final AccordCommandStore store;
+        private final CommandStore.EpochUpdateHolder updateHolder;
+        private final NodeTimeService timeService;
+        private final Node.Id nodeId;
+        private final Topology topology;
+        private final BooleanSupplier shouldEvict;
+
+        private Instance(RandomSource rs)
+        {
+            this.rs = rs;
+            globalExecutor = new SimulatedExecutorFactory(rs, 
fromQT(Generators.TIMESTAMP_GEN.map(java.sql.Timestamp::getTime)).mapToLong(TimeUnit.MILLISECONDS::toNanos).next(rs),
 failures::add);
+            orderedExecutor = 
globalExecutor.configureSequential("ignore").build();
+            unorderedScheduled = globalExecutor.scheduled("ignored");
+            ExecutorFactory.Global.unsafeSet(globalExecutor);
+            Stage.READ.unsafeSetExecutor(unorderedScheduled);
+            Stage.MUTATION.unsafeSetExecutor(unorderedScheduled);
+
+            this.updateHolder = new CommandStore.EpochUpdateHolder();
+            this.nodeId = 
AccordTopology.tcmIdToAccord(ClusterMetadata.currentNullable().myNodeId());
+            this.timeService = new NodeTimeService()
+            {
+                private final ToLongFunction<TimeUnit> unixWrapper = 
NodeTimeService.unixWrapper(TimeUnit.NANOSECONDS, this::now);
+
+                @Override
+                public Node.Id id()
+                {
+                    return nodeId;
+                }
+
+                @Override
+                public long epoch()
+                {
+                    return ClusterMetadata.current().epoch.getEpoch();
+                }
+
+                @Override
+                public long now()
+                {
+                    return globalExecutor.nanoTime();
+                }
+
+                @Override
+                public long unix(TimeUnit unit)
+                {
+                    return unixWrapper.applyAsLong(unit);
+                }
+
+                @Override
+                public Timestamp uniqueNow(Timestamp atLeast)
+                {
+                    var now = Timestamp.fromValues(epoch(), now(), nodeId);
+                    if (now.compareTo(atLeast) < 0)
+                        throw new UnsupportedOperationException();
+                    return now;
+                }
+            };
+
+            this.store = new AccordCommandStore(0,
+                                                timeService,
+                                                new TestAgent.RethrowAgent(),

Review Comment:
   Since we catch all exceptions in the executors, rethrowing is fine; we don't
need `errors` to propagate to the agent here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to