ifesdjeen commented on code in PR #3656:
URL: https://github.com/apache/cassandra/pull/3656#discussion_r1829809277


##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -568,6 +570,46 @@ public static List<ClusterMetadata> tcmLoadRange(long min, long max)
         return afterLoad;
     }
 
+    /**
+     * This method exists due to the fact that we define a retry policy for TCM to follow, and then TCM ignores it and does no retries...
+     */
+    private static List<ClusterMetadata> reconstruct(long min, long max)
+    {
+        Epoch start = Epoch.create(min);
+        Epoch end = Epoch.create(max);
+        Retry.Deadline retryPolicyThatGetsIgnored = Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(NANOSECONDS),

Review Comment:
   I think we need to make `AbstractLocalProcessor#getLogState` (currently a passthrough) retry, or have each implementation retry individually.



##########
test/distributed/org/apache/cassandra/distributed/impl/Instance.java:
##########
@@ -651,7 +666,28 @@ public void startup(ICluster cluster)
                     throw (RuntimeException) t;
                 throw new RuntimeException(t);
             }
-        }).run();
+        }).call();
+        DurationSpec timeout = startupTimeout();
+        if (timeout == null)
+        {
+            waitOn(result);
+        }
+        else
+        {
+            try
+            {
+                result.get(timeout.quantity(), timeout.unit());
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();

Review Comment:
   Could you clarify? I thought that if we catch an `InterruptedException`, the thread is already interrupted?
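For context on this thread: when a blocking call such as `Thread.sleep` throws `InterruptedException`, the JDK clears the thread's interrupt status before throwing, so after the catch the thread is *not* interrupted anymore; calling `Thread.currentThread().interrupt()` in the catch block restores the flag for callers further up the stack. A minimal standalone sketch (not from this patch):

```java
public class InterruptFlagDemo
{
    // Observes the interrupt status *after* catching InterruptedException.
    public static boolean flagAfterCatch()
    {
        Thread.currentThread().interrupt(); // set our own interrupt flag
        try
        {
            Thread.sleep(10); // throws immediately because the flag is set
        }
        catch (InterruptedException e)
        {
            // The JDK cleared the interrupt status before throwing; this is the
            // point where code conventionally calls Thread.currentThread().interrupt()
            // to restore it. We deliberately do not, to observe the cleared flag.
        }
        return Thread.currentThread().isInterrupted();
    }

    public static void main(String[] args)
    {
        System.out.println("interrupted after catch: " + flagAfterCatch()); // prints "interrupted after catch: false"
    }
}
```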



##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -568,6 +570,46 @@ public static List<ClusterMetadata> tcmLoadRange(long min, long max)
         return afterLoad;
     }
 
+    /**
+     * This method exists due to the fact that we define a retry policy for TCM to follow, and then TCM ignores it and does no retries...
+     */
+    private static List<ClusterMetadata> reconstruct(long min, long max)
+    {
+        Epoch start = Epoch.create(min);
+        Epoch end = Epoch.create(max);
+        Retry.Deadline retryPolicyThatGetsIgnored = Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(NANOSECONDS),
+                                                                                     TCMMetrics.instance.fetchLogRetries);
+        Throwable lastError = null;
+        Backoff backoff = new Backoff.ExponentialBackoff(42, 200, SECONDS.toMillis(1), ThreadLocalRandom.current()::nextDouble);

Review Comment:
   There's already a `Retry#Backoff` class that handles backoff and is used in conjunction with the TCM retry functions.
   
   In retrospect, I should have pushed back on the addition of a second backoff in CASSANDRA-19856. I asked for a follow-up patch, but it never happened, and now we have two diverging concepts.
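For readers following along, both backoff concepts are variations on the same full-jitter exponential backoff shape. A hypothetical sketch of that shape (the constructor mirrors the `(maxAttempts, baseMillis, maxMillis, randomSupplier)` call in the diff, but none of these names are the real `Retry#Backoff` or `Backoff.ExponentialBackoff` APIs):

```java
import java.util.function.DoubleSupplier;

// Illustrative only: the shape of full-jitter exponential backoff shared by
// both implementations; names and signatures here are hypothetical.
public class BackoffSketch
{
    final int maxAttempts;
    final long baseMillis;
    final long maxMillis;
    final DoubleSupplier jitter; // supplies values in [0, 1)

    BackoffSketch(int maxAttempts, long baseMillis, long maxMillis, DoubleSupplier jitter)
    {
        this.maxAttempts = maxAttempts;
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
        this.jitter = jitter;
    }

    boolean mayRetry(int attempt)
    {
        return attempt < maxAttempts;
    }

    long delayMillis(int attempt)
    {
        // double the base delay each attempt, cap it, then scale by random jitter
        long exp = Math.min(maxMillis, baseMillis << Math.min(attempt, 30));
        return (long) (exp * jitter.getAsDouble());
    }
}
```

Converging on one such class (rather than two) would keep retry behaviour consistent across TCM and Accord call sites.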



##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -1222,15 +1264,27 @@ public void tryMarkRemoved(Topology topology, Id target)
         if (node.commandStores().count() == 0) return; // when starting up stores can be empty, so ignore
         Ranges ranges = topology.rangesForNode(target);
         if (ranges.isEmpty()) return;
-        tryMarkRemoved(ranges, 0).begin(node().agent());
+        long startNanos = Clock.Global.nanoTime();
+        exclusiveSyncPointWithRetries(ranges, 0)

Review Comment:
   Is it OK for this to be async? Probably it is, and it always was, just 
making sure.



##########
test/distributed/org/apache/cassandra/fuzz/topology/HarryOnAccordTopologyMixupTest.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.topology;
+
+import accord.utils.Gen;
+import accord.utils.Invariants;
+import accord.utils.Property;
+import accord.utils.RandomSource;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.fuzz.topology.AccordTopologyMixupTest.ListenerHolder;
+import org.apache.cassandra.service.consensus.TransactionalMode;
+
+public class HarryOnAccordTopologyMixupTest extends HarryTopologyMixupTest
+{
+    static
+    {
+        CassandraRelevantProperties.ACCORD_AGENT_CLASS.setString(AccordTopologyMixupTest.InterceptAgent.class.getName());
+        // enable most expensive debugging checks
+        CassandraRelevantProperties.ACCORD_KEY_PARANOIA_CPU.setString(Invariants.Paranoia.QUADRATIC.name());
+        CassandraRelevantProperties.ACCORD_KEY_PARANOIA_MEMORY.setString(Invariants.Paranoia.QUADRATIC.name());
+        CassandraRelevantProperties.ACCORD_KEY_PARANOIA_COSTFACTOR.setString(Invariants.ParanoiaCostFactor.HIGH.name());
+    }
+
+    @Override
+    protected void preCheck(Property.StatefulBuilder builder)
+    {
+        // if a failing seed is detected, populate here
+        // Example: builder.withSeed(42L);
+//        builder.withSeed(3447535624245025622L);

Review Comment:
   Should we force some seed, at least for now, since we know there are quite a few problems in Accord, and come back and explore more seeds later?



##########
test/harry/main/org/apache/cassandra/harry/sut/TokenPlacementModel.java:
##########
@@ -237,6 +237,25 @@ public static List<Node> peerStateToNodes(Object[][] resultset)
         return nodes;
     }
 
+    private static <T> T get(Object[] row, int idx, String name)
+    {
+        T t = (T) row[idx];
+        if (t == null || ((t instanceof Collection<?>) && ((Collection<?>) t).isEmpty()))
+            throw new IncompletePeersStateException(name);
+        return t;
+    }
+
+    /**
+     * When the node sees the new epoch, the update of the peers table is async, so checking the table may yield partial results; we need some way to detect this to enable retries.

Review Comment:
   We should make a note of this, and maybe never rely on peers, instead constructing a real view of the ring every time.
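Until callers stop relying on peers, the exception above implies a retry loop at each call site, roughly like this sketch (all names are hypothetical stand-ins; only the shape matters):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;

public class PeersRetrySketch
{
    // Stand-in for the IncompletePeersStateException thrown by the getter above.
    static class IncompleteStateException extends RuntimeException {}

    // Retries a peers-table read that may observe partially populated rows,
    // backing off between attempts; rethrows the last failure when exhausted.
    static <T> T readWithRetries(Supplier<T> read, int maxAttempts, long sleepMillis)
    {
        IncompleteStateException last = null;
        for (int i = 0; i < maxAttempts; i++)
        {
            try
            {
                return read.get();
            }
            catch (IncompleteStateException e)
            {
                last = e; // table not fully updated yet; back off and re-read
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(sleepMillis));
            }
        }
        throw last;
    }
}
```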



##########
test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java:
##########
@@ -66,65 +107,130 @@ protected void destroyState(State<Spec> state, @Nullable Throwable cause)
         }
     }
 
-    private static Spec createSchemaSpec(RandomSource rs, Cluster cluster)
+    private static BiFunction<RandomSource, Cluster, Spec> createSchemaSpec(AccordMode mode)
     {
-        ReplayingHistoryBuilder harry = HarryHelper.dataGen(rs.nextLong(),
-                                                            new InJvmSut(cluster),
-                                                            new TokenPlacementModel.SimpleReplicationFactor(3),
-                                                            SystemUnderTest.ConsistencyLevel.QUORUM);
-        cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};", HarryHelper.KEYSPACE));
-        var schema = harry.schema();
-        cluster.schemaChange(schema.compile().cql());
-        waitForCMSToQuiesce(cluster, cluster.get(1));
-        return new Spec(harry);
+        return (rs, cluster) -> {
+            long seed = rs.nextLong();
+            var schema = HarryHelper.schemaSpecBuilder("harry", "tbl").surjection().inflate(seed);
+            if (mode.kind != AccordMode.Kind.None)
+                schema = schema.withTransactionMode(mode.passthroughMode);
+            ReplayingHistoryBuilder harry = HarryHelper.dataGen(seed,
+                    mode.kind == AccordMode.Kind.Direct ? new AccordSut(cluster) : new InJvmSut(cluster),
+                    new TokenPlacementModel.SimpleReplicationFactor(3),
+                    SystemUnderTest.ConsistencyLevel.QUORUM,
+                    schema);
+            cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};", HarryHelper.KEYSPACE));
+            cluster.schemaChange(schema.compile().cql());
+            waitForCMSToQuiesce(cluster, cluster.get(1));
+            return new Spec(harry, mode);
+        };
     }
 
-    private static CommandGen<Spec> cqlOperations(Spec spec)
+    private static class HarryCommand extends SimpleCommand<State<Spec>>
     {
-        class HarryCommand extends SimpleCommand<State<Spec>>
+        HarryCommand(Function<State<Spec>, String> name, Consumer<State<Spec>> fn)
         {
-            HarryCommand(Function<State<Spec>, String> name, Consumer<State<Spec>> fn)
-            {
-                super(name, fn);
-            }
+            super(name, fn);
+        }
 
-            @Override
-            public PreCheckResult checkPreconditions(State<Spec> state)
-            {
-                int clusterSize = state.topologyHistory.up().length;
-                return clusterSize >= 3 ? PreCheckResult.Ok : PreCheckResult.Ignore;
-            }
+        @Override
+        public PreCheckResult checkPreconditions(State<Spec> state)
+        {
+            int clusterSize = state.topologyHistory.up().length;
+            return clusterSize >= 3 ? PreCheckResult.Ok : PreCheckResult.Ignore;
         }
+    }
+
+    private static CommandGen<Spec> cqlOperations(Spec spec)
+    {
         Command<State<Spec>, Void, ?> insert = new HarryCommand(state -> "Harry Insert" + state.commandNamePostfix(), state -> {
             spec.harry.insert();
             ((HarryState) state).numInserts++;
         });
-        Command<State<Spec>, Void, ?> validateAll = new HarryCommand(state -> "Harry Validate All" + state.commandNamePostfix(), state -> {
-            spec.harry.validateAll(spec.harry.quiescentChecker());
-            ((HarryState) state).numInserts = 0;
-        });
         return (rs, state) -> {
             HarryState harryState = (HarryState) state;
             TopologyHistory history = state.topologyHistory;
             // if any topology change happened, then always validate all
             if (harryState.generation != history.generation())
             {
                 harryState.generation = history.generation();
-                return validateAll;
+                return validateAll(state);
             }
            if ((harryState.numInserts > 0 && rs.decide(0.2))) // 20% of the time do reads
-                return validateAll;
+                return validateAll(state);
             return insert;
         };
     }
 
+    private static Command<State<Spec>, Void, ?> validateAll(State<Spec> state)
+    {
+        Spec spec = state.schemaSpec;
+        var schema = spec.harry.schema();
+        boolean writeThroughAccord = schema.isWriteTimeFromAccord();
+        List<Command<State<Spec>, Void, ?>> reads = new ArrayList<>();
+        Model model = spec.harry.quiescentChecker();
+        for (Long pd : new TreeSet<>(spec.harry.pds()))
+        {
+            reads.add(new HarryCommand(s -> "Harry Validate pd=" + pd  + state.commandNamePostfix(), s -> model.validate(Query.selectAllColumns(schema, pd, false))));
+            // as of this writing Accord does not support ORDER BY
+            if (!writeThroughAccord)
+                reads.add(new HarryCommand(s -> "Harry Reverse Validate pd=" + pd + state.commandNamePostfix(), s -> model.validate(Query.selectAllColumns(schema, pd, true))));
+        }
+//        if (reads.isEmpty())

Review Comment:
   nit: clean up commented code?



##########
src/java/org/apache/cassandra/tcm/log/LogReader.java:
##########
@@ -148,7 +148,8 @@ else if (includeSnapshot)
             else if (closestSnapshot.epoch.isBefore(start))
             {
                 ImmutableList.Builder<Entry> entries = new ImmutableList.Builder<>();
-                EntryHolder entryHolder = getEntries(closestSnapshot.epoch.nextEpoch(), end);
+                // start is exclusive, so use the closest snapshot
+                EntryHolder entryHolder = getEntries(closestSnapshot.epoch, end);

Review Comment:
   Oh, looks like I missed this one. Wondering if all the tests are still passing.



##########
src/java/org/apache/cassandra/tcm/ClusterMetadataService.java:
##########
@@ -820,6 +820,11 @@ public static class SwitchableProcessor implements Processor
             this.cmsStateSupplier = cmsStateSupplier;
         }
 
+        public RemoteProcessor remoteProcessor()

Review Comment:
   Sounds good, we will need to pull in Sam's patch before commit, since this 
is inherently unsafe.



##########
test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java:
##########
@@ -66,65 +107,130 @@ protected void destroyState(State<Spec> state, @Nullable Throwable cause)
         }
     }
 
-    private static Spec createSchemaSpec(RandomSource rs, Cluster cluster)
+    private static BiFunction<RandomSource, Cluster, Spec> createSchemaSpec(AccordMode mode)
     {
-        ReplayingHistoryBuilder harry = HarryHelper.dataGen(rs.nextLong(),
-                                                            new InJvmSut(cluster),
-                                                            new TokenPlacementModel.SimpleReplicationFactor(3),
-                                                            SystemUnderTest.ConsistencyLevel.QUORUM);
-        cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};", HarryHelper.KEYSPACE));
-        var schema = harry.schema();
-        cluster.schemaChange(schema.compile().cql());
-        waitForCMSToQuiesce(cluster, cluster.get(1));
-        return new Spec(harry);
+        return (rs, cluster) -> {
+            long seed = rs.nextLong();
+            var schema = HarryHelper.schemaSpecBuilder("harry", "tbl").surjection().inflate(seed);
+            if (mode.kind != AccordMode.Kind.None)
+                schema = schema.withTransactionMode(mode.passthroughMode);
+            ReplayingHistoryBuilder harry = HarryHelper.dataGen(seed,
+                    mode.kind == AccordMode.Kind.Direct ? new AccordSut(cluster) : new InJvmSut(cluster),
+                    new TokenPlacementModel.SimpleReplicationFactor(3),
+                    SystemUnderTest.ConsistencyLevel.QUORUM,
+                    schema);
+            cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};", HarryHelper.KEYSPACE));
+            cluster.schemaChange(schema.compile().cql());
+            waitForCMSToQuiesce(cluster, cluster.get(1));
+            return new Spec(harry, mode);
+        };
     }
 
-    private static CommandGen<Spec> cqlOperations(Spec spec)
+    private static class HarryCommand extends SimpleCommand<State<Spec>>
     {
-        class HarryCommand extends SimpleCommand<State<Spec>>
+        HarryCommand(Function<State<Spec>, String> name, Consumer<State<Spec>> fn)
         {
-            HarryCommand(Function<State<Spec>, String> name, Consumer<State<Spec>> fn)
-            {
-                super(name, fn);
-            }
+            super(name, fn);
+        }
 
-            @Override
-            public PreCheckResult checkPreconditions(State<Spec> state)
-            {
-                int clusterSize = state.topologyHistory.up().length;
-                return clusterSize >= 3 ? PreCheckResult.Ok : PreCheckResult.Ignore;
-            }
+        @Override
+        public PreCheckResult checkPreconditions(State<Spec> state)
+        {
+            int clusterSize = state.topologyHistory.up().length;
+            return clusterSize >= 3 ? PreCheckResult.Ok : PreCheckResult.Ignore;
         }
+    }
+
+    private static CommandGen<Spec> cqlOperations(Spec spec)
+    {
         Command<State<Spec>, Void, ?> insert = new HarryCommand(state -> "Harry Insert" + state.commandNamePostfix(), state -> {
             spec.harry.insert();
             ((HarryState) state).numInserts++;
         });
-        Command<State<Spec>, Void, ?> validateAll = new HarryCommand(state -> "Harry Validate All" + state.commandNamePostfix(), state -> {
-            spec.harry.validateAll(spec.harry.quiescentChecker());
-            ((HarryState) state).numInserts = 0;
-        });
         return (rs, state) -> {
             HarryState harryState = (HarryState) state;
             TopologyHistory history = state.topologyHistory;
             // if any topology change happened, then always validate all
             if (harryState.generation != history.generation())
             {
                 harryState.generation = history.generation();
-                return validateAll;
+                return validateAll(state);
             }
            if ((harryState.numInserts > 0 && rs.decide(0.2))) // 20% of the time do reads
-                return validateAll;
+                return validateAll(state);
             return insert;
         };
     }
 
+    private static Command<State<Spec>, Void, ?> validateAll(State<Spec> state)
+    {
+        Spec spec = state.schemaSpec;
+        var schema = spec.harry.schema();
+        boolean writeThroughAccord = schema.isWriteTimeFromAccord();
+        List<Command<State<Spec>, Void, ?>> reads = new ArrayList<>();
+        Model model = spec.harry.quiescentChecker();
+        for (Long pd : new TreeSet<>(spec.harry.pds()))
+        {
+            reads.add(new HarryCommand(s -> "Harry Validate pd=" + pd  + state.commandNamePostfix(), s -> model.validate(Query.selectAllColumns(schema, pd, false))));
+            // as of this writing Accord does not support ORDER BY
+            if (!writeThroughAccord)
+                reads.add(new HarryCommand(s -> "Harry Reverse Validate pd=" + pd + state.commandNamePostfix(), s -> model.validate(Query.selectAllColumns(schema, pd, true))));
+        }
+//        if (reads.isEmpty())
+//            throw new IllegalStateException("Attempted to read when no partitions have been written to");
+        reads.add(new HarryCommand(s -> "Reset Harry Write State" + state.commandNamePostfix(), s -> ((HarryState) s).numInserts = 0));
+        return Property.multistep(reads);
+    }
+
+    private static class AccordSut extends InJvmSut
+    {
+        private AccordSut(Cluster cluster)
+        {
+            super(cluster, roundRobin(cluster), retryOnTimeout(), 10, 3);
+        }
+
+        @Override
+        public Object[][] execute(String statement, ConsistencyLevel cl, int coordinator, int pageSize, Object... bindings)
+        {
+            return super.execute(wrapInTxn(statement), cl, coordinator, pageSize, bindings);
+        }
+
+        @Override
+        protected void onException(Throwable t)
+        {
+            t = Throwables.getRootCause(t);
+            if (!TIMEOUT_CHECKER.matches(t)) return;
+
+            TxnId id;
+            try
+            {
+                id = TxnId.parse(t.getMessage());

Review Comment:
   Could you elaborate on this one? We try to parse out the txnId, and if it matches, we swallow the exception?



##########
test/unit/org/apache/cassandra/tcm/log/LogStateTestBase.java:
##########
@@ -244,6 +291,47 @@ public void sinceArbitraryEpochWithMultipleCorruptSnapshots()
         assertEntries(state.entries, since.nextEpoch(), CURRENT_EPOCH);
     }
 
+    @Test
+    public void getLogStateBetween()
+    {
+        qt().forAll(SNAPSHOTS_GEN, BETWEEN_GEN).check((snapshots, between) -> {
+            LogStateSUT sut = getSystemUnderTest(snapshots);
+            LogState state = sut.reader().getLogState(between.start, between.end, true);
+            Assertions.assertThat(state.entries).describedAs("with and without snapshot should have the same entries").isEqualTo(sut.reader().getLogState(between.start, between.end, false).entries);
+            Assertions.assertThat(state.baseState.epoch).isEqualTo(between.start);
+
+            List<Entry> entries = state.entries;
+            Assertions.assertThat(entries.size()).isEqualTo(between.end.getEpoch() - between.start.getEpoch());
+
+            long expected = between.start.nextEpoch().getEpoch();
+            for (Entry e : entries)
+            {
+                long actual = e.epoch.getEpoch();
+                Assertions.assertThat(actual).describedAs("Unexpected epoch").isEqualTo(expected);
+                expected++;
+            }
+        });
+    }
+
+    @Test
+    public void getEntriesBetween()
+    {
+        qt().forAll(SNAPSHOTS_GEN, BETWEEN_GEN).check((snapshots, between) -> {
+            LogStateSUT sut = getSystemUnderTest(snapshots);
+            LogReader.EntryHolder entries = sut.reader().getEntries(between.start, between.end);
+            Assertions.assertThat(entries.since).isEqualTo(between.start);
+            Assertions.assertThat(entries.entries.size()).isEqualTo(between.end.getEpoch() - between.start.getEpoch());
+
+            long expected = between.start.nextEpoch().getEpoch();
+            for (Entry e : entries.entries)
+            {
+                long actual = e.epoch.getEpoch();
+                Assertions.assertThat(actual).describedAs("Unexpected epoch").isEqualTo(expected);
+                expected++;

Review Comment:
   nit: Should we use `nextEpoch` instead of ints here and above? 
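For illustration, the `nextEpoch`-based version of the assertion loop could look like the sketch below; the `Epoch` here is a minimal stand-in so the snippet is self-contained, not the real `org.apache.cassandra.tcm.Epoch`:

```java
import java.util.List;

public class EpochLoopSketch
{
    // Minimal stand-in for o.a.c.tcm.Epoch, just enough to show the loop shape.
    record Epoch(long value)
    {
        Epoch nextEpoch() { return new Epoch(value + 1); }
    }

    // The test's loop phrased with Epoch arithmetic instead of raw longs:
    // advance "expected" via nextEpoch() rather than expected++.
    static boolean epochsAreConsecutive(Epoch start, List<Epoch> observed)
    {
        Epoch expected = start.nextEpoch();
        for (Epoch e : observed)
        {
            if (e.value() != expected.value())
                return false;
            expected = expected.nextEpoch();
        }
        return true;
    }
}
```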



##########
test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java:
##########
@@ -659,9 +696,12 @@ public static void waitForCMSToQuiesce(ICluster<IInvokableInstance> cluster, Epo
                 if (skip)
                     continue;
 
-                if (cluster.get(j).isShutdown())
+                IInvokableInstance inst = cluster.get(j);
+                if (inst.isShutdown())
                     continue;
-                Epoch version = getClusterMetadataVersion(cluster.get(j));
+                Epoch version = getClusterMetadataVersion(inst);
+                if (fetchLogWhenBehind && version.getEpoch() < awaitedEpoch.getEpoch())
+                    version = fetchLogFromCMS(inst, awaitedEpoch);

Review Comment:
   +1 as long as this is off by default 



##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -568,6 +570,46 @@ public static List<ClusterMetadata> tcmLoadRange(long min, long max)
         return afterLoad;
     }
 
+    /**
+     * This method exists due to the fact that we define a retry policy for TCM to follow, and then TCM ignores it and does no retries...
+     */
+    private static List<ClusterMetadata> reconstruct(long min, long max)
+    {
+        Epoch start = Epoch.create(min);
+        Epoch end = Epoch.create(max);
+        Retry.Deadline retryPolicyThatGetsIgnored = Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(NANOSECONDS),
+                                                                                     TCMMetrics.instance.fetchLogRetries);
+        Throwable lastError = null;
+        Backoff backoff = new Backoff.ExponentialBackoff(42, 200, SECONDS.toMillis(1), ThreadLocalRandom.current()::nextDouble);
+        long startNanos = Clock.Global.nanoTime();
+        for (int i = 0; backoff.mayRetry(i); i++)
+        {
+            try
+            {
+                Processor processor = ClusterMetadataService.instance().processor();
+                // When starting up, the paxos-based processor has shown to be flaky (this node is still starting up), so attempt to leverage the remote processor to offload this work
+                if (processor instanceof ClusterMetadataService.SwitchableProcessor)
+                    processor = ((ClusterMetadataService.SwitchableProcessor) processor).remoteProcessor();

Review Comment:
   We should never dereference the remote processor; it is inherently unsafe. I think we should investigate the FD issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
