dcapwell commented on code in PR #3486:
URL: https://github.com/apache/cassandra/pull/3486#discussion_r1733359601


##########
test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java:
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.topology;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.Invariants;
+import accord.utils.Property;
+import accord.utils.Property.Command;
+import accord.utils.Property.Commands;
+import accord.utils.Property.SimpleCommand;
+import accord.utils.Property.StateOnlyCommand;
+import accord.utils.RandomSource;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.IntArrayList;
+import org.agrona.collections.IntHashSet;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.YamlConfigurationLoader;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.impl.INodeProvisionStrategy;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.utils.ConfigGenBuilder;
+
+import static accord.utils.Property.multistep;
+import static accord.utils.Property.stateful;
+
+/**
+ * These tests can create many instances, so Mac users may need to run the following to avoid address bind failures
+ *
+ * {@code for id in $(seq 0 15); do sudo ifconfig lo0 alias "127.0.0.$id"; done;}
+ */
+public abstract class TopologyMixupTestBase<S extends TopologyMixupTestBase.SchemaSpec> extends TestBaseImpl
+{
+    private static final Logger logger = LoggerFactory.getLogger(TopologyMixupTestBase.class);
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+    }
+
+    private enum TopologyChange
+    {
+        AddNode,
+        RemoveNode,
+        HostReplace,
+        //TODO (coverage): add the following states once supported
+//        StopNode,
+//        StartNode,
+//        MoveToken
+        //TODO (coverage): node migrating to another rack or dc (unsupported on trunk as of this writing, but planned work for TCM)
+//        MoveNodeToNewRack,
+//        MoveNodeToNewDC,
+    }
+
+    private enum RemoveType
+    {Decommission, RemoveNode, Assassinate}
+
+    private static final Gen.IntGen MURMUR_TOKEN_GEN = rs -> rs.nextInt(Integer.MIN_VALUE + 1, Integer.MAX_VALUE);
+    private static final int TARGET_RF = 3;
+    private static final Gen<Gen<RemoveType>> REMOVE_TYPE_DISTRIBUTION = Gens.enums().allMixedDistribution(RemoveType.class);
+    private static final Gen<Map<String, Object>> CONF_GEN = new ConfigGenBuilder()
+                                                             // jvm-dtest hard codes this partitioner in its APIs, so overriding will break the test
+                                                             .withPartitionerGen(null)
+                                                             .build();
+
+    // common commands
+    private Command<State<S>, Void, ?> repairCommand(State<S> state, int toCoordinate)
+    {
+        return new SimpleCommand<>("nodetool repair " + state.schemaSpec.keyspaceName() + ' ' + state.schemaSpec.name() + " from node" + toCoordinate + state.commandNamePostfix(),
+                                   s2 -> s2.cluster.get(toCoordinate).nodetoolResult("repair", state.schemaSpec.keyspaceName(), s2.schemaSpec.name()).asserts().success());
+    }
+
+    private Command<State<S>, Void, ?> waitForCMSToQuiesce()
+    {
+        return new StateOnlyCommand<>()
+        {
+            @Override
+            public String detailed(State<S> state)
+            {
+                return "Waiting for CMS to Quiesce" + 
state.commandNamePostfix();
+            }
+
+            @Override
+            public void applyUnit(State<S> state)
+            {
+                ClusterUtils.waitForCMSToQuiesce(state.cluster, state.cmsGroup);
+            }
+        };
+    }
+
+    private Command<State<S>, Void, ?> stopInstance(State<S> state, int toRemove)
+    {
+        return new SimpleCommand<>("Stop Node" + toRemove + " for Assassinate" + state.commandNamePostfix(), s2 -> {
+            IInvokableInstance inst = s2.cluster.get(toRemove);
+            TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+            ClusterUtils.stopUnchecked(inst);
+            node.down();
+        });
+    }
+
+    private Command<State<S>, Void, ?> addNode(State<S> state)
+    {
+        int nodeId = state.topologyHistory.uniqueInstances + 1;
+        return new SimpleCommand<>("Add Node" + nodeId + 
state.commandNamePostfix(),
+                                   s2 -> {
+                                       TopologyHistory.Node n = 
s2.topologyHistory.addNode();
+                                       IInvokableInstance newInstance = 
ClusterUtils.addInstance(s2.cluster, n.dc, n.rack, c -> c.set("auto_bootstrap", 
true));
+                                       newInstance.startup(s2.cluster);
+                                       n.up();
+                                   });
+    }
+
+    private Command<State<S>, Void, ?> removeNodeDecommission(RandomSource rs, State<S> state)
+    {
+        int toRemove = rs.pickInt(state.topologyHistory.up());
+        return new SimpleCommand<>("nodetool decommission node" + toRemove + state.commandNamePostfix(), s2 -> {
+            IInvokableInstance inst = s2.cluster.get(toRemove);
+            TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+            node.status = TopologyHistory.Node.Status.BeingDecommissioned;
+            inst.nodetoolResult("decommission").asserts().success();
+            ClusterUtils.stopUnchecked(inst);
+            node.removed();
+        });
+    }
+
+    private Command<State<S>, Void, ?> removeNode(RandomSource rs, State<S> state)
+    {
+        int[] up = state.topologyHistory.up();
+        int toRemove = rs.pickInt(up);
+        int toCoordinate;
+        {
+            int picked;
+            do
+            {
+                picked = rs.pickInt(up);
+            }
+            while (picked == toRemove);
+            toCoordinate = picked;
+        }
+        return multistep(stopInstance(state, toRemove),
+                         new SimpleCommand<>("nodetool removenode node" + 
toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+                             TopologyHistory.Node node = 
s2.topologyHistory.node(toRemove);
+                             node.status = 
TopologyHistory.Node.Status.BeingRemoved;
+                             IInvokableInstance coordinator = 
s2.cluster.get(toCoordinate);
+                             coordinator.nodetoolResult("removenode", 
Integer.toString(toRemove), "--force").asserts().success();
+                             node.removed();
+                             
s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+                         }),
+                         repairCommand(state, toCoordinate));
+    }
+
+    private Command<State<S>, Void, ?> removeNodeAssassinate(RandomSource rs, State<S> state)
+    {
+        //TODO (correctness): assassinating a CMS member isn't allowed
+        IntHashSet up = asSet(state.topologyHistory.up());
+        IntHashSet cmsGroup = asSet(state.cmsGroup);
+        Sets.SetView<Integer> upAndNotInCMS = Sets.difference(up, cmsGroup);
+        if (upAndNotInCMS.isEmpty()) throw new AssertionError("Every node is a CMS member");
+        List<Integer> allowed = new ArrayList<>(upAndNotInCMS);
+        allowed.sort(Comparator.naturalOrder());
+        int toRemove = rs.pick(allowed);
+        int toCoordinate;
+        {
+            int[] upInt = state.topologyHistory.up();
+            int picked;
+            do
+            {
+                picked = rs.pickInt(upInt);
+            }
+            while (picked == toRemove);
+            toCoordinate = picked;
+        }
+        return multistep(stopInstance(state, toRemove),
+                         new SimpleCommand<>("nodetool assassinate node" + 
toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+                             TopologyHistory.Node node = 
s2.topologyHistory.node(toRemove);
+                             node.status = 
TopologyHistory.Node.Status.BeingAssassinated;
+                             IInvokableInstance coordinator = 
s2.cluster.get(toCoordinate);
+                             InetSocketAddress address = 
s2.cluster.get(toRemove).config().broadcastAddress();
+                             coordinator.nodetoolResult("assassinate", 
address.getAddress().getHostAddress() + ":" + 
address.getPort()).asserts().success();
+                             node.removed();
+                             
s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+                         }),
+                         repairCommand(state, toCoordinate)
+        );
+    }
+
+    private Command<State<S>, Void, ?> removeNodeRandomizedDispatch(RandomSource rs, State<S> state)
+    {
+        RemoveType type = state.removeTypeGen.next(rs);
+        switch (type)
+        {
+            case Decommission:
+                return removeNodeDecommission(rs, state);
+            case RemoveNode:
+                return removeNode(rs, state);
+            case Assassinate:
+                return removeNodeAssassinate(rs, state);
+            default:
+                throw new UnsupportedOperationException("Unknown remove type: 
" + type);
+        }
+    }
+
+    private Command<State<S>, Void, ?> hostReplace(RandomSource rs, State<S> state)
+    {
+        int nodeToReplace = rs.pickInt(state.topologyHistory.up());
+        IInvokableInstance toReplace = state.cluster.get(nodeToReplace);
+        TopologyHistory.Node adding = state.topologyHistory.replace(nodeToReplace);
+        TopologyHistory.Node removing = state.topologyHistory.nodes.get(nodeToReplace);
+
+        return multistep(new SimpleCommand<>("Stop Node" + nodeToReplace + " 
for HostReplace; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+                             ClusterUtils.stopUnchecked(toReplace);
+                             removing.down();
+                         }),
+                         new SimpleCommand<>("Host Replace Node" + 
nodeToReplace + "; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+                             logger.info("node{} starting host replacement; 
epoch={}", adding.id, 
HackSerialization.tcmEpochAndSync(s2.cluster.getFirstRunningInstance()));
+                             removing.status = 
TopologyHistory.Node.Status.BeingReplaced;
+                             IInvokableInstance inst = 
ClusterUtils.replaceHostAndStart(s2.cluster, toReplace);
+                             s2.topologyHistory.replaced(removing, adding);
+                             long epoch = HackSerialization.tcmEpoch(inst);
+                             s2.currentEpoch.set(epoch);
+                             logger.info("{} completed host replacement in 
epoch={}", inst, epoch);
+                         }),
+                         //TODO (remove after rebase to trunk): https://issues.apache.org/jira/browse/CASSANDRA-19705  After the rebase to trunk this is not needed.  The issue is that the CMS placement removes the node but does not promote another node, which causes rf=3 to become rf=2
+                         new SimpleCommand<>("CMS reconfigure on Node" + adding.id + state.commandNamePostfix(), s2 -> s2.cluster.get(adding.id).nodetoolResult("cms", "reconfigure", Integer.toString(TARGET_RF)).asserts().success())
+        );
+    }
+
+    protected abstract Gen<State<S>> stateGen();
+
+    protected void preCheck(Property.StatefulBuilder statefulBuilder)
+    {
+
+    }
+
+    protected void destroyState(State<S> state, @Nullable Throwable cause) throws Throwable
+    {
+
+    }
+
+    @Test
+    public void test()
+    {
+        Property.StatefulBuilder statefulBuilder = stateful().withSteps(50).withStepTimeout(Duration.ofMinutes(2)).withExamples(1);
+        preCheck(statefulBuilder);
+        statefulBuilder.check(new Commands<State<S>, Void>()
+        {
+            @Override
+            public Gen<State<S>> genInitialState()
+            {
+                return stateGen();
+            }
+
+            @Override
+            public Void createSut(State<S> state)
+            {
+                return null;
+            }
+
+            @Override
+            public Gen<Command<State<S>, Void, ?>> commands(State<S> state)
+            {
+                state.preActions.forEach(Runnable::run);
+
+                Map<Gen<Command<State<S>, Void, ?>>, Integer> possible = new LinkedHashMap<>();
+
+                // topology change
+                EnumSet<TopologyChange> possibleTopologyChanges = possibleTopologyChanges(state);
+                if (!possibleTopologyChanges.isEmpty())
+                    possible.put(topologyCommand(state, possibleTopologyChanges), 2);
+                possible.put(rs -> repairCommand(state, rs.pickInt(state.topologyHistory.up())), 1);
+                possible.put(rs -> state.statementGen.apply(rs, state), 7);
+
+                return Gens.oneOf(possible);
+            }
+
+            private EnumSet<TopologyChange> possibleTopologyChanges(State<S> state)
+            {
+                EnumSet<TopologyChange> possibleTopologyChanges = EnumSet.noneOf(TopologyChange.class);
+                // up or down is logically more correct, but since this runs sequentially and after the topology changes are complete, we don't have downed nodes at this point
+                // so up is enough to know the topology size
+                int size = state.topologyHistory.up().length;
+                if (size < state.topologyHistory.maxNodes)
+                    possibleTopologyChanges.add(TopologyChange.AddNode);
+                if (size > state.topologyHistory.quorum())
+                {
+                    if (size > TARGET_RF)
+                        possibleTopologyChanges.add(TopologyChange.RemoveNode);
+                    possibleTopologyChanges.add(TopologyChange.HostReplace);
+                }
+                return possibleTopologyChanges;
+            }
+
+            private Gen<Command<State<S>, Void, ?>> topologyCommand(State<S> state, EnumSet<TopologyChange> possibleTopologyChanges)
+            {
+                Map<Gen<Command<State<S>, Void, ?>>, Integer> possible = new LinkedHashMap<>();
+                for (TopologyChange task : possibleTopologyChanges)
+                {
+                    switch (task)
+                    {
+                        case AddNode:
+                            possible.put(ignore -> multistep(addNode(state), waitForCMSToQuiesce()), 1);
+                            break;
+                        case RemoveNode:
+                            possible.put(rs -> multistep(removeNodeRandomizedDispatch(rs, state), waitForCMSToQuiesce()), 1);
+                            break;
+                        case HostReplace:
+                            possible.put(rs -> multistep(hostReplace(rs, state), waitForCMSToQuiesce()), 1);
+                            break;
+                        default:
+                            throw new UnsupportedOperationException(task.name());
+                    }
+                }
+                return Gens.oneOf(possible);
+            }
+
+            @Override
+            public void destroyState(State<S> state, @Nullable Throwable cause) throws Throwable
+            {
+                if (cause != null)
+                {
+                    for (Runnable r : state.onError)
+                    {
+                        try
+                        {
+                            r.run();
+                        }
+                        catch (Throwable t)
+                        {
+                            cause.addSuppressed(t);
+                        }
+                    }
+                }
+                TopologyMixupTestBase.this.destroyState(state, cause);
+                state.close();
+            }
+        });
+    }
+
+    private static IntHashSet asSet(int[] array)
+    {
+        IntHashSet set = new IntHashSet(array.length);
+        for (int i : array)
+            set.add(i);
+        return set;
+    }
+
+    public interface SchemaSpec
+    {
+        String name();
+
+        String keyspaceName();
+    }
+
+    protected static class State<S extends SchemaSpec> implements AutoCloseable
+    {
+        final TopologyHistory topologyHistory;
+        final Cluster cluster;
+        final S schemaSpec;
+        final List<Runnable> preActions = new CopyOnWriteArrayList<>();
+        final List<Runnable> onError = new CopyOnWriteArrayList<>();
+        final AtomicLong currentEpoch = new AtomicLong();
+        final BiFunction<RandomSource, State<S>, Command<State<S>, Void, ?>> statementGen;
+        final Gen<RemoveType> removeTypeGen;
+        private final Map<String, Object> yamlConfigOverrides;
+        int[] cmsGroup = new int[0];
+
+        public State(RandomSource rs, BiFunction<RandomSource, Cluster, S> schemaSpecGen, Function<S, BiFunction<RandomSource, State<S>, Command<State<S>, Void, ?>>> cqlOperationsGen)
+        {
+            this.topologyHistory = new TopologyHistory(rs.fork(), 2, 4);
+            try
+            {
+
+                this.yamlConfigOverrides = CONF_GEN.next(rs);
+                cluster = Cluster.build(topologyHistory.minNodes)
+                                 .withTokenSupplier(topologyHistory)
+                                 .withConfig(c -> {
+                                     c.with(Feature.values())
+                                      .set("write_request_timeout", "10s");
+                                     //TODO (maintenance): where to put this?  Anything touching ConfigGenBuilder with jvm-dtest needs this...
+                                     ((InstanceConfig) c).remove("commitlog_sync_period_in_ms");
+                                     for (Map.Entry<String, Object> e : yamlConfigOverrides.entrySet())
+                                         c.set(e.getKey(), e.getValue());
+                                     onConfigure(c);
+                                 })
+                                 //TODO (maintenance): should TopologyHistory also be an INodeProvisionStrategy.Factory so address information is stored in the Node?
+                                 //TODO (maintenance): AbstractCluster's Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology makes playing with dc/rack annoying; if this becomes an interface then TopologyHistory could own it
+                                 .withNodeProvisionStrategy((subnet, portMap) -> new INodeProvisionStrategy.AbstractNodeProvisionStrategy(portMap)
+                                 {
+                                     {
+                                         Invariants.checkArgument(subnet == 0, "Unexpected subnet detected: %d", subnet);
+                                     }
+
+                                     private final String ipPrefix = "127.0." + subnet + '.';
+
+                                     @Override
+                                     public int seedNodeNum()
+                                     {
+                                         int[] up = topologyHistory.up();
+                                         if (Arrays.equals(up, new int[]{ 1, 2 }))
+                                             return 1;
+                                         return rs.pickInt(up);
+                                     }
+
+                                     @Override
+                                     public String ipAddress(int nodeNum)
+                                     {
+                                         return ipPrefix + nodeNum;
+                                     }
+                                 })
+                                 .start();
+            }
+            catch (IOException e)
+            {
+                throw new UncheckedIOException(e);
+            }
+            fixDistributedSchemas(cluster);
+            init(cluster, TARGET_RF);
+            // fix TCM
+            {
+                NodeToolResult result = cluster.get(1).nodetoolResult("cms", "reconfigure", "2");
+                result.asserts().success();
+                logger.info("CMS reconfigure: {}", result.getStdout());
+            }
+            preActions.add(new Runnable()
+            {
+                // in order to remove this action, an anonymous class is needed so "this" works; a lambda's "this" is the parent class

Review Comment:
   > if you like you can use preActions.remove(State.this);
   
   What do you mean? If this becomes a lambda then `this` is `State.this`, which is not the correct reference to remove... I would need to save it to a variable to keep access *or* make it anonymous...
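   
   For reference (not part of the patch), a minimal sketch of the `this` semantics in question, using a hypothetical `preActions` list: in an anonymous class `this` is the Runnable itself, so it can remove itself, while in a lambda `this` is the enclosing instance (`State.this` here).
   
   ```java
   import java.util.List;
   import java.util.concurrent.CopyOnWriteArrayList;
   
   class ThisSemanticsSketch // hypothetical stand-in for State
   {
       final List<Runnable> preActions = new CopyOnWriteArrayList<>();
   
       void register()
       {
           // Anonymous class: "this" is the Runnable instance, so it can remove itself.
           preActions.add(new Runnable()
           {
               @Override
               public void run()
               {
                   preActions.remove(this); // removes this exact Runnable
               }
           });
   
           // Lambda: "this" is the enclosing ThisSemanticsSketch (State.this in the test),
           // so remove(this) does not match the lambda and is a no-op.
           preActions.add(() -> preActions.remove(this));
       }
   }
   ```
   
   So keeping the anonymous class (or first storing the lambda in a variable) seems like the simplest way to keep a reference that can later be removed.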



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

