beobal commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1916520454
##########
src/java/org/apache/cassandra/tcm/log/LocalLog.java:
##########
@@ -730,12 +739,11 @@ public void runOnce(DurationSpec duration) throws TimeoutException
 // complete.
 if (current != null)
 {
- if (duration == null)
+ if (timeout < 0)
 {
-
 current.awaitThrowUncheckedOnInterrupt();
 }
- else if (!current.awaitThrowUncheckedOnInterrupt(duration.to(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS))
+ else if (!current.awaitThrowUncheckedOnInterrupt(timeout, TimeUnit.MILLISECONDS))
Review Comment:
here's a counter-argument against switching away from `DurationSpec`: it makes it
easier to introduce bugs like this. The supplied `TimeUnit` is being ignored, and I
think at almost every root call site it's actually a nanosecond value that is
being supplied.
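
To make the failure mode concrete, here's a standalone sketch (illustrative, not code from this patch) of what happens when a nanosecond timeout reaches a method that drops the unit:

```java
import java.util.concurrent.TimeUnit;

public class TimeoutUnitDemo
{
    // Buggy shape, mirroring the hunk above: accepts a unit but ignores it
    // and treats the raw value as milliseconds.
    static void awaitBuggy(long timeout, TimeUnit unit)
    {
        awaitMillis(timeout); // unit silently dropped
    }

    // Fixed shape: convert with the caller's unit before awaiting.
    static void awaitFixed(long timeout, TimeUnit unit)
    {
        awaitMillis(unit.toMillis(timeout));
    }

    static void awaitMillis(long millis)
    {
        System.out.println("waiting " + millis + "ms");
    }

    public static void main(String[] args)
    {
        long timeoutNanos = TimeUnit.SECONDS.toNanos(10); // what most root call sites pass
        awaitBuggy(timeoutNanos, TimeUnit.NANOSECONDS); // waits 10,000,000,000 "ms" (~115 days)
        awaitFixed(timeoutNanos, TimeUnit.NANOSECONDS); // waits 10,000 ms
    }
}
```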
##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationReadRaceTestBase.java:
##########
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.primitives.Ranges;
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.Config.PaxosVariant;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageSink;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.accord.InteropTokenRangeTest.TokenOperator;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.repair.RepairResult;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
+import org.apache.cassandra.service.consensus.TransactionalMode;
+import org.apache.cassandra.service.consensus.migration.ConsensusMigrationRepairResult;
+import org.apache.cassandra.service.consensus.migration.ConsensusTableMigration;
+import org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.Throwables.ThrowingRunnable;
+import org.assertj.core.api.Assertions;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static java.lang.String.format;
+import static org.apache.cassandra.Util.spinAssertEquals;
+import static org.apache.cassandra.dht.Murmur3Partitioner.LongToken.keyForToken;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.getNextEpoch;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.pauseBeforeEnacting;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.unpauseEnactment;
+import static org.apache.cassandra.distributed.test.accord.InteropTokenRangeTest.TokenOperator.gt;
+import static org.apache.cassandra.distributed.test.accord.InteropTokenRangeTest.TokenOperator.gte;
+import static org.apache.cassandra.distributed.test.accord.InteropTokenRangeTest.TokenOperator.lt;
+import static org.apache.cassandra.distributed.test.accord.InteropTokenRangeTest.TokenOperator.lte;
+import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytesToHex;
+import static org.junit.Assert.assertEquals;
+
+/*
+ * Test that non-transactional read operations migrating to/from a mode where Accord ignores commit consistency levels
+ * and does async commit are routed correctly. Currently this is just TransactionalMode.full.
+ */
+public abstract class AccordMigrationReadRaceTestBase extends AccordTestBase
+{
+ private static final Logger logger = LoggerFactory.getLogger(AccordMigrationReadRaceTestBase.class);
+ private static final int TEST_BOUNDS_CONCURRENCY = 32;
+ // Set BATCH_INDEX to the failing batch and this to true to find out the query index, then set QUERY_INDEX
+ private static final boolean EXECUTE_BATCH_QUERIES_SERIALLY = false;
+ // Specify only a single batch or query to run
+ private static final Integer BATCH_INDEX = null;
+ private static final Integer QUERY_INDEX = null;
+ private static final String TABLE_FMT = "CREATE TABLE %s (pk blob, c int, v int, PRIMARY KEY ((pk), c));";
+
+ private static IPartitioner partitioner;
+
+ private static Range<Token> migratingRange;
+
+ private static ICoordinator coordinator;
+
+ private final static TestMessageSink messageSink = new TestMessageSink();
+ private static class TestMessageSink implements IMessageSink
+ {
+ private final Queue<Pair<InetSocketAddress, IMessage>> messages = new ConcurrentLinkedQueue<>();
+ private final Set<InetSocketAddress> blackholed = new ConcurrentHashSet<>();
+
+ public void reset()
+ {
+ messages.clear();
+ blackholed.clear();
+ }
+
+ @Override
+ public void accept(InetSocketAddress to, IMessage message) {
+ messages.offer(Pair.create(to,message));
+ IInstance i = SHARED_CLUSTER.get(to);
+ if (blackholed.contains(to) || blackholed.contains(message.from()))
+ return;
+ if (i != null)
+ i.receiveMessage(message);
+ }
+ }
+
+ private final boolean migrateAwayFromAccord;
+
+ protected AccordMigrationReadRaceTestBase()
+ {
+ this.migrateAwayFromAccord = migratingAwayFromAccord();
+ }
+
+ protected abstract boolean migratingAwayFromAccord();
+
+ @Override
+ protected Logger logger()
+ {
+ return logger;
+ }
+
+ @BeforeClass
+ public static void setupClass() throws IOException
+ {
+ ServerTestUtils.daemonInitialization();
+ // Otherwise repair complains if you don't specify a keyspace
+ CassandraRelevantProperties.SYSTEM_TRACES_DEFAULT_RF.setInt(3);
+ AccordTestBase.setupCluster(builder -> builder.appendConfig(config -> config.set("paxos_variant", PaxosVariant.v2.name())
+ .set("read_request_timeout", "3600s")
+ .set("range_request_timeout", "3600s")
+ .set("native_transport_timeout", "3600s")
+ .set("accord.range_migration", "explicit")), 3);
+ partitioner = FBUtilities.newPartitioner(SHARED_CLUSTER.get(1).callsOnInstance(() -> DatabaseDescriptor.getPartitioner().getClass().getSimpleName()).call());
+ StorageService.instance.setPartitionerUnsafe(partitioner);
+ ServerTestUtils.prepareServerNoRegister();
+ LongToken migrationStart = new LongToken(Long.valueOf(SHARED_CLUSTER.get(2).callOnInstance(() -> DatabaseDescriptor.getInitialTokens().iterator().next())));
+ LongToken migrationEnd = new LongToken(Long.valueOf(SHARED_CLUSTER.get(3).callOnInstance(() -> DatabaseDescriptor.getInitialTokens().iterator().next())));
+ migratingRange = new Range<>(migrationStart, migrationEnd);
+ coordinator = SHARED_CLUSTER.coordinator(1);
+ SHARED_CLUSTER.setMessageSink(messageSink);
+ buildData();
+ }
+
+ private static final int NUM_PARTITIONS = 1000;
+ private static final int ROWS_PER_PARTITION = 10;
+ private static final Object[][][] data = new Object[NUM_PARTITIONS][][];
+ private static final Object[][] dataFlat = new Object[NUM_PARTITIONS * ROWS_PER_PARTITION][];
+ private static ByteBuffer pkeyAccord;
+ private static int pkeyAccordDataIndex;
+
+ private static void buildData()
+ {
+ Random r = new Random(0);
+ long[] tokens = new long[NUM_PARTITIONS];
+ for (int i = 0; i < tokens.length; i++)
+ tokens[i] = r.nextLong();
+ Arrays.sort(tokens);
+
+ for (int i = 0; i < NUM_PARTITIONS; i++)
+ {
+ data[i] = new Object[ROWS_PER_PARTITION][];
+ ByteBuffer pk = keyForToken(tokens[i]);
+ for (int j = 0; j < ROWS_PER_PARTITION; j++)
+ {
+ int clustering = r.nextInt();
+ data[i][j] = new Object[] { pk, clustering, 42 };
+ }
+ Arrays.sort(data[i], Comparator.comparing(row -> (Integer)row[1]));
+ }
+ for (int i = 0; i < NUM_PARTITIONS; i++)
+ {
+ for (int j = 0; j < ROWS_PER_PARTITION; j++)
+ {
+ int idx = i * ROWS_PER_PARTITION + j;
+ dataFlat[idx] = new Object[] { data[i][j][0], data[i][j][1], data[i][j][2] };
+ if (migratingRange.contains(Murmur3Partitioner.instance.getToken((ByteBuffer)data[i][j][0])))
+ {
+ pkeyAccord = (ByteBuffer)data[i][j][0];
+ pkeyAccordDataIndex = i;
+ }
+ }
+ }
+ }
+
+ @AfterClass
+ public static void tearDownClass()
+ {
+ StorageService.instance.resetPartitionerUnsafe();
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ messageSink.reset();
+ SHARED_CLUSTER.forEach(ClusterUtils::clearAndUnpause);
+ super.tearDown();
+ }
+
+ private void loadData() throws Exception
+ {
+ logger.info("Starting data load");
+ Stopwatch sw = Stopwatch.createStarted();
+ List<java.util.concurrent.Future<SimpleQueryResult>> inserts = new ArrayList<>();
+ for (int i = 0; i < NUM_PARTITIONS; i++)
+ {
+ for (int j = 0; j < ROWS_PER_PARTITION; j++)
+ inserts.add(coordinator.asyncExecuteWithResult(insertCQL(qualifiedAccordTableName, (ByteBuffer)data[i][j][0], (int)data[i][j][1], (int)data[i][j][2]), ALL));
+
+ if (i % 100 == 0)
+ {
+ for (java.util.concurrent.Future<SimpleQueryResult> insert : inserts)
+ insert.get();
+ inserts.clear();
+ }
+ }
+ logger.info("Data load took %dms", sw.elapsed(TimeUnit.MILLISECONDS));
+ }
+
+ private NavigableSet<Long> boundsTokens()
+ {
+ long migratingRangeStart = migratingRange.left.getLongValue();
+ long migratingRangeEnd = migratingRange.right.getLongValue();
+ NavigableSet<Long> set = new TreeSet<>();
+ set.add(migratingRangeStart - 1);
+ set.add(migratingRangeStart);
+ set.add(migratingRangeStart + 1);
+ set.add(migratingRangeEnd - 1);
+ set.add(migratingRangeEnd);
+ set.add(migratingRangeEnd + 1);
+ set.add(Long.MAX_VALUE);
+ set.add(Long.MIN_VALUE + 1);
+ set.add(0L);
+ return set;
+ }
+
+ private void loadOverlapData()
+ {
+ for (long token : boundsTokens())
+ coordinator.executeWithResult(insertCQL(qualifiedAccordTableName, keyForToken(token), 42, 43), ALL);
+ }
+
+ @Test
+ public void testKeyRouting() throws Throwable
+ {
+ String readCQL = "SELECT * FROM " + qualifiedAccordTableName + " WHERE pk = 0x" + bytesToHex(pkeyAccord);
+ testSplitAndRetry(readCQL, this::loadData, result -> assertThat(result).isDeepEqualTo(data[pkeyAccordDataIndex]));
+ }
+
+ @Test
+ public void testRangeRouting() throws Throwable
+ {
+ String cql = "SELECT * FROM " + qualifiedAccordTableName + " WHERE token(pk) > " + Murmur3Partitioner.MINIMUM.token;
+ testSplitAndRetry(cql, this::loadData, result -> {
+ assertThat(result).isDeepEqualTo(dataFlat);
+ });
+ }
+
+ @Test
+ public void testBounds() throws Throwable
+ {
+ NavigableSet<Long> tokens = boundsTokens();
+ Queue<String> queries = new ArrayDeque<>();
+ Queue<Consumer<SimpleQueryResult>> validations = new ArrayDeque<>();
+ Queue<String> retryExpectedQueries = new ArrayDeque<>();
+ Queue<Consumer<SimpleQueryResult>> retryExpectedValidations = new ArrayDeque<>();
+ for (long firstToken : tokens)
+ {
+ ByteBuffer pk = keyForToken(firstToken);
+ for (TokenOperator op : TokenOperator.values())
+ {
+ String cql = "SELECT * FROM %s WHERE " + op.condition;
+ cql = cql.replace("?", "0x" + bytesToHex(pk));
+ NavigableSet<Long> expectedTokens = op.expected(firstToken, tokens);
+ boolean expectRetry = op.intersects(firstToken, migratingRange);
+ Consumer<SimpleQueryResult> validation = result -> {
+ Assertions.assertThat(InteropTokenRangeTest.tokens(result))
+ .describedAs("Token %d with operator %s", firstToken, op.condition)
+ .isEqualTo(expectedTokens);
+ };
+ if (expectRetry)
+ {
+ retryExpectedQueries.add(cql);
+ retryExpectedValidations.add(validation);
+ }
+ else
+ {
+ queries.add(cql);
+ validations.add(validation);
+ }
+ }
+
+ for (long secondToken : tokens)
+ {
+ for (TokenOperator lt : Arrays.asList(lt, lte))
+ {
+ for (TokenOperator gt : Arrays.asList(gt, gte))
+ {
+ ByteBuffer gtPk = keyForToken(secondToken);
+ String cql = "SELECT * FROM %s WHERE " + lt.condition
+ " AND " + gt.condition;
+ cql = cql.replaceFirst("\\?", "0x" + bytesToHex(pk));
+ cql = cql.replaceFirst("\\?", "0x" + bytesToHex(gtPk));
+ NavigableSet<Long> expectedTokens = new
TreeSet<>(Sets.intersection(lt.expected(firstToken, tokens),
gt.expected(secondToken, tokens)));
+ Consumer<SimpleQueryResult> validation = result -> {
+
Assertions.assertThat(InteropTokenRangeTest.tokens(result))
+ .describedAs("LT Token %d GT Token %d
with operators %s / %s", firstToken, secondToken, lt.condition, gt.condition)
+ .isEqualTo(expectedTokens);
+ };
+ boolean expectRetry = lt.intersects(firstToken,
migratingRange) && gt.intersects(secondToken, migratingRange);
+ // This evaluates to no rows without actually executing
+ if (firstToken == secondToken && (lt ==
TokenOperator.lt || gt == TokenOperator.gt))
+ expectRetry = false;
+ if (firstToken < secondToken)
+ expectRetry = false;
+ if (expectRetry)
+ {
+ retryExpectedQueries.add(cql);
+ retryExpectedValidations.add(validation);
+ }
+ else
+ {
+ queries.add(cql);
+ validations.add(validation);
+ }
+ }
+ }
+
+ ByteBuffer rhsPK = keyForToken(secondToken);
+ String cql = "SELECT * FROM %s WHERE token(pk) BETWEEN token(?) AND token(?)";
+ cql = cql.replaceFirst("\\?", "0x" + bytesToHex(pk));
+ cql = cql.replaceFirst("\\?", "0x" + bytesToHex(rhsPK));
+ NavigableSet<Long> expectedTokens = new TreeSet<>(Sets.intersection(gte.expected(firstToken, tokens), lte.expected(secondToken, tokens)));
+ Consumer<SimpleQueryResult> validation = result -> {
+ Assertions.assertThat(InteropTokenRangeTest.tokens(result))
+ .describedAs("Between token %d and %d with operator token(pk) BETWEEN token(?) AND token(?)", firstToken, secondToken)
+ .isEqualTo(expectedTokens);
+ };
+ // Cassandra straight up returns the wrong answer here so until it is fixed skip it
+ // https://issues.apache.org/jira/browse/CASSANDRA-20154
+ if (firstToken > secondToken)
+ continue;
+ boolean expectRetry = gte.intersects(firstToken, migratingRange) && lte.intersects(secondToken, migratingRange);
+ if (expectRetry)
+ {
+ retryExpectedQueries.add(cql);
+ retryExpectedValidations.add(validation);
+ }
+ else
+ {
+ queries.add(cql);
+ validations.add(validation);
+ }
+ }
+ }
+
+ testBoundsBatches(queries, validations, false);
+ testBoundsBatches(retryExpectedQueries, retryExpectedValidations, true);
+ }
+
+ private void testBoundsBatches(Queue<String> queries, Queue<Consumer<SimpleQueryResult>> validations, boolean expectRetry) throws Throwable
+ {
+ List<String> queryBatch = new ArrayList<>();
+ List<Consumer<SimpleQueryResult>> validationBatch = new ArrayList<>();
+ int batchCount = 0;
+ while (!queries.isEmpty())
+ {
+ queryBatch.add(queries.poll());
+ validationBatch.add(validations.poll());
+ if (queryBatch.size() == TEST_BOUNDS_CONCURRENCY)
+ {
+ if (BATCH_INDEX == null || BATCH_INDEX == batchCount)
+ {
+ logger.info("Executing batch {}", batchCount);
+ testBoundsBatch(queryBatch, validationBatch, expectRetry, batchCount);
+ }
+ else
+ {
+ logger.info("Skipping batch {}", batchCount);
+ }
+ batchCount++;
+ queryBatch.clear();
+ validationBatch.clear();
+ }
+ }
+
+ if (!queryBatch.isEmpty())
+ {
+ logger.info("Executing batch " + batchCount);
+ testBoundsBatch(queryBatch, validationBatch, expectRetry, batchCount);
+ }
+ }
+
+ private void testBoundsBatch(List<String> readCQL, List<Consumer<SimpleQueryResult>> validation, boolean expectRetry, int batchCount) throws Throwable
+ {
+ if (EXECUTE_BATCH_QUERIES_SERIALLY)
+ {
+ for (int i = 0; i < readCQL.size(); i++)
+ {
+ if (QUERY_INDEX == null || QUERY_INDEX == i)
+ {
+ logger.info("Executing query from batch {} query index
{}", batchCount, i);
+ String cql = format(readCQL.get(i),
qualifiedAccordTableName);
+ testSplitAndRetry(ImmutableList.of(cql),
this::loadOverlapData, ImmutableList.of(validation.get(i)), expectRetry);
+ tearDown();
+ setup();
+ afterEach();
+ }
+ else
+ {
+ logger.info("Skipping query from batch {} query index {}",
batchCount, i);
+ }
+ }
+ }
+ else
+ {
+ readCQL = readCQL.stream().map(cql -> format(cql, qualifiedAccordTableName)).collect(toImmutableList());
+ testSplitAndRetry(readCQL, this::loadOverlapData, validation, expectRetry);
+ tearDown();
+ setup();
+ afterEach();
+ }
+ }
+
+ private void testSplitAndRetry(String readCQL, ThrowingRunnable load, Consumer<SimpleQueryResult> validation) throws Throwable
+ {
+ testSplitAndRetry(ImmutableList.of(readCQL), load, ImmutableList.of(validation), true);
+ }
+
+ private void testSplitAndRetry(List<String> readCQL, ThrowingRunnable load, List<Consumer<SimpleQueryResult>> validation, boolean expectRetry) throws Throwable
+ {
+ test(createTables(TABLE_FMT, qualifiedAccordTableName),
+ cluster -> {
+ load.run();
+ // Node 3 is always the out of sync node
+ IInvokableInstance outOfSyncInstance = setUpOutOfSyncNode(cluster);
+ ICoordinator coordinator = outOfSyncInstance.coordinator();
+ int startMigrationRejectCount = getAccordReadMigrationRejects(3);
+ int startRetryCount = getReadRetryOnDifferentSystemCount(outOfSyncInstance);
+ int startRejectedCount = getReadsRejectedOnWrongSystemCount();
+ logger.info("Executing reads " + readCQL + " expect retry " + expectRetry);
+ List<Future<SimpleQueryResult>> results = readCQL.stream()
+ .map(read -> coordinator.asyncExecuteWithResult(read, ALL))
+ .collect(toImmutableList());
+
+ if (migrateAwayFromAccord && expectRetry)
+ {
+ int expectedTransactions = readCQL.size();
+ // Accord will block until we unpause enactment, so to test the routing we wait until the transaction
+ // has started, so the epoch it is created in is the old one
+ Util.spinUntilTrue(() -> outOfSyncInstance.callOnInstance(() -> {
+ logger.info("Coordinating {}", AccordService.instance().node().coordinating());
+ return AccordService.instance().node().coordinating().size() == expectedTransactions;
+ }));
+
+ logger.info("Accord node is now coordinating something, unpausing so it can continue to execute");
+ }
+
+ if (!migrateAwayFromAccord && expectRetry)
+ spinAssertEquals(readCQL.size() * 2, 10, () -> getReadsRejectedOnWrongSystemCount() - startRejectedCount);
+
+ // Accord can't finish the transaction without unpausing
+ if (expectRetry || migrateAwayFromAccord)
+ {
+ logger.info("Unpausing out of sync instance before
waiting on result");
+ // Testing read coordination retry loop let coordinator
get up to date and retry
+ unpauseEnactment(outOfSyncInstance);
+ }
+
+ try
+ {
+ for (int i = 0; i < results.size(); i++)
+ {
+ SimpleQueryResult result = results.get(i).get();
+ logger.info("Result for: " + readCQL.get(i));
+ logger.info(result.toString());
+ try
+ {
+ validation.get(i).accept(result);
+ }
+ catch (Throwable t)
+ {
+ logger.info("Query index {} failed", i);
+ throw t;
+ }
+ }
+ }
+ catch (ExecutionException e)
+ {
+// // This is expected when inverting the migration
+// if (migrateAwayFromAccord && e.getCause() instanceof CoordinatorBehindException)
+// throw e;
+ throw e;
+ }
+
+ if (!expectRetry)
+ {
+ logger.info("Unpausing out of sync instance after waiting
on result");
+ // Testing read coordination retry loop let coordinator
get up to date and retry
+ unpauseEnactment(outOfSyncInstance);
+ }
+
+ int endRetryCount = getReadRetryOnDifferentSystemCount(outOfSyncInstance);
+ int endRejectedCount = getReadsRejectedOnWrongSystemCount();
+ int endMigrationRejects = getAccordReadMigrationRejects(3);
+ if (expectRetry)
+ {
+ if (migrateAwayFromAccord)
+ {
+ assertEquals(readCQL.size(), endRetryCount - startRetryCount);
+ assertEquals(readCQL.size(), endMigrationRejects - startMigrationRejectCount);
+ }
+ else
+ {
+ assertEquals(1 * readCQL.size(), endRetryCount - startRetryCount);
+ // Expect only two nodes to reject since they enacted the new epoch
+ assertEquals(2 * readCQL.size(), endRejectedCount - startRejectedCount);
+ }
+ }
+ else
+ {
+ assertEquals(0, endRetryCount - startRetryCount);
+ assertEquals(0, endRejectedCount - startRejectedCount);
+ }
+ });
+ }
+
+ /*
+ * Set up 3 to be behind and unaware of the migration having progressed to
the point where reads need to
+ * be on a different system while 1 and 2 are aware
+ */
+ private IInvokableInstance setUpOutOfSyncNode(Cluster cluster) throws
Throwable
+ {
+ IInvokableInstance i1 = cluster.get(1);
+ IInvokableInstance i2 = cluster.get(2);
+ IInvokableInstance i3 = cluster.get(3);
+
+ long afterAlter = getNextEpoch(i1).getEpoch();
+ logger.info("Epoch after alter {}", afterAlter);
+ if (migrateAwayFromAccord)
+ alterTableTransactionalMode(TransactionalMode.off, TransactionalMigrationFromMode.full);
+ else
+ alterTableTransactionalMode(TransactionalMode.full);
+ Util.spinUntilTrue(() -> cluster.stream().allMatch(instance -> instance.callOnInstance(() -> ClusterMetadata.current().epoch.equals(Epoch.create(afterAlter)))), 10);
+
+ long afterMigrationStart = getNextEpoch(i1).getEpoch();
+ logger.info("Epoch after migration start {}", afterMigrationStart);
+ long waitFori1Andi2ToEnact = afterMigrationStart;
+ // Migrating away from Accord needs i3 to pause before enacting
+ if (migrateAwayFromAccord)
+ pauseBeforeEnacting(i3, Epoch.create(afterMigrationStart));
+ // Reads are allowed until Accord thinks it owns the range and can start doing async commit and ignoring consistency levels
+ nodetool(coordinator, "consensus_admin", "begin-migration", "-st", migratingRange.left.toString(), "-et", migratingRange.right.toString(), KEYSPACE, accordTableName);
+
+ if (!migrateAwayFromAccord)
+ {
+ // Migration to Accord does not have Accord read until the migration has completed a data repair and then an Accord repair
+ Util.spinUntilTrue(() -> cluster.stream().allMatch(instance -> instance.callOnInstance(() -> ClusterMetadata.current().epoch.equals(Epoch.create(afterMigrationStart)))), 10);
+
+ long afterRepair = getNextEpoch(i1).getEpoch();
+ logger.info("Epoch after repair {}", afterRepair);
+ // First repair only does the data and allows Accord to read, but doesn't require reads to be done through Accord
+ nodetool(i2, "repair", "-skip-paxos", "-skip-accord", "-st", migratingRange.left.toString(), "-et", migratingRange.right.toString(), KEYSPACE, accordTableName);
+ Util.spinUntilTrue(() -> cluster.stream().allMatch(instance -> instance.callOnInstance(() -> ClusterMetadata.current().epoch.equals(Epoch.create(afterRepair)))), 10);
+
+ long afterRepairCompletionHandler = getNextEpoch(i1).getEpoch();
+ logger.info("Epoch after repair completion handler {}", afterRepairCompletionHandler);
+ waitFori1Andi2ToEnact = afterRepairCompletionHandler;
+ // Node 3 will coordinate the query and not be aware that the migration has begun
+ pauseBeforeEnacting(i3, Epoch.create(afterRepairCompletionHandler));
+
+ // Unfortunately we can't run a real repair because it can't complete while i3 isn't responding (it's stuck waiting
+ // on TCM), so fake the completion of the repair by invoking the completion handler directly
+ String keyspace = KEYSPACE;
+ String table = accordTableName;
+ long migratingTokenStart = migratingRange.left.getLongValue();
+ long migratingTokenEnd = migratingRange.right.getLongValue();
+ Future<?> result = SHARED_CLUSTER.get(1).asyncRunsOnInstance(() ->
+ {
+ Epoch startEpoch = ClusterMetadata.current().epoch;
+ TableId tableId = Schema.instance.getTableMetadata(keyspace, table).id;
+ List<Range<Token>> ranges = ImmutableList.of(new Range<>(new LongToken(migratingTokenStart), new LongToken(migratingTokenEnd)));
+ RepairJobDesc desc = new RepairJobDesc(null, null, keyspace, table, ranges);
+ TokenRange range = new TokenRange(new TokenKey(tableId, new LongToken(migratingTokenStart)), new TokenKey(tableId, new LongToken(migratingTokenEnd)));
Review Comment:
This doesn't compile:
`[javac] /Users/sam/git/asf/accord/cep-15-accord/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationReadRaceTestBase.java:626: error: TokenRange(AccordRoutingKey,AccordRoutingKey) has private access in TokenRange`
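
If the private constructor is intentional, construction is presumably meant to go through a public factory on `TokenRange` instead; something along these lines (`TokenRange.create` is my guess at the factory name, check the class for the actual entry point):

```java
// Assumed factory method; the private constructor suggests TokenRange
// intends construction to go through a static factory instead.
TokenRange range = TokenRange.create(new TokenKey(tableId, new LongToken(migratingTokenStart)),
                                     new TokenKey(tableId, new LongToken(migratingTokenEnd)));
```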
##########
src/java/org/apache/cassandra/tcm/log/LocalLog.java:
##########
@@ -707,15 +710,21 @@ private Async(LogSpec logSpec)
@Override
public ClusterMetadata awaitAtLeast(Epoch epoch) throws InterruptedException, TimeoutException
+ {
+ return awaitAtLeast(epoch, -1, null);
+ }
+
+ @Override
+ public ClusterMetadata awaitAtLeast(Epoch epoch, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
{
ClusterMetadata lastSeen = committed.get();
return lastSeen.epoch.compareTo(epoch) >= 0
? lastSeen
- : new AwaitCommit(epoch).get();
+ : new AwaitCommit(epoch).get(timeout, unit);
}
@Override
- public void runOnce(DurationSpec duration) throws TimeoutException
+ public void runOnce(long timeout, TimeUnit unit) throws TimeoutException
Review Comment:
I don't know precisely why `DurationSpec` was used here, but I also don't really
see a compelling reason to change it. There are other examples throughout the
codebase where a `DurationSpec` is pulled from config and passed directly to
domain methods (`ActiveRepairService`, `StreamManager`, CFS snapshotting, etc.),
so it isn't without precedent.
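
For what it's worth, the benefit of the `DurationSpec`-style signature is that the quantity and unit travel together, which rules out the bug class from the `runOnce` hunk above. A simplified comparison of the two shapes (stand-in type, not the real `DurationSpec` API):

```java
import java.util.concurrent.TimeUnit;

public class SignatureShapes
{
    // Simplified stand-in for DurationSpec: the unit is bound to the value.
    record Duration(long quantity, TimeUnit unit)
    {
        long toMillis() { return unit.toMillis(quantity); }
    }

    // Unit-carrying shape: callers cannot mismatch value and unit.
    static void runOnce(Duration duration)
    {
        long millis = duration.toMillis();
        System.out.println("await " + millis + "ms");
    }

    // Split shape: correct only if every implementation remembers to convert,
    // which is exactly the step the patch above missed.
    static void runOnce(long timeout, TimeUnit unit)
    {
        long millis = unit.toMillis(timeout);
        System.out.println("await " + millis + "ms");
    }
}
```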
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -366,9 +374,36 @@ public static RowIterator cas(String keyspaceName,
}
ConsensusAttemptResult lastAttemptResult;
+ Epoch lastEpoch = null;
do
{
+ if (lastEpoch != null)
+ {
+
+ long timeout = requestTime.computeTimeout(nanoTime(), DatabaseDescriptor.getTransactionTimeout(NANOSECONDS));
+ Epoch lastEpochFinal = lastEpoch;
+ ClusterMetadataService.instance().log().highestPending().ifPresent(epoch ->
+ {
+ try
+ {
+ ClusterMetadataService.instance().awaitAtLeast(epoch, timeout, NANOSECONDS);
Review Comment:
This isn't really "fetching" epochs as the tracing message states; it's waiting
for any gaps in the buffer of pending log entries to be filled and then for the
run of consecutive entries to be processed. Filling those gaps does entail
fetching any missing entries (or having them delivered naturally, in the case of
out-of-order receipt), but this won't actually trigger that catch-up. Is this
what you are trying to achieve here? It might be better to have
`ConsensusAttemptResult` include an epoch from the replicas, indicating the
minimum epoch the coordinator needs to catch up to, and to call
`ClusterMetadataService::fetchLogFromPeerOrCMS` with it.
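
Roughly what I have in mind, as a hypothetical sketch (the `minRequiredEpoch` field and `replicaFrom` variable are illustrative, not existing API; the argument order for `fetchLogFromPeerOrCMS` is taken from the signature visible in the `ClusterMetadataService` hunk below):

```java
// Hypothetical: the rejecting replica reports the epoch it had enacted, so the
// coordinator knows the minimum epoch it must reach before retrying.
Epoch required = lastAttemptResult.minRequiredEpoch; // illustrative field
if (required != null && required.isAfter(ClusterMetadata.current().epoch))
{
    // Actively fetch the missing log entries up to that epoch from a peer or
    // the CMS, rather than waiting on whatever happens to be pending locally.
    ClusterMetadataService.instance().fetchLogFromPeerOrCMS(ClusterMetadata.current(), replicaFrom, required);
}
```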
##########
src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java:
##########
@@ -93,24 +85,21 @@ static class Serializer implements AsymmetricMetadataSerializer<Transformation,
 public void serialize(Transformation t, DataOutputPlus out, Version version) throws IOException
 {
 BeginConsensusMigrationForTableAndRange v = (BeginConsensusMigrationForTableAndRange)t;
- out.writeUTF(v.targetProtocol.toString());
 ConsensusTableMigration.rangesSerializer.serialize(v.ranges, out, version);
 serializeCollection(v.tables, out, version, TableId.metadataSerializer);
 }
 public BeginConsensusMigrationForTableAndRange deserialize(DataInputPlus in, Version version) throws IOException
 {
- ConsensusMigrationTarget targetProtocol = ConsensusMigrationTarget.fromString(in.readUTF());
Review Comment:
You most likely need to put these behind a `version` guard. On pre-existing
clusters, these transformations will be encoded in the log along with the
version they were written at, so upgraded nodes will need to deserialize them
accordingly. Given this isn't released officially, it may not be an issue, of
course.
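
i.e. something along these lines in `deserialize` (sketch only; `V_SOME_PRIOR` is a placeholder for whichever serialization `Version` the field was removed at):

```java
// Sketch: log entries written before the field was removed still contain the
// targetProtocol string, so it must be consumed when deserializing at old versions.
if (!version.isAtLeast(Version.V_SOME_PRIOR)) // placeholder version constant
    in.readUTF(); // read and discard the legacy targetProtocol
```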
##########
src/java/org/apache/cassandra/tcm/ClusterMetadataService.java:
##########
@@ -756,7 +756,12 @@ public ClusterMetadata fetchLogFromPeerOrCMS(ClusterMetadata metadata, InetAddre
 public ClusterMetadata awaitAtLeast(Epoch epoch) throws InterruptedException, TimeoutException
 {
- return log.awaitAtLeast(epoch);
+ return awaitAtLeast(epoch, -1, null);
+ }
+
+ public ClusterMetadata awaitAtLeast(Epoch epoch, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
Review Comment:
I believe, but am not certain as I didn't write it, that this is the case. That
said, I don't really see a problem with that, and the same thing is done in
several other places in the codebase (see my comment on `LocalLog` for details).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]