aweisberg commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1915371970


##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationReadRaceTestBase.java:
##########
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.primitives.Ranges;
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.Config.PaxosVariant;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageSink;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import 
org.apache.cassandra.distributed.test.accord.InteropTokenRangeTest.TokenOperator;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.repair.RepairResult;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
+import org.apache.cassandra.service.consensus.TransactionalMode;
+import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationRepairResult;
+import 
org.apache.cassandra.service.consensus.migration.ConsensusTableMigration;
+import 
org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.Throwables.ThrowingRunnable;
+import org.assertj.core.api.Assertions;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static java.lang.String.format;
+import static org.apache.cassandra.Util.spinAssertEquals;
+import static 
org.apache.cassandra.dht.Murmur3Partitioner.LongToken.keyForToken;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.getNextEpoch;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.pauseBeforeEnacting;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.unpauseEnactment;
+import static 
org.apache.cassandra.distributed.test.accord.InteropTokenRangeTest.TokenOperator.gt;
+import static 
org.apache.cassandra.distributed.test.accord.InteropTokenRangeTest.TokenOperator.gte;
+import static 
org.apache.cassandra.distributed.test.accord.InteropTokenRangeTest.TokenOperator.lt;
+import static 
org.apache.cassandra.distributed.test.accord.InteropTokenRangeTest.TokenOperator.lte;
+import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytesToHex;
+import static org.junit.Assert.assertEquals;
+
+/*
+ * Test that non-transactional read operations migrating to/from a mode where 
Accord ignores commit consistency levels
+ * and does aysnc commit are routed correctly. Currently this is just 
TransactionalMode.full
+ */
+public abstract class AccordMigrationReadRaceTestBase extends AccordTestBase
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AccordMigrationReadRaceTestBase.class);
+    private static final int TEST_BOUNDS_CONCURRENCY = 32;
+    // Set BATCH_INDEX to the failing batch and this to true to find out the 
query index, then set QUERY_INDEX
+    private static final boolean EXECUTE_BATCH_QUERIES_SERIALLY = false;
+    // Specify only a single batch or query to run
+    private static final Integer BATCH_INDEX = null;
+    private static final Integer QUERY_INDEX = null;
+    private static final String TABLE_FMT = "CREATE TABLE %s (pk blob, c int, 
v int, PRIMARY KEY ((pk), c));";
+
+    private static IPartitioner partitioner;
+
+    private static Range<Token> migratingRange;
+
+    private static ICoordinator coordinator;
+
+    private final static TestMessageSink messageSink = new TestMessageSink();
+    private static class TestMessageSink implements IMessageSink
+    {
+        private final Queue<Pair<InetSocketAddress,IMessage>> messages = new 
ConcurrentLinkedQueue<>();
+        private final Set<InetSocketAddress> blackholed = new 
ConcurrentHashSet<>();
+
+        public void reset()
+        {
+            messages.clear();
+            blackholed.clear();
+        }
+
+        @Override
+        public void accept(InetSocketAddress to, IMessage message) {
+            messages.offer(Pair.create(to,message));
+            IInstance i = SHARED_CLUSTER.get(to);
+            if (blackholed.contains(to) || blackholed.contains(message.from()))
+                return;
+            if (i != null)
+                i.receiveMessage(message);
+        }
+    }
+
+    private final boolean migrateAwayFromAccord;
+
+    protected AccordMigrationReadRaceTestBase()
+    {
+        this.migrateAwayFromAccord = migratingAwayFromAccord();
+    }
+
+    protected abstract boolean migratingAwayFromAccord();
+
+    @Override
+    protected Logger logger()
+    {
+        return logger;
+    }
+
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        ServerTestUtils.daemonInitialization();
+        // Otherwise repair complains if you don't specify a keyspace
+        CassandraRelevantProperties.SYSTEM_TRACES_DEFAULT_RF.setInt(3);
+        AccordTestBase.setupCluster(builder -> builder.appendConfig(config -> 
config.set("paxos_variant", PaxosVariant.v2.name())
+                                                                               
     .set("read_request_timeout", "3600s")

Review Comment:
   Reduce timeouts to something more reasonable, this was just to allow 
debugging during development



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to