ifesdjeen commented on code in PR #3416: URL: https://github.com/apache/cassandra/pull/3416#discussion_r1679826119
########## test/distributed/org/apache/cassandra/distributed/test/accord/AccordHostReplacementTest.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.accord; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.utils.Gen; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.consensus.TransactionalMode; +import org.apache.cassandra.utils.CassandraGenerators; +import org.apache.cassandra.utils.CassandraGenerators.TableMetadataBuilder; + +import static accord.utils.Property.qt; +import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked; +import static org.apache.cassandra.utils.AccordGenerators.fromQT; +import static 
org.apache.cassandra.utils.CassandraGenerators.insertCqlAllColumns; + +public class AccordHostReplacementTest extends TestBaseImpl +{ + private static final Logger logger = LoggerFactory.getLogger(AccordHostReplacementTest.class); + + @Test + public void hostReplace() throws IOException + { + // start 2 node cluster and bootstrap 3rd node + // do a host replacement of one of the nodes + Cluster.Builder clusterBuilder = Cluster.build(3) + .withConfig(c -> c.with(Feature.values()) + .set("accord.shard_count", "1")); + TokenSupplier tokenRing = TokenSupplier.evenlyDistributedTokens(3, clusterBuilder.getTokenCount()); + int nodeToReplace = 2; + clusterBuilder = clusterBuilder.withTokenSupplier((TokenSupplier) node -> tokenRing.tokens(node == 4 ? nodeToReplace : node)); + try (Cluster cluster = clusterBuilder.start()) + { + fixDistributedSchemas(cluster); + init(cluster); + + qt().withSeed(5215087322357346205L).withExamples(1).check(rs -> { + TableMetadata metadata = fromQT(new TableMetadataBuilder() + .withKeyspaceName(KEYSPACE) + .withTableKinds(TableMetadata.Kind.REGULAR) + .withKnownMemtables() + //TODO (coverage): include "fast_path = 'keyspace'" override + .withTransactionalMode(TransactionalMode.full) + .build()) + .next(rs); + maybeCreateUDTs(cluster, metadata); + cluster.schemaChange(metadata.toCqlString(false, false)); + + Gen<ByteBuffer[]> dataGen = fromQT(fromQT(new CassandraGenerators.DataGeneratorBuilder(metadata).build(ignore -> 10)).next(rs)); + String insertStmt = wrapInTxn(insertCqlAllColumns(metadata)); + + for (int i = 0; i < 10; i++) Review Comment: As long as the data is written sequentially you can also reuse Harry machinery from `ConsistentBootstrapTest`, it is super simple: ``` ReplayingHistoryBuilder harry = HarryHelper.dataGen(new InJvmSut(cluster), new TokenPlacementModel.SimpleReplicationFactor(3), SystemUnderTest.ConsistencyLevel.ALL); Runnable writeAndValidate = () -> { System.out.println("Starting write phase..."); for (int i = 0; i < 
WRITES; i++) harry.insert(); System.out.println("Starting validate phase..."); harry.validateAll(harry.quiescentLocalChecker()); }; ``` ########## test/distributed/org/apache/cassandra/distributed/test/accord/AccordHostReplacementTest.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test.accord; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.utils.Gen; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.consensus.TransactionalMode; +import org.apache.cassandra.utils.CassandraGenerators; +import org.apache.cassandra.utils.CassandraGenerators.TableMetadataBuilder; + +import static accord.utils.Property.qt; +import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked; +import static org.apache.cassandra.utils.AccordGenerators.fromQT; +import static org.apache.cassandra.utils.CassandraGenerators.insertCqlAllColumns; + +public class AccordHostReplacementTest extends TestBaseImpl +{ + private static final Logger logger = LoggerFactory.getLogger(AccordHostReplacementTest.class); + + @Test + public void hostReplace() throws IOException + { + // start 2 node cluster and bootstrap 3rd node + // do a host replacement of one of the nodes + Cluster.Builder clusterBuilder = Cluster.build(3) + .withConfig(c -> c.with(Feature.values()) + .set("accord.shard_count", "1")); + TokenSupplier tokenRing = TokenSupplier.evenlyDistributedTokens(3, clusterBuilder.getTokenCount()); + int nodeToReplace = 2; + clusterBuilder = clusterBuilder.withTokenSupplier((TokenSupplier) node -> tokenRing.tokens(node == 4 ? 
nodeToReplace : node)); + try (Cluster cluster = clusterBuilder.start()) + { + fixDistributedSchemas(cluster); + init(cluster); + + qt().withSeed(5215087322357346205L).withExamples(1).check(rs -> { + TableMetadata metadata = fromQT(new TableMetadataBuilder() + .withKeyspaceName(KEYSPACE) + .withTableKinds(TableMetadata.Kind.REGULAR) + .withKnownMemtables() + //TODO (coverage): include "fast_path = 'keyspace'" override + .withTransactionalMode(TransactionalMode.full) + .build()) + .next(rs); + maybeCreateUDTs(cluster, metadata); + cluster.schemaChange(metadata.toCqlString(false, false)); + + Gen<ByteBuffer[]> dataGen = fromQT(fromQT(new CassandraGenerators.DataGeneratorBuilder(metadata).build(ignore -> 10)).next(rs)); + String insertStmt = wrapInTxn(insertCqlAllColumns(metadata)); + + for (int i = 0; i < 10; i++) + cluster.coordinator(1).execute(insertStmt, ConsistencyLevel.ANY, (Object[]) dataGen.next(rs)); + + cluster.forEach(i -> i.nodetoolResult("repair", "--full", "--accord-only", KEYSPACE).asserts().success()); + + stopUnchecked(cluster.get(nodeToReplace)); + ClusterUtils.replaceHostAndStart(cluster, cluster.get(nodeToReplace)); + + cluster.forEach(i -> { + if (i.isShutdown()) return; + i.nodetoolResult("repair", "--full", "--accord-only", KEYSPACE).asserts().success(); Review Comment: I personally would remove it, and switch to writing with `ALL` above, at least if the intention is to test bootstrap streaming. ########## test/distributed/org/apache/cassandra/distributed/test/topology/TopologyMixupTest.java: ########## @@ -0,0 +1,856 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.topology; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.annotation.Nullable; + +import com.google.common.base.Throwables; +import com.google.common.collect.Sets; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.coordinate.Exhausted; +import accord.local.Node; +import accord.primitives.Ranges; +import accord.primitives.TxnId; +import accord.utils.Gen; +import accord.utils.Gens; +import accord.utils.Invariants; +import accord.utils.Property.Command; +import accord.utils.Property.Commands; +import accord.utils.Property.SimpleCommand; +import accord.utils.RandomSource; +import org.agrona.collections.Int2ObjectHashMap; +import org.agrona.collections.IntArrayList; +import org.agrona.collections.IntHashSet; +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.config.Config; +import 
org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.ast.Mutation; +import org.apache.cassandra.cql3.ast.Select; +import org.apache.cassandra.cql3.ast.Statement; +import org.apache.cassandra.cql3.ast.Txn; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.NodeToolResult; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.impl.INodeProvisionStrategy; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.distributed.test.accord.AccordTestBase; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.api.AccordAgent; +import org.apache.cassandra.service.consensus.TransactionalMode; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.utils.CassandraGenerators; +import org.apache.cassandra.utils.Isolated; +import org.apache.cassandra.utils.Shared; +import org.awaitility.Awaitility; + +import static accord.utils.Property.multistep; +import static accord.utils.Property.stateful; +import static org.apache.cassandra.utils.AccordGenerators.fromQT; + +public class TopologyMixupTest extends TestBaseImpl +{ + private static final Logger logger = 
LoggerFactory.getLogger(TopologyMixupTest.class); + + static + { + DatabaseDescriptor.clientInitialization(); + + CassandraRelevantProperties.ACCORD_AGENT_CLASS.setString(InterceptAgent.class.getName()); + } + + private static final Gen.IntGen MURMUR_TOKEN_GEN = rs -> rs.nextInt(Integer.MIN_VALUE + 1, Integer.MAX_VALUE); + private static final Gen<SSTableFormat<?, ?>> SSTABLE_FORMAT_GEN = fromQT(CassandraGenerators.sstableFormat()); + private static final int TARGET_RF = 3; + private static final Gen<Gen<RemoveType>> REMOVE_TYPE_DISTRIBUTION = Gens.enums().allMixedDistribution(RemoveType.class); + + private static class State implements SharedState.Listener + { + final TopologyHistory th; + final Cluster cluster; + final TableMetadata metadata; + final List<Runnable> preActions = new CopyOnWriteArrayList<>(); + final List<Runnable> onError = new CopyOnWriteArrayList<>(); + final AtomicLong currentEpoch = new AtomicLong(); + final Gen<Statement> statementGen; + final Gen<RemoveType> removeTypeGen; + int[] cmsGroup = new int[0]; + + private State(RandomSource rs) + { + this.th = new TopologyHistory(rs.fork(), 2, 4); + try + { + cluster = Cluster.build(th.minNodes) + .withTokenSupplier(th) + .withConfig(c -> c.with(Feature.values()) + .set("accord.shard_count", 1) + .set("paxos_variant", Config.PaxosVariant.v2.name()) + .set("write_request_timeout", "10s")) + //TODO (maintance): should TopologyHistory also be a INodeProvisionStrategy.Factory so address information is stored in the Node? + //TODO (maintance): AbstractCluster's Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology makes playing with dc/rack annoying, if this becomes an interface then TopologyHistory could own + .withNodeProvisionStrategy((subnet, portMap) -> new INodeProvisionStrategy.AbstractNodeProvisionStrategy(portMap) + { + { + Invariants.checkArgument(subnet == 0, "Unexpected subnet detected: %d", subnet); + } + + private final String ipPrefix = "127.0." 
+ subnet + '.'; + + @Override + public int seedNodeNum() + { + int[] up = th.up(); + if (Arrays.equals(up, new int[]{1, 2})) + return 1; + return rs.pickInt(up); + } + + @Override + public String ipAddress(int nodeNum) + { + return ipPrefix + nodeNum; + } + }) + .start(); + } + catch (IOException e) + { + throw new UncheckedIOException(e); + } + DatabaseDescriptor.setSelectedSSTableFormat(SSTABLE_FORMAT_GEN.next(rs)); + fixDistributedSchemas(cluster); + init(cluster, TARGET_RF); + // fix TCM + { + NodeToolResult result = cluster.get(1).nodetoolResult("cms", "reconfigure", "2"); + result.asserts().success(); + logger.info("CMS reconfigure: {}", result.getStdout()); + } + preActions.add(new Runnable() + { + // in order to remove this action, an annonomus class is needed so "this" works, lambda "this" is the parent class + @Override + public void run() + { + if (th.up().length == TARGET_RF) + { + NodeToolResult result = cluster.get(1).nodetoolResult("cms", "reconfigure", Integer.toString(TARGET_RF)); + result.asserts().success(); + logger.info("CMS reconfigure: {}", result.getStdout()); + preActions.remove(this); + } + } + }); + preActions.add(() -> { + int[] up = th.up(); + // use the most recent node just in case the cluster isn't in-sync + IInvokableInstance node = cluster.get(up[up.length - 1]); + cmsGroup = HackSerialization.cmsGroup(node); + currentEpoch.set(HackSerialization.tcmEpoch(node)); + }); + List<TransactionalMode> modes = Stream.of(TransactionalMode.values()).filter(t -> t.accordIsEnabled).collect(Collectors.toList()); + TransactionalMode mode = rs.pick(modes); + boolean enableMigration = allowsMigration(mode) && rs.nextBoolean(); + metadata = fromQT(new CassandraGenerators.TableMetadataBuilder() + .withKeyspaceName(KEYSPACE) + .withTableKinds(TableMetadata.Kind.REGULAR) + .withKnownMemtables() + //TODO (coverage): include "fast_path = 'keyspace'" override + .withTransactionalMode(enableMigration ? 
TransactionalMode.off : mode) + .withoutEmpty() + .build()) + .next(rs); + maybeCreateUDTs(cluster, metadata); + String schemaCQL = metadata.toCqlString(false, false); + logger.info("Creating test table:\n{}", schemaCQL); + cluster.schemaChange(schemaCQL); + if (enableMigration) + { + cluster.schemaChange("ALTER TABLE " + metadata + " WITH " + mode.asCqlParam()); + cluster.get(1).nodetoolResult("consensus_admin", "begin-migration", "--target-protocol", "accord", metadata.keyspace, metadata.name).asserts().success(); + } + + //TODO (usability): schemaChange does not mean the cluster KNOWS about the table, so replicas may be stale by the time we query + // Accord also learns about it async, so every node may know the epoch, but accord doesn't yet! + long waitForEpoch = HackSerialization.tcmEpoch(cluster.get(1)); + for (int i = 1; i <= cluster.size(); i++) + { + int finalI = i; + Awaitility.await() + .atMost(Duration.ofMinutes(1)) + .until(() -> HackSerialization.accordEpoch(cluster.get(finalI)) == waitForEpoch); + } + + currentEpoch.set(waitForEpoch); + + Gen<Statement> select = (Gen<Statement>) (Gen<?>) fromQT(new Select.GenBuilder(metadata).withLimit1().build()); Review Comment: Since we are not testing anything Accord-specific here, and regular mutation statements would do, any reason not to use Harry here just like the rest of `Consistent*Test` do? It validates the placements after streaming by checking data presence even locally, so it has some big advantages. ########## test/distributed/org/apache/cassandra/distributed/test/topology/TopologyMixupTest.java: ########## @@ -0,0 +1,856 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.topology; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.annotation.Nullable; + +import com.google.common.base.Throwables; +import com.google.common.collect.Sets; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.coordinate.Exhausted; +import accord.local.Node; +import accord.primitives.Ranges; +import accord.primitives.TxnId; +import accord.utils.Gen; +import accord.utils.Gens; +import accord.utils.Invariants; +import accord.utils.Property.Command; +import accord.utils.Property.Commands; +import accord.utils.Property.SimpleCommand; +import accord.utils.RandomSource; +import org.agrona.collections.Int2ObjectHashMap; +import 
org.agrona.collections.IntArrayList; +import org.agrona.collections.IntHashSet; +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.ast.Mutation; +import org.apache.cassandra.cql3.ast.Select; +import org.apache.cassandra.cql3.ast.Statement; +import org.apache.cassandra.cql3.ast.Txn; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.NodeToolResult; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.impl.INodeProvisionStrategy; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.distributed.test.accord.AccordTestBase; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.api.AccordAgent; +import org.apache.cassandra.service.consensus.TransactionalMode; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.utils.CassandraGenerators; +import org.apache.cassandra.utils.Isolated; +import org.apache.cassandra.utils.Shared; +import org.awaitility.Awaitility; + +import static accord.utils.Property.multistep; +import static 
accord.utils.Property.stateful; +import static org.apache.cassandra.utils.AccordGenerators.fromQT; + +public class TopologyMixupTest extends TestBaseImpl Review Comment: I think this test brings a lot of value, but what I am not seeing is a check that streaming was successful, i.e. that the data is present on the target nodes. We have a lot of machinery for testing these things in TCM, which can be reused. Also, we either have to simplify this test a bit and reduce the number of things it does, or refactor it to make it easier to get an overview of. A middle ground would be to open a Jira ticket for creating a general framework for driving state changes and integrating it with the rest of our fuzz tooling, which would deduplicate efforts. We have done a lot of work to consolidate things in TCM, so it would be great to reuse some of that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

