dcapwell commented on code in PR #3486:
URL: https://github.com/apache/cassandra/pull/3486#discussion_r1725776756
##########
test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java:
##########
@@ -0,0 +1,761 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.topology;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.Invariants;
+import accord.utils.Property;
+import accord.utils.Property.Command;
+import accord.utils.Property.Commands;
+import accord.utils.Property.SimpleCommand;
+import accord.utils.Property.StateOnlyCommand;
+import accord.utils.RandomSource;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.IntArrayList;
+import org.agrona.collections.IntHashSet;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.YamlConfigurationLoader;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.impl.INodeProvisionStrategy;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.utils.ConfigGenBuilder;
+
+import static accord.utils.Property.multistep;
+import static accord.utils.Property.stateful;
+
+public abstract class TopologyMixupTestBase<S extends TopologyMixupTestBase.SchemaSpec> extends TestBaseImpl
+{
+    private static final Logger logger = LoggerFactory.getLogger(TopologyMixupTestBase.class);
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+    }
+
+    private enum TopologyChange
+    {
+        AddNode,
+        RemoveNode,
+        HostReplace,
+        //TODO (coverage): add the following states once supported
+//        StopNode,
+//        StartNode,
+//        MoveToken
+        //TODO (coverage): node migrates to another rack or dc (unsupported on trunk as of this writing, but planned work for TCM)
+//        MoveNodeToNewRack,
+//        MoveNodeToNewDC,
+    }
+
+    private enum RemoveType
+    {Decommission, RemoveNode, Assassinate}
+
+    private static final Gen.IntGen MURMUR_TOKEN_GEN = rs -> rs.nextInt(Integer.MIN_VALUE + 1, Integer.MAX_VALUE);
+    private static final int TARGET_RF = 3;
+    private static final Gen<Gen<RemoveType>> REMOVE_TYPE_DISTRIBUTION = Gens.enums().allMixedDistribution(RemoveType.class);
+    private static final Gen<Map<String, Object>> CONF_GEN = new ConfigGenBuilder()
+                                                             // jvm-dtest hard codes this partitioner in its APIs, so overriding will break the test
+                                                             .withPartitioner(Murmur3Partitioner.instance)
+                                                             .build();
+
+    // common commands
+    private Command<State<S>, Void, ?> repairCommand(State<S> state, int toCoordinate)
+    {
+        return new SimpleCommand<>("nodetool repair " + state.schemaSpec.keyspaceName() + ' ' + state.schemaSpec.name() + " from node" + toCoordinate + state.commandNamePostfix(),
+                                   s2 -> s2.cluster.get(toCoordinate).nodetoolResult("repair", state.schemaSpec.keyspaceName(), s2.schemaSpec.name()).asserts().success());
+    }
+
+    private Command<State<S>, Void, ?> waitForCMSToQuiesce()
+    {
+        return new StateOnlyCommand<>()
+        {
+            @Override
+            public String detailed(State<S> state)
+            {
+                return "Waiting for CMS to Quiesce" + state.commandNamePostfix();
+            }
+
+            @Override
+            public void applyUnit(State<S> state)
+            {
+                ClusterUtils.waitForCMSToQuiesce(state.cluster, state.cmsGroup);
+            }
+        };
+    }
+
+    private Command<State<S>, Void, ?> stopInstance(State<S> state, int toRemove)
+    {
+        return new SimpleCommand<>("Stop Node" + toRemove + " for Assassinate" + state.commandNamePostfix(), s2 -> {
+            IInvokableInstance inst = s2.cluster.get(toRemove);
+            TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+            ClusterUtils.stopUnchecked(inst);
+            node.down();
+        });
+    }
+
+    private Command<State<S>, Void, ?> addNode(State<S> state)
+    {
+        int nodeId = state.topologyHistory.uniqueInstances + 1;
+        return new SimpleCommand<>("Add Node" + nodeId + state.commandNamePostfix(),
+                                   s2 -> {
+                                       TopologyHistory.Node n = s2.topologyHistory.addNode();
+                                       IInvokableInstance newInstance = ClusterUtils.addInstance(s2.cluster, n.dc, n.rack, c -> c.set("auto_bootstrap", true));
+                                       newInstance.startup(s2.cluster);
+                                       n.up();
+                                   });
+    }
+
+    private Command<State<S>, Void, ?> removeNodeDecommission(RandomSource rs, State<S> state)
+    {
+        int toRemove = rs.pickInt(state.topologyHistory.up());
+        return new SimpleCommand<>("nodetool decommission node" + toRemove + state.commandNamePostfix(), s2 -> {
+            IInvokableInstance inst = s2.cluster.get(toRemove);
+            TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+            node.status = TopologyHistory.Node.Status.BeingDecommissioned;
+            inst.nodetoolResult("decommission").asserts().success();
+            ClusterUtils.stopUnchecked(inst);
+            node.removed();
+        });
+    }
+
+    private Command<State<S>, Void, ?> removeNode(RandomSource rs, State<S> state)
+    {
+        int[] up = state.topologyHistory.up();
+        int toRemove = rs.pickInt(up);
+        int toCoordinate;
+        {
+            int picked;
+            do
+            {
+                picked = rs.pickInt(up);
+            }
+            while (picked == toRemove);
+            toCoordinate = picked;
+        }
+        return multistep(stopInstance(state, toRemove),
+                         new SimpleCommand<>("nodetool removenode node" + toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+                             TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+                             node.status = TopologyHistory.Node.Status.BeingRemoved;
+                             IInvokableInstance coordinator = s2.cluster.get(toCoordinate);
+                             coordinator.nodetoolResult("removenode", Integer.toString(toRemove), "--force").asserts().success();
+                             node.removed();
+                             s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+                         }),
+                         repairCommand(state, toCoordinate));
+    }
+
+    private Command<State<S>, Void, ?> removeNodeAssassinate(RandomSource rs, State<S> state)
+    {
+        //TODO (correctness): assassinate CMS member isn't allowed
+        IntHashSet up = asSet(state.topologyHistory.up());
+        IntHashSet cmsGroup = asSet(state.cmsGroup);
+        Sets.SetView<Integer> upAndNotInCMS = Sets.difference(up, cmsGroup);
+        if (upAndNotInCMS.isEmpty()) throw new AssertionError("Every node is a CMS member");
+        List<Integer> allowed = new ArrayList<>(upAndNotInCMS);
+        allowed.sort(Comparator.naturalOrder());
+        int toRemove = rs.pick(allowed);
+        int toCoordinate;
+        {
+            int[] upInt = state.topologyHistory.up();
+            int picked;
+            do
+            {
+                picked = rs.pickInt(upInt);
+            }
+            while (picked == toRemove);
+            toCoordinate = picked;
+        }
+        return multistep(stopInstance(state, toRemove),
+                         new SimpleCommand<>("nodetool assassinate node" + toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+                             TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+                             node.status = TopologyHistory.Node.Status.BeingAssassinated;
+                             IInvokableInstance coordinator = s2.cluster.get(toCoordinate);
+                             InetSocketAddress address = s2.cluster.get(toRemove).config().broadcastAddress();
+                             coordinator.nodetoolResult("assassinate", address.getAddress().getHostAddress() + ":" + address.getPort()).asserts().success();
+                             node.removed();
+                             s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+                         }),
+                         repairCommand(state, toCoordinate)
+        );
+    }
+
+    private Command<State<S>, Void, ?> removeNodeRandomizedDispatch(RandomSource rs, State<S> state)
+    {
+        RemoveType type = state.removeTypeGen.next(rs);
+        switch (type)
+        {
+            case Decommission:
+                return removeNodeDecommission(rs, state);
+            case RemoveNode:
+                return removeNode(rs, state);
+            case Assassinate:
+                return removeNodeAssassinate(rs, state);
+            default:
+                throw new UnsupportedOperationException("Unknown remove type: " + type);
+        }
+    }
+
+    private Command<State<S>, Void, ?> hostReplace(RandomSource rs, State<S> state)
+    {
+        int nodeToReplace = rs.pickInt(state.topologyHistory.up());
+        IInvokableInstance toReplace = state.cluster.get(nodeToReplace);
+        TopologyHistory.Node adding = state.topologyHistory.replace(nodeToReplace);
+        TopologyHistory.Node removing = state.topologyHistory.nodes.get(nodeToReplace);
+
+        return multistep(new SimpleCommand<>("Stop Node" + nodeToReplace + " for HostReplace; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+                             ClusterUtils.stopUnchecked(toReplace);
+                             removing.down();
+                         }),
+                         new SimpleCommand<>("Host Replace Node" + nodeToReplace + "; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+                             logger.info("node{} starting host replacement; epoch={}", adding.id, HackSerialization.tcmEpochAndSync(s2.cluster.getFirstRunningInstance()));
+                             removing.status = TopologyHistory.Node.Status.BeingReplaced;
+                             IInvokableInstance inst = ClusterUtils.replaceHostAndStart(s2.cluster, toReplace);
+                             s2.topologyHistory.replaced(removing, adding);
+                             long epoch = HackSerialization.tcmEpoch(inst);
+                             s2.currentEpoch.set(epoch);
+                             logger.info("{} completed host replacement in epoch={}", inst, epoch);
+                         }),
+                         //TODO (remove after rebase to trunk): https://issues.apache.org/jira/browse/CASSANDRA-19705 After the rebase to trunk this is not needed. The issue is that the CMS placement removes the node but does not promote another node, which causes rf=3 to become rf=2
+                         new SimpleCommand<>("CMS reconfigure on Node" + adding.id + state.commandNamePostfix(), s2 -> s2.cluster.get(adding.id).nodetoolResult("cms", "reconfigure", Integer.toString(TARGET_RF)).asserts().success())
+        );
+    }
+
+    protected abstract Gen<State<S>> stateGen();
+
+    protected void preCheck(Property.StatefulBuilder statefulBuilder)
+    {
+
+    }
+
+    protected void destroyState(State<S> state, @Nullable Throwable cause) throws Throwable
+    {
+
+    }
+
+    @Test
+    public void test()
+    {
+        Property.StatefulBuilder statefulBuilder = stateful().withSteps(20).withStepTimeout(Duration.ofMinutes(2)).withExamples(1);
+        preCheck(statefulBuilder);
+        statefulBuilder.check(new Commands<State<S>, Void>()
+        {
+            @Override
+            public Gen<State<S>> genInitialState()
+            {
+                return stateGen();
+            }
+
+            @Override
+            public Void createSut(State<S> state)
+            {
+                return null;
+            }
+
+            @Override
+            public Gen<Command<State<S>, Void, ?>> commands(State<S> state)
+            {
+                state.preActions.forEach(Runnable::run);
+
+                Map<Gen<Command<State<S>, Void, ?>>, Integer> possible = new LinkedHashMap<>();
+
+                // topology change
+                EnumSet<TopologyChange> possibleTopologyChanges = possibleTopologyChanges(state);
+                if (!possibleTopologyChanges.isEmpty())
+                    possible.put(topologyCommand(state, possibleTopologyChanges), 2);
+                possible.put(rs -> repairCommand(state, rs.pickInt(state.topologyHistory.up())), 1);
+                possible.put(rs -> state.statementGen.apply(rs, state), 7);
+
+                return Gens.oneOf(possible);
+            }
+
+            private EnumSet<TopologyChange> possibleTopologyChanges(State<S> state)
+            {
+                EnumSet<TopologyChange> possibleTopologyChanges = EnumSet.noneOf(TopologyChange.class);
+                // "up or down" is logically more correct, but since this runs sequentially and after the topology changes are complete, we don't have downed nodes at this point,
+                // so "up" is enough to know the topology size
+                int size = state.topologyHistory.up().length;
+                if (size < state.topologyHistory.maxNodes)
+                    possibleTopologyChanges.add(TopologyChange.AddNode);
+                if (size > state.topologyHistory.quorum())
+                {
+                    if (size > TARGET_RF)
+                        possibleTopologyChanges.add(TopologyChange.RemoveNode);
+                    possibleTopologyChanges.add(TopologyChange.HostReplace);
+                }
+                return possibleTopologyChanges;
+            }
+
+            private Gen<Command<State<S>, Void, ?>> topologyCommand(State<S> state, EnumSet<TopologyChange> possibleTopologyChanges)
+            {
+                Map<Gen<Command<State<S>, Void, ?>>, Integer> possible = new LinkedHashMap<>();
+                for (TopologyChange task : possibleTopologyChanges)
+                {
+                    switch (task)
+                    {
+                        case AddNode:
+                            possible.put(ignore -> multistep(addNode(state), waitForCMSToQuiesce()), 1);
+                            break;
+                        case RemoveNode:
+                            possible.put(rs -> multistep(removeNodeRandomizedDispatch(rs, state), waitForCMSToQuiesce()), 1);
+                            break;
+                        case HostReplace:
+                            possible.put(rs -> multistep(hostReplace(rs, state), waitForCMSToQuiesce()), 1);
+                            break;
+                        default:
+                            throw new UnsupportedOperationException(task.name());
+                    }
+                }
+                return Gens.oneOf(possible);
+            }
+
+            @Override
+            public void destroyState(State<S> state, @Nullable Throwable cause) throws Throwable
+            {
+                if (cause != null)
+                {
+                    for (Runnable r : state.onError)
+                    {
+                        try
+                        {
+                            r.run();
+                        }
+                        catch (Throwable t)
+                        {
+                            cause.addSuppressed(t);
+                        }
+                    }
+                }
+                TopologyMixupTestBase.this.destroyState(state, cause);
+                state.close();
+            }
+        });
+    }
+
+    private static IntHashSet asSet(int[] array)
+    {
+        IntHashSet set = new IntHashSet(array.length);
+        for (int i : array)
+            set.add(i);
+        return set;
+    }
+
+    public interface SchemaSpec
+    {
+        String name();
+
+        String keyspaceName();
+    }
+
+    protected static class State<S extends SchemaSpec> implements AutoCloseable
+    {
+        final TopologyHistory topologyHistory;
+        final Cluster cluster;
+        final S schemaSpec;
+        final List<Runnable> preActions = new CopyOnWriteArrayList<>();
+        final List<Runnable> onError = new CopyOnWriteArrayList<>();
+        final AtomicLong currentEpoch = new AtomicLong();
+        final BiFunction<RandomSource, State<S>, Command<State<S>, Void, ?>> statementGen;
+        final Gen<RemoveType> removeTypeGen;
+        private final Map<String, Object> yamlConfigOverrides;
+        int[] cmsGroup = new int[0];
+
+        public State(RandomSource rs, BiFunction<RandomSource, Cluster, S> schemaSpecGen, Function<S, BiFunction<RandomSource, State<S>, Command<State<S>, Void, ?>>> cqlOperationsGen)
+        {
+            this.topologyHistory = new TopologyHistory(rs.fork(), 2, 4);
+            try
+            {
+                this.yamlConfigOverrides = CONF_GEN.next(rs);
+                cluster = Cluster.build(topologyHistory.minNodes)
+                                 .withTokenSupplier(topologyHistory)
+                                 .withConfig(c -> {
+                                     c.with(Feature.values())
+                                      .set("write_request_timeout", "10s");
+                                     for (Map.Entry<String, Object> e : yamlConfigOverrides.entrySet())
+                                         c.set(e.getKey(), e.getValue());
+                                     //TODO (maintenance): where to put this? Anything touching ConfigGenBuilder with jvm-dtest needs this...
+                                     ((InstanceConfig) c).remove("commitlog_sync_period_in_ms");
+                                     onConfigure(c);
+                                 })
+                                 //TODO (maintenance): should TopologyHistory also be an INodeProvisionStrategy.Factory so address information is stored in the Node?
+                                 //TODO (maintenance): AbstractCluster's Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology makes playing with dc/rack annoying; if this becomes an interface then TopologyHistory could own it
+                                 .withNodeProvisionStrategy((subnet, portMap) -> new INodeProvisionStrategy.AbstractNodeProvisionStrategy(portMap)
+                                 {
+                                     {
+                                         Invariants.checkArgument(subnet == 0, "Unexpected subnet detected: %d", subnet);
+                                     }
+
+                                     private final String ipPrefix = "127.0." + subnet + '.';
+
+                                     @Override
+                                     public int seedNodeNum()
+                                     {
+                                         int[] up = topologyHistory.up();
+                                         if (Arrays.equals(up, new int[]{ 1, 2 }))
+                                             return 1;
+                                         return rs.pickInt(up);
+                                     }
+
+                                     @Override
+                                     public String ipAddress(int nodeNum)
+                                     {
+                                         return ipPrefix + nodeNum;
+                                     }
+                                 })
+                                 .start();
+                cluster.setUncaughtExceptionsFilter((nodeId, cause) -> {
+                    if (cause.getMessage() != null)
+                    {
+                        // Node /127.0.0.1:7012 is not a member of CMS anymore in Epoch{epoch=47}; members=[/127.0.0.2:7012, /127.0.0.3:7012, /127.0.0.4:7012]
+                        // if a snapshot is scheduled while the node is a CMS member, but it loses membership *before* the commit can happen, then this error happens...
+                        // This error blocks a non-member from voting, so it is correct behavior and shouldn't fail the test
+                        if (cause.getMessage().contains("is not a member of CMS anymore in Epoch")) return true;
+                    }
+                    return false;
+                });
+            }
+            catch (IOException e)
+            {
+                throw new UncheckedIOException(e);
+            }
+            fixDistributedSchemas(cluster);
+            init(cluster, TARGET_RF);
+            // fix TCM
+            {
+                NodeToolResult result = cluster.get(1).nodetoolResult("cms", "reconfigure", "2");
+                result.asserts().success();
+                logger.info("CMS reconfigure: {}", result.getStdout());
+            }
+            preActions.add(new Runnable()
+            {
+                // in order to remove this action, an anonymous class is needed so "this" works; a lambda's "this" is the enclosing class
+                @Override
+                public void run()
+                {
+                    if (topologyHistory.up().length == TARGET_RF)
+                    {
+                        NodeToolResult result = cluster.get(1).nodetoolResult("cms", "reconfigure", Integer.toString(TARGET_RF));
+                        result.asserts().success();
+                        logger.info("CMS reconfigure: {}", result.getStdout());
+                        preActions.remove(this);
+                    }
+                }
+            });
+            preActions.add(() -> {
+                int[] up = topologyHistory.up();
+                // use the most recent node just in case the cluster isn't in-sync
+                IInvokableInstance node = cluster.get(up[up.length - 1]);
+                cmsGroup = HackSerialization.cmsGroup(node);
+                currentEpoch.set(HackSerialization.tcmEpoch(node));
+            });
+            this.schemaSpec = schemaSpecGen.apply(rs, cluster);
+            statementGen = cqlOperationsGen.apply(schemaSpec);
+
+            removeTypeGen = REMOVE_TYPE_DISTRIBUTION.next(rs);
+
+            long waitForEpoch = HackSerialization.tcmEpoch(cluster.get(1));
+            currentEpoch.set(waitForEpoch);
+            onStartupComplete(waitForEpoch);
+        }
+
+        protected void onStartupComplete(long tcmEpoch)
+        {

Review Comment:
   Not used in this patch, but Accord needs to block until Accord itself knows this epoch.
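   As a rough illustration (not part of this patch, and the names below are hypothetical), an Accord-aware subclass of `State` could override this hook and poll each live instance until it has caught up to the startup epoch. The `HackSerialization.tcmEpoch` poll is only a stand-in for asking Accord's own topology service about the epoch:

   ```java
   // Hypothetical sketch only -- not from the patch. Assumes an Accord-aware State subclass
   // (nested in a concrete TopologyMixupTestBase subclass) and uses the TCM epoch as a
   // stand-in for an Accord-side "has this epoch been processed?" check.
   protected static class AccordAwareState<S extends SchemaSpec> extends State<S>
   {
       public AccordAwareState(RandomSource rs,
                               BiFunction<RandomSource, Cluster, S> schemaSpecGen,
                               Function<S, BiFunction<RandomSource, State<S>, Command<State<S>, Void, ?>>> cqlOperationsGen)
       {
           super(rs, schemaSpecGen, cqlOperationsGen);
       }

       @Override
       protected void onStartupComplete(long tcmEpoch)
       {
           // Block until every live node has seen the startup epoch, so commands generated later
           // are not planned against a stale topology.
           for (int id : topologyHistory.up())
           {
               IInvokableInstance inst = cluster.get(id);
               while (HackSerialization.tcmEpoch(inst) < tcmEpoch) // stand-in for an Accord-side check
                   Thread.yield();
           }
       }
   }
   ```

   A real version would presumably add a timeout and go through whatever epoch-await API Accord ends up exposing rather than spinning on the TCM epoch.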

