dcapwell commented on code in PR #3416: URL: https://github.com/apache/cassandra/pull/3416#discussion_r1683580690
########## test/distributed/org/apache/cassandra/distributed/test/topology/TopologyMixupTest.java: ########## @@ -0,0 +1,856 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.topology; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.annotation.Nullable; + +import com.google.common.base.Throwables; +import com.google.common.collect.Sets; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.coordinate.Exhausted; +import accord.local.Node; +import accord.primitives.Ranges; +import 
accord.primitives.TxnId; +import accord.utils.Gen; +import accord.utils.Gens; +import accord.utils.Invariants; +import accord.utils.Property.Command; +import accord.utils.Property.Commands; +import accord.utils.Property.SimpleCommand; +import accord.utils.RandomSource; +import org.agrona.collections.Int2ObjectHashMap; +import org.agrona.collections.IntArrayList; +import org.agrona.collections.IntHashSet; +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.ast.Mutation; +import org.apache.cassandra.cql3.ast.Select; +import org.apache.cassandra.cql3.ast.Statement; +import org.apache.cassandra.cql3.ast.Txn; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.NodeToolResult; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.impl.INodeProvisionStrategy; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.distributed.test.accord.AccordTestBase; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.api.AccordAgent; +import org.apache.cassandra.service.consensus.TransactionalMode; +import 
org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.utils.CassandraGenerators; +import org.apache.cassandra.utils.Isolated; +import org.apache.cassandra.utils.Shared; +import org.awaitility.Awaitility; + +import static accord.utils.Property.multistep; +import static accord.utils.Property.stateful; +import static org.apache.cassandra.utils.AccordGenerators.fromQT; + +public class TopologyMixupTest extends TestBaseImpl +{ + private static final Logger logger = LoggerFactory.getLogger(TopologyMixupTest.class); + + static + { + DatabaseDescriptor.clientInitialization(); + + CassandraRelevantProperties.ACCORD_AGENT_CLASS.setString(InterceptAgent.class.getName()); + } + + private static final Gen.IntGen MURMUR_TOKEN_GEN = rs -> rs.nextInt(Integer.MIN_VALUE + 1, Integer.MAX_VALUE); + private static final Gen<SSTableFormat<?, ?>> SSTABLE_FORMAT_GEN = fromQT(CassandraGenerators.sstableFormat()); + private static final int TARGET_RF = 3; + private static final Gen<Gen<RemoveType>> REMOVE_TYPE_DISTRIBUTION = Gens.enums().allMixedDistribution(RemoveType.class); + + private static class State implements SharedState.Listener + { + final TopologyHistory th; + final Cluster cluster; + final TableMetadata metadata; + final List<Runnable> preActions = new CopyOnWriteArrayList<>(); + final List<Runnable> onError = new CopyOnWriteArrayList<>(); + final AtomicLong currentEpoch = new AtomicLong(); + final Gen<Statement> statementGen; + final Gen<RemoveType> removeTypeGen; + int[] cmsGroup = new int[0]; + + private State(RandomSource rs) + { + this.th = new TopologyHistory(rs.fork(), 2, 4); + try + { + cluster = Cluster.build(th.minNodes) + .withTokenSupplier(th) + .withConfig(c -> c.with(Feature.values()) + .set("accord.shard_count", 1) + .set("paxos_variant", Config.PaxosVariant.v2.name()) + .set("write_request_timeout", "10s")) + //TODO (maintance): should TopologyHistory also be a 
INodeProvisionStrategy.Factory so address information is stored in the Node? + //TODO (maintance): AbstractCluster's Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology makes playing with dc/rack annoying, if this becomes an interface then TopologyHistory could own + .withNodeProvisionStrategy((subnet, portMap) -> new INodeProvisionStrategy.AbstractNodeProvisionStrategy(portMap) + { + { + Invariants.checkArgument(subnet == 0, "Unexpected subnet detected: %d", subnet); + } + + private final String ipPrefix = "127.0." + subnet + '.'; + + @Override + public int seedNodeNum() + { + int[] up = th.up(); + if (Arrays.equals(up, new int[]{1, 2})) + return 1; + return rs.pickInt(up); + } + + @Override + public String ipAddress(int nodeNum) + { + return ipPrefix + nodeNum; + } + }) + .start(); + } + catch (IOException e) + { + throw new UncheckedIOException(e); + } + DatabaseDescriptor.setSelectedSSTableFormat(SSTABLE_FORMAT_GEN.next(rs)); + fixDistributedSchemas(cluster); + init(cluster, TARGET_RF); + // fix TCM + { + NodeToolResult result = cluster.get(1).nodetoolResult("cms", "reconfigure", "2"); + result.asserts().success(); + logger.info("CMS reconfigure: {}", result.getStdout()); + } + preActions.add(new Runnable() + { + // in order to remove this action, an annonomus class is needed so "this" works, lambda "this" is the parent class + @Override + public void run() + { + if (th.up().length == TARGET_RF) + { + NodeToolResult result = cluster.get(1).nodetoolResult("cms", "reconfigure", Integer.toString(TARGET_RF)); + result.asserts().success(); + logger.info("CMS reconfigure: {}", result.getStdout()); + preActions.remove(this); + } + } + }); + preActions.add(() -> { + int[] up = th.up(); + // use the most recent node just in case the cluster isn't in-sync + IInvokableInstance node = cluster.get(up[up.length - 1]); + cmsGroup = HackSerialization.cmsGroup(node); + currentEpoch.set(HackSerialization.tcmEpoch(node)); + }); + List<TransactionalMode> modes = 
Stream.of(TransactionalMode.values()).filter(t -> t.accordIsEnabled).collect(Collectors.toList()); + TransactionalMode mode = rs.pick(modes); + boolean enableMigration = allowsMigration(mode) && rs.nextBoolean(); + metadata = fromQT(new CassandraGenerators.TableMetadataBuilder() + .withKeyspaceName(KEYSPACE) + .withTableKinds(TableMetadata.Kind.REGULAR) + .withKnownMemtables() + //TODO (coverage): include "fast_path = 'keyspace'" override + .withTransactionalMode(enableMigration ? TransactionalMode.off : mode) + .withoutEmpty() + .build()) + .next(rs); + maybeCreateUDTs(cluster, metadata); + String schemaCQL = metadata.toCqlString(false, false); + logger.info("Creating test table:\n{}", schemaCQL); + cluster.schemaChange(schemaCQL); + if (enableMigration) + { + cluster.schemaChange("ALTER TABLE " + metadata + " WITH " + mode.asCqlParam()); + cluster.get(1).nodetoolResult("consensus_admin", "begin-migration", "--target-protocol", "accord", metadata.keyspace, metadata.name).asserts().success(); + } + + //TODO (usability): schemaChange does not mean the cluster KNOWS about the table, so replicas may be stale by the time we query + // Accord also learns about it async, so every node may know the epoch, but accord doesn't yet! + long waitForEpoch = HackSerialization.tcmEpoch(cluster.get(1)); + for (int i = 1; i <= cluster.size(); i++) + { + int finalI = i; + Awaitility.await() + .atMost(Duration.ofMinutes(1)) + .until(() -> HackSerialization.accordEpoch(cluster.get(finalI)) == waitForEpoch); + } + + currentEpoch.set(waitForEpoch); + + Gen<Statement> select = (Gen<Statement>) (Gen<?>) fromQT(new Select.GenBuilder(metadata).withLimit1().build()); Review Comment: updating more for history than to inform Alex... Alex and I spoke and worked to get Harry working with Accord... `TopologyMixupTest` got split into 2 tests: with accord / without accord... the one without accord uses harry for the validation and schema creation. 
The Accord variant still uses this logic, as it helps validate the custom CQL that exists for Accord. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

