ifesdjeen commented on code in PR #3486:
URL: https://github.com/apache/cassandra/pull/3486#discussion_r1732676610
##########
src/java/org/apache/cassandra/tcm/ClusterMetadataService.java:
##########
@@ -844,11 +844,22 @@ private Pair<State, Processor> delegateInternal()
@Override
public Commit.Result commit(Entry.Id entryId, Transformation
transform, Epoch lastKnown, Retry.Deadline retryPolicy)
{
- Pair<State, Processor> delegate = delegateInternal();
- Commit.Result result = delegate.right.commit(entryId, transform,
lastKnown, retryPolicy);
- if (delegate.left == LOCAL || delegate.left == RESET)
- replicator.send(result, null);
- return result;
+ while (!retryPolicy.reachedMax())
+ {
+ try
+ {
+ Pair<State, Processor> delegate = delegateInternal();
+ Commit.Result result = delegate.right.commit(entryId,
transform, lastKnown, retryPolicy);
+ if (delegate.left == LOCAL || delegate.left == RESET)
+ replicator.send(result, null);
+ return result;
+ }
+ catch (NotCMSException e)
+ {
+ retryPolicy.maybeSleep();
+ }
+ }
+ return Commit.Result.failed(ExceptionCode.SERVER_ERROR, "Could not
commit " + transform.kind() + " at epoch " + lastKnown);
Review Comment:
nit: should we use `String.format` here instead of string concatenation?
##########
test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java:
##########
@@ -611,8 +632,8 @@ public static void
waitForCMSToQuiesce(ICluster<IInvokableInstance> cluster, Epo
if (cluster.get(j).isShutdown())
continue;
Epoch version = getClusterMetadataVersion(cluster.get(j));
- if (!awaitedEpoch.equals(version))
- notMatching.add(new ClusterMetadataVersion(j, version,
getClusterMetadataVersion(cluster.get(j))));
+ if (!awaitedEpoch.equals(version) && version.getEpoch() <
awaitedEpoch.getEpoch())
Review Comment:
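Just `version.getEpoch() < awaitedEpoch.getEpoch()` should be enough here:
a strictly lower epoch already implies the two are not equal, so the
`!awaitedEpoch.equals(version)` check looks redundant. Am I missing something?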
##########
test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.topology;
+
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import javax.annotation.Nullable;
+
+import accord.utils.Gen;
+import accord.utils.Property;
+import accord.utils.Property.Command;
+import accord.utils.Property.PreCheckResult;
+import accord.utils.Property.SimpleCommand;
+import accord.utils.RandomSource;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.harry.HarryHelper;
+import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
+import org.apache.cassandra.harry.sut.SystemUnderTest;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.harry.sut.injvm.InJvmSut;
+
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.waitForCMSToQuiesce;
+
+public class HarryTopologyMixupTest extends
TopologyMixupTestBase<HarryTopologyMixupTest.Spec>
+{
+ @Override
+ protected Gen<State<Spec>> stateGen()
+ {
+ return HarryState::new;
+ }
+
+ @Override
+ protected void preCheck(Property.StatefulBuilder builder)
+ {
+ // if a failing seed is detected, populate here
+ // Example: builder.withSeed(42L);
+ }
+
+ @Override
+ protected void destroyState(State<Spec> state, @Nullable Throwable cause)
+ {
+ if (cause != null) return;
+ if (((HarryState) state).numInserts > 0)
+ {
+ // do one last read just to make sure we validate the data...
+ var harry = state.schemaSpec.harry;
+ harry.validateAll(harry.quiescentLocalChecker());
+ }
+ }
+
+ private static Spec createSchemaSpec(RandomSource rs, Cluster cluster)
+ {
+ ReplayingHistoryBuilder harry = HarryHelper.dataGen(rs.nextLong(),
+ new
InJvmSut(cluster),
+ new
TokenPlacementModel.SimpleReplicationFactor(3),
+
SystemUnderTest.ConsistencyLevel.ALL);
+ cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH
replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};",
HarryHelper.KEYSPACE));
+ var schema = harry.schema();
+ cluster.schemaChange(schema.compile().cql());
+ waitForCMSToQuiesce(cluster, cluster.get(1));
+ return new Spec(harry);
+ }
+
+ private static BiFunction<RandomSource, State<Spec>, Command<State<Spec>,
Void, ?>> cqlOperations(Spec spec)
+ {
+ class HarryCommand extends SimpleCommand<State<Spec>>
+ {
+ HarryCommand(Function<State<Spec>, String> name,
Consumer<State<Spec>> fn)
+ {
+ super(name, fn);
+ }
+
+ @Override
+ public PreCheckResult checkPreconditions(State<Spec> state)
+ {
+ int clusterSize = state.topologyHistory.up().length;
+ return clusterSize >= 3 ? PreCheckResult.Ok :
PreCheckResult.Ignore;
+ }
+ }
+ Command<State<Spec>, Void, ?> insert = new HarryCommand(state ->
"Harry Insert" + state.commandNamePostfix(), state -> {
+ spec.harry.insert();
+ ((HarryState) state).numInserts++;
+ });
+ Command<State<Spec>, Void, ?> validateAll = new HarryCommand(state ->
"Harry Validate All" + state.commandNamePostfix(), state -> {
+ spec.harry.validateAll(spec.harry.quiescentLocalChecker());
+ ((HarryState) state).numInserts = 0;
+ });
+ return (rs, state) -> {
+ HarryState harryState = (HarryState) state;
+ TopologyHistory history = state.topologyHistory;
+ // if any topology change happened, then always validate all
+ if (harryState.generation != history.generation())
+ {
+ harryState.generation = history.generation();
+ return validateAll;
+ }
+ if (harryState.numInserts > 500
Review Comment:
I don't mind the current approach, but I'm wondering whether a slightly different
interface might be more convenient. One option would be to use something like the
"stateful model checker" from QT, where you specify the possible
commands and supply their probabilities (e.g. a 0.8 / 0.2 write-to-read
distribution), similar to what `RangeTombstoneBurnTest` does:
```
var insertChance = 0.8;
var readChance = 0.2;
.step((history, rng) -> rng.nextFloat() >
insertChance,
(history, rng) -> {
...
})
.step((history, rng) -> rng.nextFloat() >
readChance,
(history, rng) -> {
..
})
```
This interface would also fit nicely with the `State` abstraction you have here.
##########
test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java:
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.topology;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.Invariants;
+import accord.utils.Property;
+import accord.utils.Property.Command;
+import accord.utils.Property.Commands;
+import accord.utils.Property.SimpleCommand;
+import accord.utils.Property.StateOnlyCommand;
+import accord.utils.RandomSource;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.IntArrayList;
+import org.agrona.collections.IntHashSet;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.YamlConfigurationLoader;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.impl.INodeProvisionStrategy;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.utils.ConfigGenBuilder;
+
+import static accord.utils.Property.multistep;
+import static accord.utils.Property.stateful;
+
+/**
+ * These tests can create many instances, so mac users may need to run the
following to avoid address bind failures
+ *
+ * {@code for id in $(seq 0 15); do sudo ifconfig lo0 alias "127.0.0.$id";
done;}
+ */
+public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.SchemaSpec> extends TestBaseImpl
+{
+ private static final Logger logger =
LoggerFactory.getLogger(TopologyMixupTestBase.class);
+
+ static
+ {
+ DatabaseDescriptor.clientInitialization();
+ }
+
+ private enum TopologyChange
+ {
+ AddNode,
+ RemoveNode,
+ HostReplace,
+ //TODO (coverage): add the following states once supported
+// StopNode,
+// StartNode,
+// MoveToken
+ //TODO (coverage): node migrate to another rack or dc (unsupported on
trunk as of this writing, but planned work for TCM)
+// MoveNodeToNewRack,
+// MoveNodeToNewDC,
+ }
+
+ private enum RemoveType
+ {Decommission, RemoveNode, Assassinate}
+
+ private static final Gen.IntGen MURMUR_TOKEN_GEN = rs ->
rs.nextInt(Integer.MIN_VALUE + 1, Integer.MAX_VALUE);
+ private static final int TARGET_RF = 3;
+ private static final Gen<Gen<RemoveType>> REMOVE_TYPE_DISTRIBUTION =
Gens.enums().allMixedDistribution(RemoveType.class);
+ private static final Gen<Map<String, Object>> CONF_GEN = new
ConfigGenBuilder()
+ // jvm-dtest hard
codes this partitioner in its APIs, so overriding will break the test
+
.withPartitionerGen(null)
+ .build();
+
+ // common commands
+ private Command<State<S>, Void, ?> repairCommand(State<S> state, int
toCoordinate)
+ {
+ return new SimpleCommand<>("nodetool repair " +
state.schemaSpec.keyspaceName() + ' ' + state.schemaSpec.name() + " from node"
+ toCoordinate + state.commandNamePostfix(),
+ s2 ->
s2.cluster.get(toCoordinate).nodetoolResult("repair",
state.schemaSpec.keyspaceName(), s2.schemaSpec.name()).asserts().success());
+ }
+
+ private Command<State<S>, Void, ?> waitForCMSToQuiesce()
+ {
+ return new StateOnlyCommand<>()
+ {
+ @Override
+ public String detailed(State<S> state)
+ {
+ return "Waiting for CMS to Quiesce" +
state.commandNamePostfix();
+ }
+
+ @Override
+ public void applyUnit(State<S> state)
+ {
+ ClusterUtils.waitForCMSToQuiesce(state.cluster,
state.cmsGroup);
+ }
+ };
+ }
+
+ private Command<State<S>, Void, ?> stopInstance(State<S> state, int
toRemove)
+ {
+ return new SimpleCommand<>("Stop Node" + toRemove + " for Assassinate"
+ state.commandNamePostfix(), s2 -> {
+ IInvokableInstance inst = s2.cluster.get(toRemove);
+ TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+ ClusterUtils.stopUnchecked(inst);
+ node.down();
+ });
+ }
+
+ private Command<State<S>, Void, ?> addNode(State<S> state)
+ {
+ int nodeId = state.topologyHistory.uniqueInstances + 1;
+ return new SimpleCommand<>("Add Node" + nodeId +
state.commandNamePostfix(),
+ s2 -> {
+ TopologyHistory.Node n =
s2.topologyHistory.addNode();
+ IInvokableInstance newInstance =
ClusterUtils.addInstance(s2.cluster, n.dc, n.rack, c -> c.set("auto_bootstrap",
true));
+ newInstance.startup(s2.cluster);
+ n.up();
+ });
+ }
+
+ private Command<State<S>, Void, ?> removeNodeDecommission(RandomSource rs,
State<S> state)
+ {
+ int toRemove = rs.pickInt(state.topologyHistory.up());
+ return new SimpleCommand<>("nodetool decommission node" + toRemove +
state.commandNamePostfix(), s2 -> {
+ IInvokableInstance inst = s2.cluster.get(toRemove);
+ TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+ node.status = TopologyHistory.Node.Status.BeingDecommissioned;
+ inst.nodetoolResult("decommission").asserts().success();
+ ClusterUtils.stopUnchecked(inst);
+ node.removed();
+ });
+ }
+
+ private Command<State<S>, Void, ?> removeNode(RandomSource rs, State<S>
state)
+ {
+ int[] up = state.topologyHistory.up();
+ int toRemove = rs.pickInt(up);
+ int toCoordinate;
+ {
+ int picked;
+ do
+ {
+ picked = rs.pickInt(up);
+ }
+ while (picked == toRemove);
+ toCoordinate = picked;
+ }
+ return multistep(stopInstance(state, toRemove),
+ new SimpleCommand<>("nodetool removenode node" +
toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+ TopologyHistory.Node node =
s2.topologyHistory.node(toRemove);
+ node.status =
TopologyHistory.Node.Status.BeingRemoved;
+ IInvokableInstance coordinator =
s2.cluster.get(toCoordinate);
+ coordinator.nodetoolResult("removenode",
Integer.toString(toRemove), "--force").asserts().success();
+ node.removed();
+
s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+ }),
+ repairCommand(state, toCoordinate));
+ }
+
+ private Command<State<S>, Void, ?> removeNodeAssassinate(RandomSource rs,
State<S> state)
+ {
+ //TODO (correctness): assassinate CMS member isn't allowed
+ IntHashSet up = asSet(state.topologyHistory.up());
+ IntHashSet cmsGroup = asSet(state.cmsGroup);
+ Sets.SetView<Integer> upAndNotInCMS = Sets.difference(up, cmsGroup);
+ if (upAndNotInCMS.isEmpty()) throw new AssertionError("Every node is a
CMS member");
+ List<Integer> allowed = new ArrayList<>(upAndNotInCMS);
+ allowed.sort(Comparator.naturalOrder());
+ int toRemove = rs.pick(allowed);
+ int toCoordinate;
+ {
+ int[] upInt = state.topologyHistory.up();
+ int picked;
+ do
+ {
+ picked = rs.pickInt(upInt);
+ }
+ while (picked == toRemove);
+ toCoordinate = picked;
+ }
+ return multistep(stopInstance(state, toRemove),
+ new SimpleCommand<>("nodetool assassinate node" +
toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+ TopologyHistory.Node node =
s2.topologyHistory.node(toRemove);
+ node.status =
TopologyHistory.Node.Status.BeingAssassinated;
+ IInvokableInstance coordinator =
s2.cluster.get(toCoordinate);
+ InetSocketAddress address =
s2.cluster.get(toRemove).config().broadcastAddress();
+ coordinator.nodetoolResult("assassinate",
address.getAddress().getHostAddress() + ":" +
address.getPort()).asserts().success();
+ node.removed();
+
s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+ }),
+ repairCommand(state, toCoordinate)
+ );
+ }
+
+ private Command<State<S>, Void, ?>
removeNodeRandomizedDispatch(RandomSource rs, State<S> state)
+ {
+ RemoveType type = state.removeTypeGen.next(rs);
+ switch (type)
+ {
+ case Decommission:
+ return removeNodeDecommission(rs, state);
+ case RemoveNode:
+ return removeNode(rs, state);
+ case Assassinate:
+ return removeNodeAssassinate(rs, state);
+ default:
+ throw new UnsupportedOperationException("Unknown remove type:
" + type);
+ }
+ }
+
+ private Command<State<S>, Void, ?> hostReplace(RandomSource rs, State<S>
state)
+ {
+ int nodeToReplace = rs.pickInt(state.topologyHistory.up());
+ IInvokableInstance toReplace = state.cluster.get(nodeToReplace);
+ TopologyHistory.Node adding =
state.topologyHistory.replace(nodeToReplace);
+ TopologyHistory.Node removing =
state.topologyHistory.nodes.get(nodeToReplace);
+
+ return multistep(new SimpleCommand<>("Stop Node" + nodeToReplace + "
for HostReplace; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+ ClusterUtils.stopUnchecked(toReplace);
+ removing.down();
+ }),
+ new SimpleCommand<>("Host Replace Node" +
nodeToReplace + "; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+ logger.info("node{} starting host replacement;
epoch={}", adding.id,
HackSerialization.tcmEpochAndSync(s2.cluster.getFirstRunningInstance()));
+ removing.status =
TopologyHistory.Node.Status.BeingReplaced;
+ IInvokableInstance inst =
ClusterUtils.replaceHostAndStart(s2.cluster, toReplace);
+ s2.topologyHistory.replaced(removing, adding);
+ long epoch = HackSerialization.tcmEpoch(inst);
+ s2.currentEpoch.set(epoch);
+ logger.info("{} completed host replacement in
epoch={}", inst, epoch);
+ }),
+ //TODO (remove after rebase to trunk):
https://issues.apache.org/jira/browse/CASSANDRA-19705 After the rebase to
trunk this is not needed. The issue is that the CMS placement removes the
node, it does not promote another node, this cases rf=3 to become rf=2
+ new SimpleCommand<>("CMS reconfigure on Node" +
adding.id + state.commandNamePostfix(), s2 ->
s2.cluster.get(adding.id).nodetoolResult("cms", "reconfigure",
Integer.toString(TARGET_RF)).asserts().success())
+ );
+ }
+
+ protected abstract Gen<State<S>> stateGen();
+
+ protected void preCheck(Property.StatefulBuilder statefulBuilder)
+ {
+
+ }
+
+ protected void destroyState(State<S> state, @Nullable Throwable cause)
throws Throwable
+ {
+
+ }
+
+ @Test
+ public void test()
+ {
+ Property.StatefulBuilder statefulBuilder =
stateful().withSteps(50).withStepTimeout(Duration.ofMinutes(2)).withExamples(1);
+ preCheck(statefulBuilder);
+ statefulBuilder.check(new Commands<State<S>, Void>()
+ {
+ @Override
+ public Gen<State<S>> genInitialState()
+ {
+ return stateGen();
+ }
+
+ @Override
+ public Void createSut(State<S> state)
+ {
+ return null;
+ }
+
+ @Override
+ public Gen<Command<State<S>, Void, ?>> commands(State<S> state)
+ {
+ state.preActions.forEach(Runnable::run);
+
+ Map<Gen<Command<State<S>, Void, ?>>, Integer> possible = new
LinkedHashMap<>();
+
+ // topology change
+ EnumSet<TopologyChange> possibleTopologyChanges =
possibleTopologyChanges(state);
+ if (!possibleTopologyChanges.isEmpty())
+ possible.put(topologyCommand(state,
possibleTopologyChanges), 2);
+ possible.put(rs -> repairCommand(state,
rs.pickInt(state.topologyHistory.up())), 1);
+ possible.put(rs -> state.statementGen.apply(rs, state), 7);
+
+ return Gens.oneOf(possible);
+ }
+
+ private EnumSet<TopologyChange> possibleTopologyChanges(State<S>
state)
+ {
+ EnumSet<TopologyChange> possibleTopologyChanges =
EnumSet.noneOf(TopologyChange.class);
+ // up or down is logically more correct, but since this runs
sequentially and after the topology changes are complete, we don't have downed
nodes at this point
+ // so up is enough to know the topology size
+ int size = state.topologyHistory.up().length;
+ if (size < state.topologyHistory.maxNodes)
+ possibleTopologyChanges.add(TopologyChange.AddNode);
+ if (size > state.topologyHistory.quorum())
+ {
+ if (size > TARGET_RF)
+ possibleTopologyChanges.add(TopologyChange.RemoveNode);
+ possibleTopologyChanges.add(TopologyChange.HostReplace);
+ }
+ return possibleTopologyChanges;
+ }
+
+ private Gen<Command<State<S>, Void, ?>> topologyCommand(State<S>
state, EnumSet<TopologyChange> possibleTopologyChanges)
+ {
+ Map<Gen<Command<State<S>, Void, ?>>, Integer> possible = new
LinkedHashMap<>();
+ for (TopologyChange task : possibleTopologyChanges)
+ {
+ switch (task)
+ {
+ case AddNode:
+ possible.put(ignore -> multistep(addNode(state),
waitForCMSToQuiesce()), 1);
+ break;
+ case RemoveNode:
+ possible.put(rs ->
multistep(removeNodeRandomizedDispatch(rs, state), waitForCMSToQuiesce()), 1);
+ break;
+ case HostReplace:
+ possible.put(rs -> multistep(hostReplace(rs,
state), waitForCMSToQuiesce()), 1);
+ break;
+ default:
+ throw new
UnsupportedOperationException(task.name());
+ }
+ }
+ return Gens.oneOf(possible);
+ }
+
+ @Override
+ public void destroyState(State<S> state, @Nullable Throwable
cause) throws Throwable
+ {
+ if (cause != null)
+ {
+ for (Runnable r : state.onError)
+ {
+ try
+ {
+ r.run();
+ }
+ catch (Throwable t)
+ {
+ cause.addSuppressed(t);
+ }
+ }
+ }
+ TopologyMixupTestBase.this.destroyState(state, cause);
+ state.close();
+ }
+ });
+ }
+
+ private static IntHashSet asSet(int[] array)
+ {
+ IntHashSet set = new IntHashSet(array.length);
+ for (int i : array)
+ set.add(i);
+ return set;
+ }
+
+ public interface SchemaSpec
+ {
+ String name();
+
+ String keyspaceName();
+ }
+
+ protected static class State<S extends SchemaSpec> implements AutoCloseable
+ {
+ final TopologyHistory topologyHistory;
+ final Cluster cluster;
+ final S schemaSpec;
+ final List<Runnable> preActions = new CopyOnWriteArrayList<>();
+ final List<Runnable> onError = new CopyOnWriteArrayList<>();
+ final AtomicLong currentEpoch = new AtomicLong();
+ final BiFunction<RandomSource, State<S>, Command<State<S>, Void, ?>>
statementGen;
+ final Gen<RemoveType> removeTypeGen;
+ private final Map<String, Object> yamlConfigOverrides;
+ int[] cmsGroup = new int[0];
+
+ public State(RandomSource rs, BiFunction<RandomSource, Cluster, S>
schemaSpecGen, Function<S, BiFunction<RandomSource, State<S>, Command<State<S>,
Void, ?>>> cqlOperationsGen)
+ {
+ this.topologyHistory = new TopologyHistory(rs.fork(), 2, 4);
+ try
+ {
+
+ this.yamlConfigOverrides = CONF_GEN.next(rs);
+ cluster = Cluster.build(topologyHistory.minNodes)
+ .withTokenSupplier(topologyHistory)
+ .withConfig(c -> {
+ c.with(Feature.values())
+ .set("write_request_timeout", "10s");
+ //TODO (mainatiance): where to put this?
Anything touching ConfigGenBuilder with jvm-dtest needs this...
+ ((InstanceConfig)
c).remove("commitlog_sync_period_in_ms");
+ for (Map.Entry<String, Object> e :
yamlConfigOverrides.entrySet())
+ c.set(e.getKey(), e.getValue());
+ onConfigure(c);
+ })
+ //TODO (maintance): should TopologyHistory
also be a INodeProvisionStrategy.Factory so address information is stored in
the Node?
+ //TODO (maintance): AbstractCluster's
Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology makes playing with
dc/rack annoying, if this becomes an interface then TopologyHistory could own
+ .withNodeProvisionStrategy((subnet, portMap)
-> new INodeProvisionStrategy.AbstractNodeProvisionStrategy(portMap)
+ {
+ {
+ Invariants.checkArgument(subnet == 0,
"Unexpected subnet detected: %d", subnet);
+ }
+
+ private final String ipPrefix = "127.0."
+ subnet + '.';
+
+ @Override
+ public int seedNodeNum()
+ {
+ int[] up = topologyHistory.up();
+ if (Arrays.equals(up, new int[]{ 1, 2
}))
+ return 1;
+ return rs.pickInt(up);
+ }
+
+ @Override
+ public String ipAddress(int nodeNum)
+ {
+ return ipPrefix + nodeNum;
+ }
+ })
+ .start();
+ }
+ catch (IOException e)
+ {
+ throw new UncheckedIOException(e);
+ }
+ fixDistributedSchemas(cluster);
+ init(cluster, TARGET_RF);
+ // fix TCM
+ {
+ NodeToolResult result = cluster.get(1).nodetoolResult("cms",
"reconfigure", "2");
+ result.asserts().success();
+ logger.info("CMS reconfigure: {}", result.getStdout());
+ }
+ preActions.add(new Runnable()
+ {
+ // in order to remove this action, an annonomus class is
needed so "this" works, lambda "this" is the parent class
+ @Override
+ public void run()
+ {
+ if (topologyHistory.up().length == TARGET_RF)
+ {
+ NodeToolResult result =
cluster.get(1).nodetoolResult("cms", "reconfigure",
Integer.toString(TARGET_RF));
+ result.asserts().success();
+ logger.info("CMS reconfigure: {}", result.getStdout());
+ preActions.remove(this);
+ }
+ }
+ });
+ preActions.add(() -> {
+ int[] up = topologyHistory.up();
+ // use the most recent node just in case the cluster isn't
in-sync
+ IInvokableInstance node = cluster.get(up[up.length - 1]);
+ cmsGroup = HackSerialization.cmsGroup(node);
+ currentEpoch.set(HackSerialization.tcmEpoch(node));
+ });
+ preActions.add(() -> cluster.checkAndResetUncaughtExceptions());
+ this.schemaSpec = schemaSpecGen.apply(rs, cluster);
+ statementGen = cqlOperationsGen.apply(schemaSpec);
+
+ removeTypeGen = REMOVE_TYPE_DISTRIBUTION.next(rs);
+
+ long waitForEpoch = HackSerialization.tcmEpoch(cluster.get(1));
+ currentEpoch.set(waitForEpoch);
+ onStartupComplete(waitForEpoch);
+ }
+
+ protected void onStartupComplete(long tcmEpoch)
+ {
+
+ }
+
+ protected void onConfigure(IInstanceConfig config)
+ {
+
+ }
+
+ protected String commandNamePostfix()
+ {
+ return "; epoch=" + currentEpoch.get() + ", cms=" +
Arrays.toString(cmsGroup);
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Yaml
Config:\n").append(YamlConfigurationLoader.toYaml(this.yamlConfigOverrides));
+ sb.append("\nTopology:\n").append(topologyHistory);
+ sb.append("\nCMS Voting Group:
").append(Arrays.toString(cmsGroup));
+ return sb.toString();
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ cluster.close();
+ }
+ }
+
+ public static class TopologyHistory implements TokenSupplier
+ {
+ private final RandomSource rs;
+ private final int tokensPerNode;
+ private final int minNodes, maxNodes;
+
+ private final Int2ObjectHashMap<Node> nodes = new
Int2ObjectHashMap<>();
+ private final Set<String> activeTokens = new HashSet<>();
+ private int uniqueInstances = 0;
+ /**
+ * Tracks how many topology change events were performed
+ */
+ private int generation = 0;
+
+ public TopologyHistory(RandomSource rs, int minNodes, int maxNodes)
+ {
+ this.rs = rs;
+ this.minNodes = minNodes;
+ this.maxNodes = maxNodes;
+ this.tokensPerNode = Cluster.build(1).getTokenCount();
+ for (int i = 0; i < minNodes; i++)
+ addNode();
+ for (Node n : nodes.values())
+ n.status = Node.Status.Up;
+ }
+
+ public long generation()
+ {
+ return generation;
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 1; i <= nodes.size(); i++)
+ {
+ Node node = nodes.get(i);
+ sb.append("\n\tNode").append(i).append(":
status=").append(node.status).append(", tokens=").append(node.tokens);
+ }
+ return sb.toString();
+ }
+
+ public int quorum()
+ {
+ return (TARGET_RF / 2) + 1;
+ }
+
+ @Override
+ public Collection<String> tokens(int i)
+ {
+ Node n = nodes.get(i);
+ if (n == null)
+ throw new IllegalArgumentException("Unknown node" + i);
+ return n.tokens;
+ }
+
+ public int[] active()
+ {
+ int[] keys = new int[nodes.size()];
+ Int2ObjectHashMap<Node>.KeyIterator it = nodes.keySet().iterator();
+ for (int i = 0; it.hasNext(); i++)
+ keys[i] = it.nextInt();
+ Arrays.sort(keys);
+ return keys;
+ }
+
+ public int[] up()
+ {
+ IntArrayList up = new IntArrayList(nodes.size(), -1);
+ for (Map.Entry<Integer, Node> n : nodes.entrySet())
+ {
+ if (n.getValue().status == Node.Status.Up)
+ up.add(n.getKey());
+ }
+ int[] ints = up.toIntArray();
+ Arrays.sort(ints);
+ return ints;
+ }
+
+ public int size()
+ {
+ return nodes.size();
+ }
+
+ public Node addNode()
+ {
+ int id = ++uniqueInstances;
+ List<String> instTokens = Gens.lists(MURMUR_TOKEN_GEN
Review Comment:
Should we make this static? It looks like we only need a single instance of this
generator.
##########
test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java:
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.topology;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.Invariants;
+import accord.utils.Property;
+import accord.utils.Property.Command;
+import accord.utils.Property.Commands;
+import accord.utils.Property.SimpleCommand;
+import accord.utils.Property.StateOnlyCommand;
+import accord.utils.RandomSource;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.IntArrayList;
+import org.agrona.collections.IntHashSet;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.YamlConfigurationLoader;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.impl.INodeProvisionStrategy;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.utils.ConfigGenBuilder;
+
+import static accord.utils.Property.multistep;
+import static accord.utils.Property.stateful;
+
/**
 * These tests can create many instances, so macOS users may need to run the following to avoid address bind failures:
 *
 * {@code for id in $(seq 0 15); do sudo ifconfig lo0 alias "127.0.0.$id"; done;}
 */
+public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.SchemaSpec> extends TestBaseImpl
+{
private static final Logger logger = LoggerFactory.getLogger(TopologyMixupTestBase.class);

static
{
    // Static initializers run in textual order, so this runs before CONF_GEN below is built.
    // NOTE(review): presumably ConfigGenBuilder / jvm-dtest config handling need DatabaseDescriptor in client
    // mode in this JVM — confirm before reordering.
    DatabaseDescriptor.clientInitialization();
}

/**
 * Topology changes the stateful test can choose between.
 */
private enum TopologyChange
{
    AddNode,
    RemoveNode,
    HostReplace
    // TODO (coverage): add StopNode, StartNode and MoveToken once supported
    // TODO (coverage): node migration to another rack or DC (MoveNodeToNewRack, MoveNodeToNewDC) is unsupported
    //  on trunk as of this writing, but is planned work for TCM
}

/**
 * Ways a node can be taken out of the cluster (see removeNodeRandomizedDispatch).
 */
private enum RemoveType
{
    Decommission,
    RemoveNode,
    Assassinate
}

/** Draws an int token via {@code rs.nextInt(Integer.MIN_VALUE + 1, Integer.MAX_VALUE)}. */
private static final Gen.IntGen MURMUR_TOKEN_GEN = rs -> rs.nextInt(Integer.MIN_VALUE + 1, Integer.MAX_VALUE);
/** Replication factor used to initialise the test keyspace; also the CMS size requested after host replacement. */
private static final int TARGET_RF = 3;
/** Generator of RemoveType distributions (NOTE(review): presumably feeds State.removeTypeGen — the assignment is not visible here). */
private static final Gen<Gen<RemoveType>> REMOVE_TYPE_DISTRIBUTION = Gens.enums().allMixedDistribution(RemoveType.class);
/** Random yaml overrides; each instance's config gets every entry. */
private static final Gen<Map<String, Object>> CONF_GEN = new ConfigGenBuilder()
                                                         // jvm-dtest hard-codes this partitioner in its APIs, so overriding it would break the test
                                                         .withPartitionerGen(null)
                                                         .build();
+
+ // common commands
+ private Command<State<S>, Void, ?> repairCommand(State<S> state, int
toCoordinate)
+ {
+ return new SimpleCommand<>("nodetool repair " +
state.schemaSpec.keyspaceName() + ' ' + state.schemaSpec.name() + " from node"
+ toCoordinate + state.commandNamePostfix(),
+ s2 ->
s2.cluster.get(toCoordinate).nodetoolResult("repair",
state.schemaSpec.keyspaceName(), s2.schemaSpec.name()).asserts().success());
+ }
+
+ private Command<State<S>, Void, ?> waitForCMSToQuiesce()
+ {
+ return new StateOnlyCommand<>()
+ {
+ @Override
+ public String detailed(State<S> state)
+ {
+ return "Waiting for CMS to Quiesce" +
state.commandNamePostfix();
+ }
+
+ @Override
+ public void applyUnit(State<S> state)
+ {
+ ClusterUtils.waitForCMSToQuiesce(state.cluster,
state.cmsGroup);
+ }
+ };
+ }
+
+ private Command<State<S>, Void, ?> stopInstance(State<S> state, int
toRemove)
+ {
+ return new SimpleCommand<>("Stop Node" + toRemove + " for Assassinate"
+ state.commandNamePostfix(), s2 -> {
+ IInvokableInstance inst = s2.cluster.get(toRemove);
+ TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+ ClusterUtils.stopUnchecked(inst);
+ node.down();
+ });
+ }
+
+ private Command<State<S>, Void, ?> addNode(State<S> state)
+ {
+ int nodeId = state.topologyHistory.uniqueInstances + 1;
+ return new SimpleCommand<>("Add Node" + nodeId +
state.commandNamePostfix(),
+ s2 -> {
+ TopologyHistory.Node n =
s2.topologyHistory.addNode();
+ IInvokableInstance newInstance =
ClusterUtils.addInstance(s2.cluster, n.dc, n.rack, c -> c.set("auto_bootstrap",
true));
+ newInstance.startup(s2.cluster);
+ n.up();
+ });
+ }
+
+ private Command<State<S>, Void, ?> removeNodeDecommission(RandomSource rs,
State<S> state)
+ {
+ int toRemove = rs.pickInt(state.topologyHistory.up());
+ return new SimpleCommand<>("nodetool decommission node" + toRemove +
state.commandNamePostfix(), s2 -> {
+ IInvokableInstance inst = s2.cluster.get(toRemove);
+ TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+ node.status = TopologyHistory.Node.Status.BeingDecommissioned;
+ inst.nodetoolResult("decommission").asserts().success();
+ ClusterUtils.stopUnchecked(inst);
+ node.removed();
+ });
+ }
+
+ private Command<State<S>, Void, ?> removeNode(RandomSource rs, State<S>
state)
+ {
+ int[] up = state.topologyHistory.up();
+ int toRemove = rs.pickInt(up);
+ int toCoordinate;
+ {
+ int picked;
+ do
+ {
+ picked = rs.pickInt(up);
+ }
+ while (picked == toRemove);
+ toCoordinate = picked;
+ }
+ return multistep(stopInstance(state, toRemove),
+ new SimpleCommand<>("nodetool removenode node" +
toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+ TopologyHistory.Node node =
s2.topologyHistory.node(toRemove);
+ node.status =
TopologyHistory.Node.Status.BeingRemoved;
+ IInvokableInstance coordinator =
s2.cluster.get(toCoordinate);
+ coordinator.nodetoolResult("removenode",
Integer.toString(toRemove), "--force").asserts().success();
+ node.removed();
+
s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+ }),
+ repairCommand(state, toCoordinate));
+ }
+
+ private Command<State<S>, Void, ?> removeNodeAssassinate(RandomSource rs,
State<S> state)
+ {
+ //TODO (correctness): assassinate CMS member isn't allowed
+ IntHashSet up = asSet(state.topologyHistory.up());
+ IntHashSet cmsGroup = asSet(state.cmsGroup);
+ Sets.SetView<Integer> upAndNotInCMS = Sets.difference(up, cmsGroup);
+ if (upAndNotInCMS.isEmpty()) throw new AssertionError("Every node is a
CMS member");
+ List<Integer> allowed = new ArrayList<>(upAndNotInCMS);
+ allowed.sort(Comparator.naturalOrder());
+ int toRemove = rs.pick(allowed);
+ int toCoordinate;
+ {
+ int[] upInt = state.topologyHistory.up();
+ int picked;
+ do
+ {
+ picked = rs.pickInt(upInt);
+ }
+ while (picked == toRemove);
+ toCoordinate = picked;
+ }
+ return multistep(stopInstance(state, toRemove),
+ new SimpleCommand<>("nodetool assassinate node" +
toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+ TopologyHistory.Node node =
s2.topologyHistory.node(toRemove);
+ node.status =
TopologyHistory.Node.Status.BeingAssassinated;
+ IInvokableInstance coordinator =
s2.cluster.get(toCoordinate);
+ InetSocketAddress address =
s2.cluster.get(toRemove).config().broadcastAddress();
+ coordinator.nodetoolResult("assassinate",
address.getAddress().getHostAddress() + ":" +
address.getPort()).asserts().success();
+ node.removed();
+
s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+ }),
+ repairCommand(state, toCoordinate)
+ );
+ }
+
+ private Command<State<S>, Void, ?>
removeNodeRandomizedDispatch(RandomSource rs, State<S> state)
+ {
+ RemoveType type = state.removeTypeGen.next(rs);
+ switch (type)
+ {
+ case Decommission:
+ return removeNodeDecommission(rs, state);
+ case RemoveNode:
+ return removeNode(rs, state);
+ case Assassinate:
+ return removeNodeAssassinate(rs, state);
+ default:
+ throw new UnsupportedOperationException("Unknown remove type:
" + type);
+ }
+ }
+
+ private Command<State<S>, Void, ?> hostReplace(RandomSource rs, State<S>
state)
+ {
+ int nodeToReplace = rs.pickInt(state.topologyHistory.up());
+ IInvokableInstance toReplace = state.cluster.get(nodeToReplace);
+ TopologyHistory.Node adding =
state.topologyHistory.replace(nodeToReplace);
+ TopologyHistory.Node removing =
state.topologyHistory.nodes.get(nodeToReplace);
+
+ return multistep(new SimpleCommand<>("Stop Node" + nodeToReplace + "
for HostReplace; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+ ClusterUtils.stopUnchecked(toReplace);
+ removing.down();
+ }),
+ new SimpleCommand<>("Host Replace Node" +
nodeToReplace + "; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+ logger.info("node{} starting host replacement;
epoch={}", adding.id,
HackSerialization.tcmEpochAndSync(s2.cluster.getFirstRunningInstance()));
+ removing.status =
TopologyHistory.Node.Status.BeingReplaced;
+ IInvokableInstance inst =
ClusterUtils.replaceHostAndStart(s2.cluster, toReplace);
+ s2.topologyHistory.replaced(removing, adding);
+ long epoch = HackSerialization.tcmEpoch(inst);
+ s2.currentEpoch.set(epoch);
+ logger.info("{} completed host replacement in
epoch={}", inst, epoch);
+ }),
+ //TODO (remove after rebase to trunk):
https://issues.apache.org/jira/browse/CASSANDRA-19705 After the rebase to
trunk this is not needed. The issue is that the CMS placement removes the
node, it does not promote another node, this cases rf=3 to become rf=2
+ new SimpleCommand<>("CMS reconfigure on Node" +
adding.id + state.commandNamePostfix(), s2 ->
s2.cluster.get(adding.id).nodetoolResult("cms", "reconfigure",
Integer.toString(TARGET_RF)).asserts().success())
+ );
+ }
+
+ protected abstract Gen<State<S>> stateGen();
+
+ protected void preCheck(Property.StatefulBuilder statefulBuilder)
+ {
+
+ }
+
+ protected void destroyState(State<S> state, @Nullable Throwable cause)
throws Throwable
+ {
+
+ }
+
+ @Test
+ public void test()
+ {
+ Property.StatefulBuilder statefulBuilder =
stateful().withSteps(50).withStepTimeout(Duration.ofMinutes(2)).withExamples(1);
+ preCheck(statefulBuilder);
+ statefulBuilder.check(new Commands<State<S>, Void>()
+ {
+ @Override
+ public Gen<State<S>> genInitialState()
+ {
+ return stateGen();
+ }
+
+ @Override
+ public Void createSut(State<S> state)
+ {
+ return null;
+ }
+
+ @Override
+ public Gen<Command<State<S>, Void, ?>> commands(State<S> state)
+ {
+ state.preActions.forEach(Runnable::run);
+
+ Map<Gen<Command<State<S>, Void, ?>>, Integer> possible = new
LinkedHashMap<>();
+
+ // topology change
+ EnumSet<TopologyChange> possibleTopologyChanges =
possibleTopologyChanges(state);
+ if (!possibleTopologyChanges.isEmpty())
+ possible.put(topologyCommand(state,
possibleTopologyChanges), 2);
+ possible.put(rs -> repairCommand(state,
rs.pickInt(state.topologyHistory.up())), 1);
+ possible.put(rs -> state.statementGen.apply(rs, state), 7);
+
+ return Gens.oneOf(possible);
+ }
+
+ private EnumSet<TopologyChange> possibleTopologyChanges(State<S>
state)
+ {
+ EnumSet<TopologyChange> possibleTopologyChanges =
EnumSet.noneOf(TopologyChange.class);
+ // up or down is logically more correct, but since this runs
sequentially and after the topology changes are complete, we don't have downed
nodes at this point
+ // so up is enough to know the topology size
+ int size = state.topologyHistory.up().length;
+ if (size < state.topologyHistory.maxNodes)
+ possibleTopologyChanges.add(TopologyChange.AddNode);
+ if (size > state.topologyHistory.quorum())
+ {
+ if (size > TARGET_RF)
+ possibleTopologyChanges.add(TopologyChange.RemoveNode);
+ possibleTopologyChanges.add(TopologyChange.HostReplace);
+ }
+ return possibleTopologyChanges;
+ }
+
+ private Gen<Command<State<S>, Void, ?>> topologyCommand(State<S>
state, EnumSet<TopologyChange> possibleTopologyChanges)
+ {
+ Map<Gen<Command<State<S>, Void, ?>>, Integer> possible = new
LinkedHashMap<>();
+ for (TopologyChange task : possibleTopologyChanges)
+ {
+ switch (task)
+ {
+ case AddNode:
+ possible.put(ignore -> multistep(addNode(state),
waitForCMSToQuiesce()), 1);
+ break;
+ case RemoveNode:
+ possible.put(rs ->
multistep(removeNodeRandomizedDispatch(rs, state), waitForCMSToQuiesce()), 1);
+ break;
+ case HostReplace:
+ possible.put(rs -> multistep(hostReplace(rs,
state), waitForCMSToQuiesce()), 1);
+ break;
+ default:
+ throw new
UnsupportedOperationException(task.name());
+ }
+ }
+ return Gens.oneOf(possible);
+ }
+
+ @Override
+ public void destroyState(State<S> state, @Nullable Throwable
cause) throws Throwable
+ {
+ if (cause != null)
+ {
+ for (Runnable r : state.onError)
+ {
+ try
+ {
+ r.run();
+ }
+ catch (Throwable t)
+ {
+ cause.addSuppressed(t);
+ }
+ }
+ }
+ TopologyMixupTestBase.this.destroyState(state, cause);
+ state.close();
+ }
+ });
+ }
+
+ private static IntHashSet asSet(int[] array)
+ {
+ IntHashSet set = new IntHashSet(array.length);
+ for (int i : array)
+ set.add(i);
+ return set;
+ }
+
/**
 * Identifies the table a test run works against.
 */
public interface SchemaSpec
{
    /** Returns the table name (passed to {@code nodetool repair} after the keyspace). */
    String name();

    /** Returns the keyspace containing the table. */
    String keyspaceName();
}
+
+ protected static class State<S extends SchemaSpec> implements AutoCloseable
+ {
+ final TopologyHistory topologyHistory;
+ final Cluster cluster;
+ final S schemaSpec;
+ final List<Runnable> preActions = new CopyOnWriteArrayList<>();
+ final List<Runnable> onError = new CopyOnWriteArrayList<>();
+ final AtomicLong currentEpoch = new AtomicLong();
+ final BiFunction<RandomSource, State<S>, Command<State<S>, Void, ?>>
statementGen;
+ final Gen<RemoveType> removeTypeGen;
+ private final Map<String, Object> yamlConfigOverrides;
+ int[] cmsGroup = new int[0];
+
+ public State(RandomSource rs, BiFunction<RandomSource, Cluster, S>
schemaSpecGen, Function<S, BiFunction<RandomSource, State<S>, Command<State<S>,
Void, ?>>> cqlOperationsGen)
+ {
+ this.topologyHistory = new TopologyHistory(rs.fork(), 2, 4);
+ try
+ {
+
+ this.yamlConfigOverrides = CONF_GEN.next(rs);
+ cluster = Cluster.build(topologyHistory.minNodes)
+ .withTokenSupplier(topologyHistory)
+ .withConfig(c -> {
+ c.with(Feature.values())
+ .set("write_request_timeout", "10s");
+ //TODO (mainatiance): where to put this?
Anything touching ConfigGenBuilder with jvm-dtest needs this...
+ ((InstanceConfig)
c).remove("commitlog_sync_period_in_ms");
+ for (Map.Entry<String, Object> e :
yamlConfigOverrides.entrySet())
+ c.set(e.getKey(), e.getValue());
+ onConfigure(c);
+ })
+ //TODO (maintance): should TopologyHistory
also be a INodeProvisionStrategy.Factory so address information is stored in
the Node?
+ //TODO (maintance): AbstractCluster's
Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology makes playing with
dc/rack annoying, if this becomes an interface then TopologyHistory could own
+ .withNodeProvisionStrategy((subnet, portMap)
-> new INodeProvisionStrategy.AbstractNodeProvisionStrategy(portMap)
+ {
+ {
+ Invariants.checkArgument(subnet == 0,
"Unexpected subnet detected: %d", subnet);
+ }
+
+ private final String ipPrefix = "127.0."
+ subnet + '.';
+
+ @Override
+ public int seedNodeNum()
+ {
+ int[] up = topologyHistory.up();
+ if (Arrays.equals(up, new int[]{ 1, 2
}))
+ return 1;
+ return rs.pickInt(up);
+ }
+
+ @Override
+ public String ipAddress(int nodeNum)
+ {
+ return ipPrefix + nodeNum;
+ }
+ })
+ .start();
+ }
+ catch (IOException e)
+ {
+ throw new UncheckedIOException(e);
+ }
+ fixDistributedSchemas(cluster);
+ init(cluster, TARGET_RF);
+ // fix TCM
+ {
+ NodeToolResult result = cluster.get(1).nodetoolResult("cms",
"reconfigure", "2");
+ result.asserts().success();
+ logger.info("CMS reconfigure: {}", result.getStdout());
+ }
+ preActions.add(new Runnable()
+ {
+ // in order to remove this action, an annonomus class is
needed so "this" works, lambda "this" is the parent class
Review Comment:
Typo: annonomus -> anonymous
##########
test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java:
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.topology;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.Invariants;
+import accord.utils.Property;
+import accord.utils.Property.Command;
+import accord.utils.Property.Commands;
+import accord.utils.Property.SimpleCommand;
+import accord.utils.Property.StateOnlyCommand;
+import accord.utils.RandomSource;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.IntArrayList;
+import org.agrona.collections.IntHashSet;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.YamlConfigurationLoader;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.impl.INodeProvisionStrategy;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.utils.ConfigGenBuilder;
+
+import static accord.utils.Property.multistep;
+import static accord.utils.Property.stateful;
+
/**
 * These tests can create many instances, so macOS users may need to run the following to avoid address bind failures:
 *
 * {@code for id in $(seq 0 15); do sudo ifconfig lo0 alias "127.0.0.$id"; done;}
 */
+public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.SchemaSpec> extends TestBaseImpl
+{
private static final Logger logger = LoggerFactory.getLogger(TopologyMixupTestBase.class);

static
{
    // Static initializers run in textual order, so this runs before CONF_GEN below is built.
    // NOTE(review): presumably ConfigGenBuilder / jvm-dtest config handling need DatabaseDescriptor in client
    // mode in this JVM — confirm before reordering.
    DatabaseDescriptor.clientInitialization();
}

/**
 * Topology changes the stateful test can choose between.
 */
private enum TopologyChange
{
    AddNode,
    RemoveNode,
    HostReplace
    // TODO (coverage): add StopNode, StartNode and MoveToken once supported
    // TODO (coverage): node migration to another rack or DC (MoveNodeToNewRack, MoveNodeToNewDC) is unsupported
    //  on trunk as of this writing, but is planned work for TCM
}

/**
 * Ways a node can be taken out of the cluster (see removeNodeRandomizedDispatch).
 */
private enum RemoveType
{
    Decommission,
    RemoveNode,
    Assassinate
}

/** Draws an int token via {@code rs.nextInt(Integer.MIN_VALUE + 1, Integer.MAX_VALUE)}. */
private static final Gen.IntGen MURMUR_TOKEN_GEN = rs -> rs.nextInt(Integer.MIN_VALUE + 1, Integer.MAX_VALUE);
/** Replication factor used to initialise the test keyspace; also the CMS size requested after host replacement. */
private static final int TARGET_RF = 3;
/** Generator of RemoveType distributions (NOTE(review): presumably feeds State.removeTypeGen — the assignment is not visible here). */
private static final Gen<Gen<RemoveType>> REMOVE_TYPE_DISTRIBUTION = Gens.enums().allMixedDistribution(RemoveType.class);
/** Random yaml overrides; each instance's config gets every entry. */
private static final Gen<Map<String, Object>> CONF_GEN = new ConfigGenBuilder()
                                                         // jvm-dtest hard-codes this partitioner in its APIs, so overriding it would break the test
                                                         .withPartitionerGen(null)
                                                         .build();
+
+ // common commands
+ private Command<State<S>, Void, ?> repairCommand(State<S> state, int
toCoordinate)
+ {
+ return new SimpleCommand<>("nodetool repair " +
state.schemaSpec.keyspaceName() + ' ' + state.schemaSpec.name() + " from node"
+ toCoordinate + state.commandNamePostfix(),
+ s2 ->
s2.cluster.get(toCoordinate).nodetoolResult("repair",
state.schemaSpec.keyspaceName(), s2.schemaSpec.name()).asserts().success());
+ }
+
+ private Command<State<S>, Void, ?> waitForCMSToQuiesce()
+ {
+ return new StateOnlyCommand<>()
+ {
+ @Override
+ public String detailed(State<S> state)
+ {
+ return "Waiting for CMS to Quiesce" +
state.commandNamePostfix();
+ }
+
+ @Override
+ public void applyUnit(State<S> state)
+ {
+ ClusterUtils.waitForCMSToQuiesce(state.cluster,
state.cmsGroup);
+ }
+ };
+ }
+
+ private Command<State<S>, Void, ?> stopInstance(State<S> state, int
toRemove)
+ {
+ return new SimpleCommand<>("Stop Node" + toRemove + " for Assassinate"
+ state.commandNamePostfix(), s2 -> {
+ IInvokableInstance inst = s2.cluster.get(toRemove);
+ TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+ ClusterUtils.stopUnchecked(inst);
+ node.down();
+ });
+ }
+
+ private Command<State<S>, Void, ?> addNode(State<S> state)
+ {
+ int nodeId = state.topologyHistory.uniqueInstances + 1;
+ return new SimpleCommand<>("Add Node" + nodeId +
state.commandNamePostfix(),
+ s2 -> {
+ TopologyHistory.Node n =
s2.topologyHistory.addNode();
+ IInvokableInstance newInstance =
ClusterUtils.addInstance(s2.cluster, n.dc, n.rack, c -> c.set("auto_bootstrap",
true));
+ newInstance.startup(s2.cluster);
+ n.up();
+ });
+ }
+
+ private Command<State<S>, Void, ?> removeNodeDecommission(RandomSource rs,
State<S> state)
+ {
+ int toRemove = rs.pickInt(state.topologyHistory.up());
+ return new SimpleCommand<>("nodetool decommission node" + toRemove +
state.commandNamePostfix(), s2 -> {
+ IInvokableInstance inst = s2.cluster.get(toRemove);
+ TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+ node.status = TopologyHistory.Node.Status.BeingDecommissioned;
+ inst.nodetoolResult("decommission").asserts().success();
+ ClusterUtils.stopUnchecked(inst);
+ node.removed();
+ });
+ }
+
+ private Command<State<S>, Void, ?> removeNode(RandomSource rs, State<S>
state)
+ {
+ int[] up = state.topologyHistory.up();
+ int toRemove = rs.pickInt(up);
+ int toCoordinate;
+ {
+ int picked;
+ do
+ {
+ picked = rs.pickInt(up);
+ }
+ while (picked == toRemove);
+ toCoordinate = picked;
+ }
+ return multistep(stopInstance(state, toRemove),
+ new SimpleCommand<>("nodetool removenode node" +
toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+ TopologyHistory.Node node =
s2.topologyHistory.node(toRemove);
+ node.status =
TopologyHistory.Node.Status.BeingRemoved;
+ IInvokableInstance coordinator =
s2.cluster.get(toCoordinate);
+ coordinator.nodetoolResult("removenode",
Integer.toString(toRemove), "--force").asserts().success();
+ node.removed();
+
s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+ }),
+ repairCommand(state, toCoordinate));
+ }
+
+ private Command<State<S>, Void, ?> removeNodeAssassinate(RandomSource rs,
State<S> state)
+ {
+ //TODO (correctness): assassinate CMS member isn't allowed
+ IntHashSet up = asSet(state.topologyHistory.up());
+ IntHashSet cmsGroup = asSet(state.cmsGroup);
+ Sets.SetView<Integer> upAndNotInCMS = Sets.difference(up, cmsGroup);
+ if (upAndNotInCMS.isEmpty()) throw new AssertionError("Every node is a
CMS member");
+ List<Integer> allowed = new ArrayList<>(upAndNotInCMS);
+ allowed.sort(Comparator.naturalOrder());
+ int toRemove = rs.pick(allowed);
+ int toCoordinate;
+ {
+ int[] upInt = state.topologyHistory.up();
+ int picked;
+ do
+ {
+ picked = rs.pickInt(upInt);
+ }
+ while (picked == toRemove);
+ toCoordinate = picked;
+ }
+ return multistep(stopInstance(state, toRemove),
+ new SimpleCommand<>("nodetool assassinate node" +
toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+ TopologyHistory.Node node =
s2.topologyHistory.node(toRemove);
+ node.status =
TopologyHistory.Node.Status.BeingAssassinated;
+ IInvokableInstance coordinator =
s2.cluster.get(toCoordinate);
+ InetSocketAddress address =
s2.cluster.get(toRemove).config().broadcastAddress();
+ coordinator.nodetoolResult("assassinate",
address.getAddress().getHostAddress() + ":" +
address.getPort()).asserts().success();
+ node.removed();
+
s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+ }),
+ repairCommand(state, toCoordinate)
+ );
+ }
+
+ private Command<State<S>, Void, ?>
removeNodeRandomizedDispatch(RandomSource rs, State<S> state)
+ {
+ RemoveType type = state.removeTypeGen.next(rs);
+ switch (type)
+ {
+ case Decommission:
+ return removeNodeDecommission(rs, state);
+ case RemoveNode:
+ return removeNode(rs, state);
+ case Assassinate:
+ return removeNodeAssassinate(rs, state);
+ default:
+ throw new UnsupportedOperationException("Unknown remove type:
" + type);
+ }
+ }
+
+ private Command<State<S>, Void, ?> hostReplace(RandomSource rs, State<S>
state)
+ {
+ int nodeToReplace = rs.pickInt(state.topologyHistory.up());
+ IInvokableInstance toReplace = state.cluster.get(nodeToReplace);
+ TopologyHistory.Node adding =
state.topologyHistory.replace(nodeToReplace);
+ TopologyHistory.Node removing =
state.topologyHistory.nodes.get(nodeToReplace);
+
+ return multistep(new SimpleCommand<>("Stop Node" + nodeToReplace + "
for HostReplace; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+ ClusterUtils.stopUnchecked(toReplace);
+ removing.down();
+ }),
+ new SimpleCommand<>("Host Replace Node" +
nodeToReplace + "; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+ logger.info("node{} starting host replacement;
epoch={}", adding.id,
HackSerialization.tcmEpochAndSync(s2.cluster.getFirstRunningInstance()));
+ removing.status =
TopologyHistory.Node.Status.BeingReplaced;
+ IInvokableInstance inst =
ClusterUtils.replaceHostAndStart(s2.cluster, toReplace);
+ s2.topologyHistory.replaced(removing, adding);
+ long epoch = HackSerialization.tcmEpoch(inst);
+ s2.currentEpoch.set(epoch);
+ logger.info("{} completed host replacement in
epoch={}", inst, epoch);
+ }),
+ //TODO (remove after rebase to trunk):
https://issues.apache.org/jira/browse/CASSANDRA-19705 After the rebase to
trunk this is not needed. The issue is that the CMS placement removes the
node, it does not promote another node, this cases rf=3 to become rf=2
+ new SimpleCommand<>("CMS reconfigure on Node" +
adding.id + state.commandNamePostfix(), s2 ->
s2.cluster.get(adding.id).nodetoolResult("cms", "reconfigure",
Integer.toString(TARGET_RF)).asserts().success())
+ );
+ }
+
+ protected abstract Gen<State<S>> stateGen();
+
+ protected void preCheck(Property.StatefulBuilder statefulBuilder)
+ {
+
+ }
+
+ protected void destroyState(State<S> state, @Nullable Throwable cause)
throws Throwable
+ {
+
+ }
+
+ @Test
+ public void test()
+ {
+ Property.StatefulBuilder statefulBuilder =
stateful().withSteps(50).withStepTimeout(Duration.ofMinutes(2)).withExamples(1);
+ preCheck(statefulBuilder);
+ statefulBuilder.check(new Commands<State<S>, Void>()
+ {
+ @Override
+ public Gen<State<S>> genInitialState()
+ {
+ return stateGen();
+ }
+
+ @Override
+ public Void createSut(State<S> state)
+ {
+ return null;
+ }
+
+ @Override
+ public Gen<Command<State<S>, Void, ?>> commands(State<S> state)
+ {
+ state.preActions.forEach(Runnable::run);
+
+ Map<Gen<Command<State<S>, Void, ?>>, Integer> possible = new
LinkedHashMap<>();
+
+ // topology change
+ EnumSet<TopologyChange> possibleTopologyChanges =
possibleTopologyChanges(state);
+ if (!possibleTopologyChanges.isEmpty())
+ possible.put(topologyCommand(state,
possibleTopologyChanges), 2);
+ possible.put(rs -> repairCommand(state,
rs.pickInt(state.topologyHistory.up())), 1);
+ possible.put(rs -> state.statementGen.apply(rs, state), 7);
+
+ return Gens.oneOf(possible);
+ }
+
+ private EnumSet<TopologyChange> possibleTopologyChanges(State<S>
state)
+ {
+ EnumSet<TopologyChange> possibleTopologyChanges =
EnumSet.noneOf(TopologyChange.class);
+ // up or down is logically more correct, but since this runs
sequentially and after the topology changes are complete, we don't have downed
nodes at this point
+ // so up is enough to know the topology size
+ int size = state.topologyHistory.up().length;
+ if (size < state.topologyHistory.maxNodes)
+ possibleTopologyChanges.add(TopologyChange.AddNode);
+ if (size > state.topologyHistory.quorum())
+ {
+ if (size > TARGET_RF)
+ possibleTopologyChanges.add(TopologyChange.RemoveNode);
+ possibleTopologyChanges.add(TopologyChange.HostReplace);
+ }
+ return possibleTopologyChanges;
+ }
+
+ private Gen<Command<State<S>, Void, ?>> topologyCommand(State<S>
state, EnumSet<TopologyChange> possibleTopologyChanges)
+ {
+ Map<Gen<Command<State<S>, Void, ?>>, Integer> possible = new
LinkedHashMap<>();
+ for (TopologyChange task : possibleTopologyChanges)
+ {
+ switch (task)
+ {
+ case AddNode:
+ possible.put(ignore -> multistep(addNode(state),
waitForCMSToQuiesce()), 1);
+ break;
+ case RemoveNode:
+ possible.put(rs ->
multistep(removeNodeRandomizedDispatch(rs, state), waitForCMSToQuiesce()), 1);
+ break;
+ case HostReplace:
+ possible.put(rs -> multistep(hostReplace(rs,
state), waitForCMSToQuiesce()), 1);
+ break;
+ default:
+ throw new
UnsupportedOperationException(task.name());
+ }
+ }
+ return Gens.oneOf(possible);
+ }
+
+ @Override
+ public void destroyState(State<S> state, @Nullable Throwable
cause) throws Throwable
+ {
+ if (cause != null)
+ {
+ for (Runnable r : state.onError)
+ {
+ try
+ {
+ r.run();
+ }
+ catch (Throwable t)
+ {
+ cause.addSuppressed(t);
+ }
+ }
+ }
+ TopologyMixupTestBase.this.destroyState(state, cause);
+ state.close();
+ }
+ });
+ }
+
+ private static IntHashSet asSet(int[] array)
+ {
+ IntHashSet set = new IntHashSet(array.length);
+ for (int i : array)
+ set.add(i);
+ return set;
+ }
+
+ public interface SchemaSpec
+ {
+ String name();
+
+ String keyspaceName();
+ }
+
+ protected static class State<S extends SchemaSpec> implements AutoCloseable
+ {
+ final TopologyHistory topologyHistory;
+ final Cluster cluster;
+ final S schemaSpec;
+ final List<Runnable> preActions = new CopyOnWriteArrayList<>();
+ final List<Runnable> onError = new CopyOnWriteArrayList<>();
+ final AtomicLong currentEpoch = new AtomicLong();
+ final BiFunction<RandomSource, State<S>, Command<State<S>, Void, ?>>
statementGen;
+ final Gen<RemoveType> removeTypeGen;
+ private final Map<String, Object> yamlConfigOverrides;
+ int[] cmsGroup = new int[0];
+
+ public State(RandomSource rs, BiFunction<RandomSource, Cluster, S>
schemaSpecGen, Function<S, BiFunction<RandomSource, State<S>, Command<State<S>,
Void, ?>>> cqlOperationsGen)
+ {
+ this.topologyHistory = new TopologyHistory(rs.fork(), 2, 4);
+ try
+ {
+
+ this.yamlConfigOverrides = CONF_GEN.next(rs);
+ cluster = Cluster.build(topologyHistory.minNodes)
+ .withTokenSupplier(topologyHistory)
+ .withConfig(c -> {
+ c.with(Feature.values())
+ .set("write_request_timeout", "10s");
+ //TODO (mainatiance): where to put this?
Anything touching ConfigGenBuilder with jvm-dtest needs this...
+ ((InstanceConfig)
c).remove("commitlog_sync_period_in_ms");
+ for (Map.Entry<String, Object> e :
yamlConfigOverrides.entrySet())
+ c.set(e.getKey(), e.getValue());
+ onConfigure(c);
+ })
+ //TODO (maintance): should TopologyHistory
also be a INodeProvisionStrategy.Factory so address information is stored in
the Node?
+ //TODO (maintance): AbstractCluster's
Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology makes playing with
dc/rack annoying, if this becomes an interface then TopologyHistory could own
+ .withNodeProvisionStrategy((subnet, portMap)
-> new INodeProvisionStrategy.AbstractNodeProvisionStrategy(portMap)
+ {
+ {
+ Invariants.checkArgument(subnet == 0,
"Unexpected subnet detected: %d", subnet);
+ }
+
+ private final String ipPrefix = "127.0."
+ subnet + '.';
+
+ @Override
+ public int seedNodeNum()
+ {
+ int[] up = topologyHistory.up();
+ if (Arrays.equals(up, new int[]{ 1, 2
}))
+ return 1;
+ return rs.pickInt(up);
+ }
+
+ @Override
+ public String ipAddress(int nodeNum)
+ {
+ return ipPrefix + nodeNum;
+ }
+ })
+ .start();
+ }
+ catch (IOException e)
+ {
+ throw new UncheckedIOException(e);
+ }
+ fixDistributedSchemas(cluster);
+ init(cluster, TARGET_RF);
+ // fix TCM
+ {
+ NodeToolResult result = cluster.get(1).nodetoolResult("cms",
"reconfigure", "2");
+ result.asserts().success();
+ logger.info("CMS reconfigure: {}", result.getStdout());
+ }
+ preActions.add(new Runnable()
+ {
+ // in order to remove this action, an annonomus class is
needed so "this" works, lambda "this" is the parent class
+ @Override
+ public void run()
+ {
+ if (topologyHistory.up().length == TARGET_RF)
+ {
+ NodeToolResult result =
cluster.get(1).nodetoolResult("cms", "reconfigure",
Integer.toString(TARGET_RF));
+ result.asserts().success();
+ logger.info("CMS reconfigure: {}", result.getStdout());
+ preActions.remove(this);
+ }
+ }
+ });
+ preActions.add(() -> {
+ int[] up = topologyHistory.up();
+ // use the most recent node just in case the cluster isn't
in-sync
+ IInvokableInstance node = cluster.get(up[up.length - 1]);
+ cmsGroup = HackSerialization.cmsGroup(node);
+ currentEpoch.set(HackSerialization.tcmEpoch(node));
+ });
+ preActions.add(() -> cluster.checkAndResetUncaughtExceptions());
+ this.schemaSpec = schemaSpecGen.apply(rs, cluster);
+ statementGen = cqlOperationsGen.apply(schemaSpec);
+
+ removeTypeGen = REMOVE_TYPE_DISTRIBUTION.next(rs);
+
+ long waitForEpoch = HackSerialization.tcmEpoch(cluster.get(1));
+ currentEpoch.set(waitForEpoch);
+ onStartupComplete(waitForEpoch);
+ }
+
+ protected void onStartupComplete(long tcmEpoch)
+ {
+
+ }
+
+ protected void onConfigure(IInstanceConfig config)
+ {
+
+ }
+
+ protected String commandNamePostfix()
+ {
+ return "; epoch=" + currentEpoch.get() + ", cms=" +
Arrays.toString(cmsGroup);
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Yaml
Config:\n").append(YamlConfigurationLoader.toYaml(this.yamlConfigOverrides));
+ sb.append("\nTopology:\n").append(topologyHistory);
+ sb.append("\nCMS Voting Group:
").append(Arrays.toString(cmsGroup));
+ return sb.toString();
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ cluster.close();
+ }
+ }
+
+ public static class TopologyHistory implements TokenSupplier
+ {
+ private final RandomSource rs;
+ private final int tokensPerNode;
+ private final int minNodes, maxNodes;
+
+ private final Int2ObjectHashMap<Node> nodes = new
Int2ObjectHashMap<>();
+ private final Set<String> activeTokens = new HashSet<>();
+ private int uniqueInstances = 0;
+ /**
+ * Tracks how many topology change events were performed
+ */
+ private int generation = 0;
+
+ public TopologyHistory(RandomSource rs, int minNodes, int maxNodes)
+ {
+ this.rs = rs;
+ this.minNodes = minNodes;
+ this.maxNodes = maxNodes;
+ this.tokensPerNode = Cluster.build(1).getTokenCount();
+ for (int i = 0; i < minNodes; i++)
+ addNode();
+ for (Node n : nodes.values())
+ n.status = Node.Status.Up;
+ }
+
+ public long generation()
+ {
+ return generation;
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 1; i <= nodes.size(); i++)
+ {
+ Node node = nodes.get(i);
+ sb.append("\n\tNode").append(i).append(":
status=").append(node.status).append(", tokens=").append(node.tokens);
+ }
+ return sb.toString();
+ }
+
+ public int quorum()
+ {
+ return (TARGET_RF / 2) + 1;
+ }
+
+ @Override
+ public Collection<String> tokens(int i)
+ {
+ Node n = nodes.get(i);
+ if (n == null)
+ throw new IllegalArgumentException("Unknown node" + i);
+ return n.tokens;
+ }
+
+ public int[] active()
+ {
+ int[] keys = new int[nodes.size()];
Review Comment:
This one seems to return all nodes, not just the active ones, or am I missing something?
##########
src/java/org/apache/cassandra/tcm/ClusterMetadataService.java:
##########
@@ -844,11 +844,22 @@ private Pair<State, Processor> delegateInternal()
@Override
public Commit.Result commit(Entry.Id entryId, Transformation
transform, Epoch lastKnown, Retry.Deadline retryPolicy)
{
- Pair<State, Processor> delegate = delegateInternal();
- Commit.Result result = delegate.right.commit(entryId, transform,
lastKnown, retryPolicy);
- if (delegate.left == LOCAL || delegate.left == RESET)
- replicator.send(result, null);
- return result;
+ while (!retryPolicy.reachedMax())
Review Comment:
I _think_ we should not retry here; the retry should instead happen in the
underlying processor. But maybe this is one of Marcus's patches.
##########
test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java:
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.topology;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.Invariants;
+import accord.utils.Property;
+import accord.utils.Property.Command;
+import accord.utils.Property.Commands;
+import accord.utils.Property.SimpleCommand;
+import accord.utils.Property.StateOnlyCommand;
+import accord.utils.RandomSource;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.IntArrayList;
+import org.agrona.collections.IntHashSet;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.YamlConfigurationLoader;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.impl.INodeProvisionStrategy;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.utils.ConfigGenBuilder;
+
+import static accord.utils.Property.multistep;
+import static accord.utils.Property.stateful;
+
+/**
+ * These tests can create many instances, so mac users may need to run the
following to avoid address bind failures
+ *
+ * {@code for id in $(seq 0 15); do sudo ifconfig lo0 alias "127.0.0.$id";
done;}
+ */
+public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.SchemaSpec> extends TestBaseImpl
+{
+ private static final Logger logger =
LoggerFactory.getLogger(TopologyMixupTestBase.class);
+
+ static
+ {
+ DatabaseDescriptor.clientInitialization();
+ }
+
+ private enum TopologyChange
+ {
+ AddNode,
+ RemoveNode,
+ HostReplace,
+ //TODO (coverage): add the following states once supported
+// StopNode,
+// StartNode,
+// MoveToken
+ //TODO (coverage): node migrate to another rack or dc (unsupported on
trunk as of this writing, but planned work for TCM)
+// MoveNodeToNewRack,
+// MoveNodeToNewDC,
+ }
+
+ private enum RemoveType
+ {Decommission, RemoveNode, Assassinate}
+
+ private static final Gen.IntGen MURMUR_TOKEN_GEN = rs ->
rs.nextInt(Integer.MIN_VALUE + 1, Integer.MAX_VALUE);
+ private static final int TARGET_RF = 3;
+ private static final Gen<Gen<RemoveType>> REMOVE_TYPE_DISTRIBUTION =
Gens.enums().allMixedDistribution(RemoveType.class);
+ private static final Gen<Map<String, Object>> CONF_GEN = new
ConfigGenBuilder()
+ // jvm-dtest hard
codes this partitioner in its APIs, so overriding will break the test
+
.withPartitionerGen(null)
+ .build();
+
+ // common commands
+ private Command<State<S>, Void, ?> repairCommand(State<S> state, int
toCoordinate)
+ {
+ return new SimpleCommand<>("nodetool repair " +
state.schemaSpec.keyspaceName() + ' ' + state.schemaSpec.name() + " from node"
+ toCoordinate + state.commandNamePostfix(),
+ s2 ->
s2.cluster.get(toCoordinate).nodetoolResult("repair",
state.schemaSpec.keyspaceName(), s2.schemaSpec.name()).asserts().success());
+ }
+
+ private Command<State<S>, Void, ?> waitForCMSToQuiesce()
+ {
+ return new StateOnlyCommand<>()
+ {
+ @Override
+ public String detailed(State<S> state)
+ {
+ return "Waiting for CMS to Quiesce" +
state.commandNamePostfix();
+ }
+
+ @Override
+ public void applyUnit(State<S> state)
+ {
+ ClusterUtils.waitForCMSToQuiesce(state.cluster,
state.cmsGroup);
+ }
+ };
+ }
+
+ private Command<State<S>, Void, ?> stopInstance(State<S> state, int
toRemove)
+ {
+ return new SimpleCommand<>("Stop Node" + toRemove + " for Assassinate"
+ state.commandNamePostfix(), s2 -> {
+ IInvokableInstance inst = s2.cluster.get(toRemove);
+ TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+ ClusterUtils.stopUnchecked(inst);
+ node.down();
+ });
+ }
+
+ private Command<State<S>, Void, ?> addNode(State<S> state)
+ {
+ int nodeId = state.topologyHistory.uniqueInstances + 1;
+ return new SimpleCommand<>("Add Node" + nodeId +
state.commandNamePostfix(),
+ s2 -> {
+ TopologyHistory.Node n =
s2.topologyHistory.addNode();
+ IInvokableInstance newInstance =
ClusterUtils.addInstance(s2.cluster, n.dc, n.rack, c -> c.set("auto_bootstrap",
true));
+ newInstance.startup(s2.cluster);
+ n.up();
+ });
+ }
+
+ private Command<State<S>, Void, ?> removeNodeDecommission(RandomSource rs,
State<S> state)
+ {
+ int toRemove = rs.pickInt(state.topologyHistory.up());
+ return new SimpleCommand<>("nodetool decommission node" + toRemove +
state.commandNamePostfix(), s2 -> {
+ IInvokableInstance inst = s2.cluster.get(toRemove);
+ TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+ node.status = TopologyHistory.Node.Status.BeingDecommissioned;
+ inst.nodetoolResult("decommission").asserts().success();
+ ClusterUtils.stopUnchecked(inst);
+ node.removed();
+ });
+ }
+
+ private Command<State<S>, Void, ?> removeNode(RandomSource rs, State<S>
state)
+ {
+ int[] up = state.topologyHistory.up();
+ int toRemove = rs.pickInt(up);
+ int toCoordinate;
+ {
+ int picked;
+ do
+ {
+ picked = rs.pickInt(up);
+ }
+ while (picked == toRemove);
+ toCoordinate = picked;
+ }
+ return multistep(stopInstance(state, toRemove),
+ new SimpleCommand<>("nodetool removenode node" +
toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+ TopologyHistory.Node node =
s2.topologyHistory.node(toRemove);
+ node.status =
TopologyHistory.Node.Status.BeingRemoved;
+ IInvokableInstance coordinator =
s2.cluster.get(toCoordinate);
+ coordinator.nodetoolResult("removenode",
Integer.toString(toRemove), "--force").asserts().success();
+ node.removed();
+
s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+ }),
+ repairCommand(state, toCoordinate));
+ }
+
+ private Command<State<S>, Void, ?> removeNodeAssassinate(RandomSource rs,
State<S> state)
+ {
+ //TODO (correctness): assassinate CMS member isn't allowed
+ IntHashSet up = asSet(state.topologyHistory.up());
+ IntHashSet cmsGroup = asSet(state.cmsGroup);
+ Sets.SetView<Integer> upAndNotInCMS = Sets.difference(up, cmsGroup);
+ if (upAndNotInCMS.isEmpty()) throw new AssertionError("Every node is a
CMS member");
+ List<Integer> allowed = new ArrayList<>(upAndNotInCMS);
+ allowed.sort(Comparator.naturalOrder());
+ int toRemove = rs.pick(allowed);
+ int toCoordinate;
+ {
+ int[] upInt = state.topologyHistory.up();
+ int picked;
+ do
+ {
+ picked = rs.pickInt(upInt);
+ }
+ while (picked == toRemove);
+ toCoordinate = picked;
+ }
+ return multistep(stopInstance(state, toRemove),
+ new SimpleCommand<>("nodetool assassinate node" +
toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+ TopologyHistory.Node node =
s2.topologyHistory.node(toRemove);
+ node.status =
TopologyHistory.Node.Status.BeingAssassinated;
+ IInvokableInstance coordinator =
s2.cluster.get(toCoordinate);
+ InetSocketAddress address =
s2.cluster.get(toRemove).config().broadcastAddress();
+ coordinator.nodetoolResult("assassinate",
address.getAddress().getHostAddress() + ":" +
address.getPort()).asserts().success();
+ node.removed();
+
s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+ }),
+ repairCommand(state, toCoordinate)
+ );
+ }
+
+ private Command<State<S>, Void, ?>
removeNodeRandomizedDispatch(RandomSource rs, State<S> state)
+ {
+ RemoveType type = state.removeTypeGen.next(rs);
+ switch (type)
+ {
+ case Decommission:
+ return removeNodeDecommission(rs, state);
+ case RemoveNode:
+ return removeNode(rs, state);
+ case Assassinate:
+ return removeNodeAssassinate(rs, state);
+ default:
+ throw new UnsupportedOperationException("Unknown remove type:
" + type);
+ }
+ }
+
+ private Command<State<S>, Void, ?> hostReplace(RandomSource rs, State<S>
state)
+ {
+ int nodeToReplace = rs.pickInt(state.topologyHistory.up());
+ IInvokableInstance toReplace = state.cluster.get(nodeToReplace);
+ TopologyHistory.Node adding =
state.topologyHistory.replace(nodeToReplace);
+ TopologyHistory.Node removing =
state.topologyHistory.nodes.get(nodeToReplace);
+
+ return multistep(new SimpleCommand<>("Stop Node" + nodeToReplace + "
for HostReplace; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+ ClusterUtils.stopUnchecked(toReplace);
+ removing.down();
+ }),
+ new SimpleCommand<>("Host Replace Node" +
nodeToReplace + "; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+ logger.info("node{} starting host replacement;
epoch={}", adding.id,
HackSerialization.tcmEpochAndSync(s2.cluster.getFirstRunningInstance()));
+ removing.status =
TopologyHistory.Node.Status.BeingReplaced;
+ IInvokableInstance inst =
ClusterUtils.replaceHostAndStart(s2.cluster, toReplace);
+ s2.topologyHistory.replaced(removing, adding);
+ long epoch = HackSerialization.tcmEpoch(inst);
+ s2.currentEpoch.set(epoch);
+ logger.info("{} completed host replacement in
epoch={}", inst, epoch);
+ }),
+ //TODO (remove after rebase to trunk):
https://issues.apache.org/jira/browse/CASSANDRA-19705 After the rebase to
trunk this is not needed. The issue is that the CMS placement removes the
node, it does not promote another node, this cases rf=3 to become rf=2
+ new SimpleCommand<>("CMS reconfigure on Node" +
adding.id + state.commandNamePostfix(), s2 ->
s2.cluster.get(adding.id).nodetoolResult("cms", "reconfigure",
Integer.toString(TARGET_RF)).asserts().success())
+ );
+ }
+
+ protected abstract Gen<State<S>> stateGen();
+
+ protected void preCheck(Property.StatefulBuilder statefulBuilder)
+ {
+
+ }
+
+ protected void destroyState(State<S> state, @Nullable Throwable cause)
throws Throwable
+ {
+
+ }
+
+ @Test
+ public void test()
+ {
+ Property.StatefulBuilder statefulBuilder =
stateful().withSteps(50).withStepTimeout(Duration.ofMinutes(2)).withExamples(1);
+ preCheck(statefulBuilder);
+ statefulBuilder.check(new Commands<State<S>, Void>()
+ {
+ @Override
+ public Gen<State<S>> genInitialState()
+ {
+ return stateGen();
+ }
+
+ @Override
+ public Void createSut(State<S> state)
+ {
+ return null;
+ }
+
+ @Override
+ public Gen<Command<State<S>, Void, ?>> commands(State<S> state)
+ {
+ state.preActions.forEach(Runnable::run);
+
+ Map<Gen<Command<State<S>, Void, ?>>, Integer> possible = new
LinkedHashMap<>();
+
+ // topology change
+ EnumSet<TopologyChange> possibleTopologyChanges =
possibleTopologyChanges(state);
+ if (!possibleTopologyChanges.isEmpty())
+ possible.put(topologyCommand(state,
possibleTopologyChanges), 2);
+ possible.put(rs -> repairCommand(state,
rs.pickInt(state.topologyHistory.up())), 1);
+ possible.put(rs -> state.statementGen.apply(rs, state), 7);
+
+ return Gens.oneOf(possible);
+ }
+
+ private EnumSet<TopologyChange> possibleTopologyChanges(State<S>
state)
+ {
+ EnumSet<TopologyChange> possibleTopologyChanges =
EnumSet.noneOf(TopologyChange.class);
+ // up or down is logically more correct, but since this runs
sequentially and after the topology changes are complete, we don't have downed
nodes at this point
+ // so up is enough to know the topology size
+ int size = state.topologyHistory.up().length;
+ if (size < state.topologyHistory.maxNodes)
+ possibleTopologyChanges.add(TopologyChange.AddNode);
+ if (size > state.topologyHistory.quorum())
+ {
+ if (size > TARGET_RF)
+ possibleTopologyChanges.add(TopologyChange.RemoveNode);
+ possibleTopologyChanges.add(TopologyChange.HostReplace);
+ }
+ return possibleTopologyChanges;
+ }
+
+ private Gen<Command<State<S>, Void, ?>> topologyCommand(State<S>
state, EnumSet<TopologyChange> possibleTopologyChanges)
+ {
+ Map<Gen<Command<State<S>, Void, ?>>, Integer> possible = new
LinkedHashMap<>();
+ for (TopologyChange task : possibleTopologyChanges)
+ {
+ switch (task)
+ {
+ case AddNode:
+ possible.put(ignore -> multistep(addNode(state),
waitForCMSToQuiesce()), 1);
+ break;
+ case RemoveNode:
+ possible.put(rs ->
multistep(removeNodeRandomizedDispatch(rs, state), waitForCMSToQuiesce()), 1);
+ break;
+ case HostReplace:
+ possible.put(rs -> multistep(hostReplace(rs,
state), waitForCMSToQuiesce()), 1);
+ break;
+ default:
+ throw new
UnsupportedOperationException(task.name());
+ }
+ }
+ return Gens.oneOf(possible);
+ }
+
+ @Override
+ public void destroyState(State<S> state, @Nullable Throwable
cause) throws Throwable
+ {
+ if (cause != null)
+ {
+ for (Runnable r : state.onError)
+ {
+ try
+ {
+ r.run();
+ }
+ catch (Throwable t)
+ {
+ cause.addSuppressed(t);
+ }
+ }
+ }
+ TopologyMixupTestBase.this.destroyState(state, cause);
+ state.close();
+ }
+ });
+ }
+
+ private static IntHashSet asSet(int[] array)
+ {
+ IntHashSet set = new IntHashSet(array.length);
+ for (int i : array)
+ set.add(i);
+ return set;
+ }
+
+ public interface SchemaSpec
+ {
+ String name();
+
+ String keyspaceName();
+ }
+
+ protected static class State<S extends SchemaSpec> implements AutoCloseable
+ {
+ final TopologyHistory topologyHistory;
+ final Cluster cluster;
+ final S schemaSpec;
+ final List<Runnable> preActions = new CopyOnWriteArrayList<>();
+ final List<Runnable> onError = new CopyOnWriteArrayList<>();
+ final AtomicLong currentEpoch = new AtomicLong();
+ final BiFunction<RandomSource, State<S>, Command<State<S>, Void, ?>>
statementGen;
+ final Gen<RemoveType> removeTypeGen;
+ private final Map<String, Object> yamlConfigOverrides;
+ int[] cmsGroup = new int[0];
+
+ public State(RandomSource rs, BiFunction<RandomSource, Cluster, S>
schemaSpecGen, Function<S, BiFunction<RandomSource, State<S>, Command<State<S>,
Void, ?>>> cqlOperationsGen)
+ {
+ this.topologyHistory = new TopologyHistory(rs.fork(), 2, 4);
+ try
+ {
+
+ this.yamlConfigOverrides = CONF_GEN.next(rs);
+ cluster = Cluster.build(topologyHistory.minNodes)
+ .withTokenSupplier(topologyHistory)
+ .withConfig(c -> {
+ c.with(Feature.values())
+ .set("write_request_timeout", "10s");
+ //TODO (mainatiance): where to put this?
Anything touching ConfigGenBuilder with jvm-dtest needs this...
+ ((InstanceConfig)
c).remove("commitlog_sync_period_in_ms");
+ for (Map.Entry<String, Object> e :
yamlConfigOverrides.entrySet())
+ c.set(e.getKey(), e.getValue());
+ onConfigure(c);
+ })
+ //TODO (maintance): should TopologyHistory
also be a INodeProvisionStrategy.Factory so address information is stored in
the Node?
+ //TODO (maintance): AbstractCluster's
Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology makes playing with
dc/rack annoying, if this becomes an interface then TopologyHistory could own
+ .withNodeProvisionStrategy((subnet, portMap)
-> new INodeProvisionStrategy.AbstractNodeProvisionStrategy(portMap)
+ {
+ {
+ Invariants.checkArgument(subnet == 0,
"Unexpected subnet detected: %d", subnet);
+ }
+
+ private final String ipPrefix = "127.0."
+ subnet + '.';
+
+ @Override
+ public int seedNodeNum()
+ {
+ int[] up = topologyHistory.up();
+ if (Arrays.equals(up, new int[]{ 1, 2
}))
+ return 1;
+ return rs.pickInt(up);
+ }
+
+ @Override
+ public String ipAddress(int nodeNum)
+ {
+ return ipPrefix + nodeNum;
+ }
+ })
+ .start();
+ }
+ catch (IOException e)
+ {
+ throw new UncheckedIOException(e);
+ }
+ fixDistributedSchemas(cluster);
+ init(cluster, TARGET_RF);
+ // fix TCM
+ {
+ NodeToolResult result = cluster.get(1).nodetoolResult("cms",
"reconfigure", "2");
+ result.asserts().success();
+ logger.info("CMS reconfigure: {}", result.getStdout());
+ }
+ preActions.add(new Runnable()
+ {
+ // in order to remove this action, an annonomus class is
needed so "this" works, lambda "this" is the parent class
Review Comment:
Also, if you like, you can use a lambda together with `preActions.remove(State.this);`. I
don't mind an anonymous class, though.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]