dcapwell commented on code in PR #3416: URL: https://github.com/apache/cassandra/pull/3416#discussion_r1680121836
########## test/distributed/org/apache/cassandra/distributed/test/topology/TopologyMixupTest.java: ########## @@ -0,0 +1,856 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.topology; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.annotation.Nullable; + +import com.google.common.base.Throwables; +import com.google.common.collect.Sets; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.coordinate.Exhausted; +import accord.local.Node; +import accord.primitives.Ranges; +import 
accord.primitives.TxnId; +import accord.utils.Gen; +import accord.utils.Gens; +import accord.utils.Invariants; +import accord.utils.Property.Command; +import accord.utils.Property.Commands; +import accord.utils.Property.SimpleCommand; +import accord.utils.RandomSource; +import org.agrona.collections.Int2ObjectHashMap; +import org.agrona.collections.IntArrayList; +import org.agrona.collections.IntHashSet; +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.ast.Mutation; +import org.apache.cassandra.cql3.ast.Select; +import org.apache.cassandra.cql3.ast.Statement; +import org.apache.cassandra.cql3.ast.Txn; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.NodeToolResult; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.impl.INodeProvisionStrategy; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.distributed.test.accord.AccordTestBase; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.api.AccordAgent; +import org.apache.cassandra.service.consensus.TransactionalMode; +import 
org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.utils.CassandraGenerators; +import org.apache.cassandra.utils.Isolated; +import org.apache.cassandra.utils.Shared; +import org.awaitility.Awaitility; + +import static accord.utils.Property.multistep; +import static accord.utils.Property.stateful; +import static org.apache.cassandra.utils.AccordGenerators.fromQT; + +public class TopologyMixupTest extends TestBaseImpl +{ + private static final Logger logger = LoggerFactory.getLogger(TopologyMixupTest.class); + + static + { + DatabaseDescriptor.clientInitialization(); + + CassandraRelevantProperties.ACCORD_AGENT_CLASS.setString(InterceptAgent.class.getName()); + } + + private static final Gen.IntGen MURMUR_TOKEN_GEN = rs -> rs.nextInt(Integer.MIN_VALUE + 1, Integer.MAX_VALUE); + private static final Gen<SSTableFormat<?, ?>> SSTABLE_FORMAT_GEN = fromQT(CassandraGenerators.sstableFormat()); + private static final int TARGET_RF = 3; + private static final Gen<Gen<RemoveType>> REMOVE_TYPE_DISTRIBUTION = Gens.enums().allMixedDistribution(RemoveType.class); + + private static class State implements SharedState.Listener + { + final TopologyHistory th; + final Cluster cluster; + final TableMetadata metadata; + final List<Runnable> preActions = new CopyOnWriteArrayList<>(); + final List<Runnable> onError = new CopyOnWriteArrayList<>(); + final AtomicLong currentEpoch = new AtomicLong(); + final Gen<Statement> statementGen; + final Gen<RemoveType> removeTypeGen; + int[] cmsGroup = new int[0]; + + private State(RandomSource rs) + { + this.th = new TopologyHistory(rs.fork(), 2, 4); + try + { + cluster = Cluster.build(th.minNodes) + .withTokenSupplier(th) + .withConfig(c -> c.with(Feature.values()) + .set("accord.shard_count", 1) + .set("paxos_variant", Config.PaxosVariant.v2.name()) + .set("write_request_timeout", "10s")) + //TODO (maintance): should TopologyHistory also be a 
INodeProvisionStrategy.Factory so address information is stored in the Node? + //TODO (maintance): AbstractCluster's Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology makes playing with dc/rack annoying, if this becomes an interface then TopologyHistory could own + .withNodeProvisionStrategy((subnet, portMap) -> new INodeProvisionStrategy.AbstractNodeProvisionStrategy(portMap) + { + { + Invariants.checkArgument(subnet == 0, "Unexpected subnet detected: %d", subnet); + } + + private final String ipPrefix = "127.0." + subnet + '.'; + + @Override + public int seedNodeNum() + { + int[] up = th.up(); + if (Arrays.equals(up, new int[]{1, 2})) + return 1; + return rs.pickInt(up); + } + + @Override + public String ipAddress(int nodeNum) + { + return ipPrefix + nodeNum; + } + }) + .start(); + } + catch (IOException e) + { + throw new UncheckedIOException(e); + } + DatabaseDescriptor.setSelectedSSTableFormat(SSTABLE_FORMAT_GEN.next(rs)); + fixDistributedSchemas(cluster); + init(cluster, TARGET_RF); + // fix TCM + { + NodeToolResult result = cluster.get(1).nodetoolResult("cms", "reconfigure", "2"); + result.asserts().success(); + logger.info("CMS reconfigure: {}", result.getStdout()); + } + preActions.add(new Runnable() + { + // in order to remove this action, an annonomus class is needed so "this" works, lambda "this" is the parent class + @Override + public void run() + { + if (th.up().length == TARGET_RF) + { + NodeToolResult result = cluster.get(1).nodetoolResult("cms", "reconfigure", Integer.toString(TARGET_RF)); + result.asserts().success(); + logger.info("CMS reconfigure: {}", result.getStdout()); + preActions.remove(this); + } + } + }); + preActions.add(() -> { + int[] up = th.up(); + // use the most recent node just in case the cluster isn't in-sync + IInvokableInstance node = cluster.get(up[up.length - 1]); + cmsGroup = HackSerialization.cmsGroup(node); + currentEpoch.set(HackSerialization.tcmEpoch(node)); + }); + List<TransactionalMode> modes = 
Stream.of(TransactionalMode.values()).filter(t -> t.accordIsEnabled).collect(Collectors.toList()); + TransactionalMode mode = rs.pick(modes); + boolean enableMigration = allowsMigration(mode) && rs.nextBoolean(); + metadata = fromQT(new CassandraGenerators.TableMetadataBuilder() + .withKeyspaceName(KEYSPACE) + .withTableKinds(TableMetadata.Kind.REGULAR) + .withKnownMemtables() + //TODO (coverage): include "fast_path = 'keyspace'" override + .withTransactionalMode(enableMigration ? TransactionalMode.off : mode) + .withoutEmpty() + .build()) + .next(rs); + maybeCreateUDTs(cluster, metadata); + String schemaCQL = metadata.toCqlString(false, false); + logger.info("Creating test table:\n{}", schemaCQL); + cluster.schemaChange(schemaCQL); + if (enableMigration) + { + cluster.schemaChange("ALTER TABLE " + metadata + " WITH " + mode.asCqlParam()); + cluster.get(1).nodetoolResult("consensus_admin", "begin-migration", "--target-protocol", "accord", metadata.keyspace, metadata.name).asserts().success(); + } + + //TODO (usability): schemaChange does not mean the cluster KNOWS about the table, so replicas may be stale by the time we query + // Accord also learns about it async, so every node may know the epoch, but accord doesn't yet! + long waitForEpoch = HackSerialization.tcmEpoch(cluster.get(1)); + for (int i = 1; i <= cluster.size(); i++) + { + int finalI = i; + Awaitility.await() + .atMost(Duration.ofMinutes(1)) + .until(() -> HackSerialization.accordEpoch(cluster.get(finalI)) == waitForEpoch); + } + + currentEpoch.set(waitForEpoch); + + Gen<Statement> select = (Gen<Statement>) (Gen<?>) fromQT(new Select.GenBuilder(metadata).withLimit1().build()); Review Comment: We talked in Slack; it's mostly the lack of coverage. Right now `AbstractTypeGenerator` supports every single type (even super weird ones we hope users never touch, but are not blocked...), so we benefit from broader coverage.
The other thing is that the new `ast` was built with Accord in mind (I wrote it to help validate Caleb's first CQL integration path), so it covers more than Harry does. We spoke in Slack today about migrating Harry to use the ASTs rather than `StringBuilder`, which will make it much easier for Harry to do those things as well (examples missing: type hints, `+=` (super common with CAS and txn), etc.). Now, one thing I did have in mind when writing this test was to make the Accord part pluggable... I just never got around to doing that... My thinking was that the schema + statements are Accord-specific but the rest isn't, so we could make this test abstract and have one for non-Accord and one for Accord... the non-Accord one would be a good place to use Harry. Might even be good to have 4 versions of the test: 2 that validate (using Harry) and 2 just fuzzing to see what breaks... Now, we both agree that long term we should converge, which makes these differences disappear... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

