dcapwell commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1915421597
##########
src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java:
##########
@@ -350,21 +351,19 @@ public static boolean tokenShouldBeWrittenThroughAccord(@Nonnull ClusterMetadata
// with different results if Accord reads non-transactionally written data that could be seen differently by different coordinators
// If the current mode writes through Accord then we should always write though Accord for ranges managed by Accord.
- // Accord needs to do synchronous commit and respect the consistency level so that Accord will later be able to
- // read its own writes
+ // Accord needs to do synchronous commit and respect the consistency level so non-SERIAL reads can read Accord's
+ // writes.
if (transactionalModeWritesThroughAccord)
+ {
return tms.migratingAndMigratedRanges.intersects(token);
+ }
- // If we are migrating from a mode that used to write to Accord then any range that isn't migrating/migrated
- // should continue to write through Accord.
- // It's not completely symmetrical because Paxos is able to read Accord's writes by performing a single key barrier
- // and regular mutations will be able to do the same thing (needs to be added along with non-transactional reads)
- // This means that migrating ranges don't need to be written through Accord because we are running Paxos now
- // and not Accord. When migrating to Accord we need to do all the writes through Accord even if we aren't
- // reading through Accord so that repair + Accord metadata is sufficient for Accord to be able to read
- // safely and deterministically from any coordinator
+ // If we are migrating from a mode that used to write to Accord then any range that isn't migrated
+ // should continue to write through Accord. Accord might still be executing txns pre-migration so continue
+ // to route writes through Accord until migration is completed.
if (migrationFromWritesThroughAccord)
- return !tms.migratingAndMigratedRanges.intersects(token);
+// return !tms.migratingAndMigratedRanges.intersects(token);
Review Comment:
remove dead code
##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordWriteInteroperabilityTest.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.annotation.Nonnull;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.memtable.Memtable;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.io.sstable.SSTableReadsListener;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.consensus.TransactionalMode;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.Util.spinAssertEquals;
+import static org.apache.commons.collections.ListUtils.synchronizedList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class AccordWriteInteroperabilityTest extends AccordTestBase
+{
+ private static final Logger logger = LoggerFactory.getLogger(AccordInteroperabilityTest.class);
+
+ @Nonnull
+ private final TransactionalMode mode;
+
+ private final boolean migrated;
+
+ public AccordWriteInteroperabilityTest(@Nonnull TransactionalMode mode, boolean migrated)
+ {
+ this.mode = mode;
+ this.migrated = migrated;
+ }
+
+ @Parameterized.Parameters(name = "transactionalMode={0}, migrated={1}")
+ public static Collection<Object[]> data() {
+ List<Object[]> tests = new ArrayList<>(TransactionalMode.values().length * 2);
+ for (TransactionalMode mode : TransactionalMode.values())
+ {
+ if (mode.accordIsEnabled)
+ {
+ tests.add(new Object[]{ mode, true });
+ tests.add(new Object[]{ mode, false });
+ }
+ }
+ return tests;
+ }
+
+ @Override
+ protected Logger logger()
+ {
+ return logger;
+ }
+
+ @BeforeClass
+ public static void setupClass() throws IOException
+ {
+ AccordTestBase.setupCluster(builder -> builder.withConfig(config -> config.set("accord.range_migration", "auto")
+                                                                           .set("paxos_variant", "v2")),
+                             3);
+ }
+
+ @After
+ public void tearDown()
+ {
+ SHARED_CLUSTER.setMessageSink(null);
+ }
+
+
+ private String testTransactionInsert()
+ {
+ return "BEGIN TRANSACTION\n" +
+ " INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES (42, 2, 3);\n" +
+ "COMMIT TRANSACTION";
+ }
+
+ private String testInsert()
+ {
+ return "INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES (42, 2, 3)";
+ }
+
+ private String testBatchInsert()
+ {
+ return "BEGIN BATCH\n" +
+ "INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES (1, 2, 3);\n" +
+ "INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES (42, 43, 44);\n" +
+ "APPLY BATCH";
+ }
+
+ @Test
+ public void testTransactionStatementApplyIsInteropApply() throws Throwable
+ {
+ testApplyIsInteropApply(testTransactionInsert());
+ }
+
+ @Test
+ public void testNonSerialApplyIsInteropApply() throws Throwable
+ {
+ testApplyIsInteropApply(testInsert());
+ }
+
+ @Test
+ public void testBatchInsertApplyIsInteropApply() throws Throwable
+ {
+ testApplyIsInteropApply(testBatchInsert());
+ }
+
+ private void testApplyIsInteropApply(String query) throws Throwable
+ {
+ test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v int, PRIMARY KEY(k, c))" + (migrated ? " WITH " + transactionalMode.asCqlParam() : ""),
+ cluster -> {
+ MessageCountingSink messageCountingSink = new MessageCountingSink(SHARED_CLUSTER);
+ List<String> failures = synchronizedList(new ArrayList<>());
+ // Verify that the apply response is only sent after the row has been inserted
+ // TODO (required): Need to delay mutation stage/mutation to ensure this has time to catch it
+ SHARED_CLUSTER.setMessageSink((to, message) -> {
+ try
+ {
+ if (message.verb() == Verb.ACCORD_APPLY_RSP.id)
+ {
+ // It can be async if it's migrated
+ if (migrated)
+ return;
+ String currentThread = Thread.currentThread().getName();
+ char nodeIndexChar = currentThread.charAt(4);
+ int nodeIndex = Integer.parseInt(String.valueOf(nodeIndexChar));
Review Comment:
this can fail in some cases, leading to this test being flaky. You can also pull this out from the class loader: you would use `org.apache.cassandra.distributed.shared.InstanceClassLoader`, which has `org.apache.cassandra.distributed.shared.InstanceClassLoader#getInstanceId`.
You can also query `org.apache.cassandra.concurrent.NamedThreadFactory#globalPrefix`, which is what "should" be the prefix...
Both of the cases above should be more stable than the thread name.
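A minimal sketch of the class-loader approach (my assumption being that the sink callback runs on a thread whose context class loader is the instance's `InstanceClassLoader`; the thread-name parse is kept only as a fallback):
```java
import org.apache.cassandra.distributed.shared.InstanceClassLoader;

// Derive the node index from the class loader instead of parsing a fixed
// character out of the thread name.
ClassLoader loader = Thread.currentThread().getContextClassLoader();
int nodeIndex;
if (loader instanceof InstanceClassLoader)
    nodeIndex = ((InstanceClassLoader) loader).getInstanceId();
else
    // fallback: the existing (brittle) thread-name parse
    nodeIndex = Integer.parseInt(String.valueOf(Thread.currentThread().getName().charAt(4)));
```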
##########
src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java:
##########
@@ -187,6 +199,50 @@ static int computeConcurrencyFactor(int totalRangeCount, int rangesQueried, int
return concurrencyFactor;
}
+ private PartitionIterator executeAccord(ClusterMetadata cm, PartitionRangeReadCommand rangeCommand, ConsistencyLevel cl)
+ {
+ //TODO (nicetohave): This is very inefficient because it will not map to the command store owned ranges
+ // so every command store will return results up to the limit and most could be discarded.
+ // Really we want to split the ranges by command stores owned ranges and then query one at a time
+ // For this to work well it really needs to integrated upwards where the ranges to query are being picked
+ AsyncTxnResult result = StorageProxy.readWithAccord(cm, rangeCommand, ImmutableList.of(rangeCommand.dataRange().keyRange()), cl, requestTime);
Review Comment:
We spoke about this in slack, putting it here for history.
Command stores will split the request by store and only run on the stores that intersect... this leads to the case where each store could return at most `limit` rows, so you need to post-process to apply the limit again... Ariel argues you can run against each store sequentially and return early (though that would likely cause timeouts), but I argue that isn't a safe assumption... queries like the following could be impacted
```
select token(pk), pk, ck, value from ks.tbl where pk in (0, 1) order by ck
desc limit 1;
```
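A hedged sketch of the post-processing step described above; `perStoreResults` is a hypothetical list of the per-store iterators, and the counter construction mirrors how `RangeCommandIterator.sendNextRequests` already applies limits:
```java
// Each command store can return up to `limit` rows, so re-apply the
// command's limits to the merged result before handing it back.
DataLimits.Counter counter = rangeCommand.limits().newCounter(rangeCommand.nowInSec(),
                                                              true, // assume live data
                                                              rangeCommand.selectsFullPartition(),
                                                              rangeCommand.metadata().enforceStrictLiveness());
return counter.applyTo(PartitionIterators.concat(perStoreResults));
```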
##########
src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java:
##########
@@ -340,4 +512,410 @@ private static ConsensusRoutingDecision pickPaxos()
{
return Paxos.useV2() ? paxosV2 : paxosV1;
}
+
+ public static void validateSafeToReadNonTransactionally(ReadCommand command)
+ {
+ if (command.allowsPotentialTxnConflicts())
+ return;
+
+ String keyspace = command.metadata().keyspace;
+ // System keyspaces are never managed by Accord
+ if (SchemaConstants.isSystemKeyspace(keyspace))
+ return;
+
+ // Local keyspaces are never managed by Accord
+ if (Schema.instance.localKeyspaces().containsKeyspace(keyspace))
+ return;
+
+ ClusterMetadata cm = ClusterMetadata.current();
+ TableId tableId = command.metadata().id;
+ TableMetadata tableMetadata = getTableMetadata(cm, tableId);
+ // Null for local tables
+ if (tableMetadata == null)
Review Comment:
local or dropped tables
##########
src/java/org/apache/cassandra/service/accord/txn/TxnRead.java:
##########
@@ -19,75 +19,271 @@
package org.apache.cassandra.service.accord.txn;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import com.google.common.collect.ImmutableList;
+
+import accord.api.Data;
+import accord.api.DataStore;
import accord.api.Read;
+import accord.local.SafeCommandStore;
+import accord.primitives.Keys;
+import accord.primitives.Participants;
+import accord.primitives.Ranges;
+import accord.primitives.Routable.Domain;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.utils.ObjectSizes;
-import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.cassandra.service.accord.AccordSerializers.consistencyLevelSerializer;
+import static org.apache.cassandra.service.accord.IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS;
+import static org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.CAS_READ;
+import static org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.USER;
+import static org.apache.cassandra.service.accord.txn.TxnData.txnDataName;
+import static org.apache.cassandra.utils.ArraySerializers.deserializeArray;
+import static org.apache.cassandra.utils.ArraySerializers.serializeArray;
+import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
+import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
-public interface TxnRead extends Read
+public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
{
- ConsistencyLevel cassandraConsistencyLevel();
+ public static final TxnRead EMPTY = new TxnRead(new TxnNamedRead[0], null);
+ private static final long EMPTY_SIZE = ObjectSizes.measure(EMPTY);
+ private static final Comparator<TxnNamedRead> TXN_NAMED_READ_KEY_COMPARATOR = Comparator.comparing(a -> ((PartitionKey) a.keys().get(0)));
+ private static final Comparator<TxnNamedRead> TXN_NAMED_READ_RANGE_COMPARATOR = Comparator.comparing(a -> ((TokenRange) a.keys()).start());
Review Comment:
this isn't safe, right? If you have 2 ranges with the same start then `Collections.sort` can blow up with `TimSort` errors, as the order can be non-deterministic... don't we also need to compare `end` in case `start` conflicts?
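Something like this sketch would make the ordering deterministic (assuming `TokenRange#end()` exists alongside `start()` and is comparable the same way):
```java
// Break ties on the range end so two ranges with equal starts still
// compare deterministically and keep TimSort's comparator contract.
private static final Comparator<TxnNamedRead> TXN_NAMED_READ_RANGE_COMPARATOR =
    Comparator.comparing((TxnNamedRead a) -> ((TokenRange) a.keys()).start())
              .thenComparing(a -> ((TokenRange) a.keys()).end());
```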
##########
test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java:
##########
@@ -147,7 +147,7 @@ private static boolean allowsMigration(TransactionalMode mode)
{
switch (mode)
{
- case unsafe_writes:
+ case test_unsafe_writes:
Review Comment:
actually, shouldn't we just remove this case? The test really wants to test only prod modes.
##########
src/java/org/apache/cassandra/service/accord/txn/TxnRead.java:
##########
@@ -19,75 +19,271 @@
package org.apache.cassandra.service.accord.txn;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import com.google.common.collect.ImmutableList;
+
+import accord.api.Data;
+import accord.api.DataStore;
import accord.api.Read;
+import accord.local.SafeCommandStore;
+import accord.primitives.Keys;
+import accord.primitives.Participants;
+import accord.primitives.Ranges;
+import accord.primitives.Routable.Domain;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.utils.ObjectSizes;
-import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.cassandra.service.accord.AccordSerializers.consistencyLevelSerializer;
+import static org.apache.cassandra.service.accord.IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS;
+import static org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.CAS_READ;
+import static org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.USER;
+import static org.apache.cassandra.service.accord.txn.TxnData.txnDataName;
+import static org.apache.cassandra.utils.ArraySerializers.deserializeArray;
+import static org.apache.cassandra.utils.ArraySerializers.serializeArray;
+import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
+import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
-public interface TxnRead extends Read
+public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
{
- ConsistencyLevel cassandraConsistencyLevel();
+ public static final TxnRead EMPTY = new TxnRead(new TxnNamedRead[0], null);
+ private static final long EMPTY_SIZE = ObjectSizes.measure(EMPTY);
+ private static final Comparator<TxnNamedRead> TXN_NAMED_READ_KEY_COMPARATOR = Comparator.comparing(a -> ((PartitionKey) a.keys().get(0)));
+ private static final Comparator<TxnNamedRead> TXN_NAMED_READ_RANGE_COMPARATOR = Comparator.comparing(a -> ((TokenRange) a.keys()).start());
- public interface TxnReadSerializer<T extends TxnRead> extends IVersionedSerializer<T> {}
+ // Cassandra's consistency level used by Accord to safely read data written outside of Accord
+ @Nullable
+ private final ConsistencyLevel cassandraConsistencyLevel;
- enum Kind
+ private TxnRead(@Nonnull TxnNamedRead[] items, @Nullable ConsistencyLevel cassandraConsistencyLevel)
{
- key(0),
- range(1);
+ super(items);
+ checkNotNull(items, "items is null");
+ checkArgument(cassandraConsistencyLevel == null || SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cassandraConsistencyLevel), "Unsupported consistency level for read");
Review Comment:
```suggestion
checkArgument(cassandraConsistencyLevel == null || SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cassandraConsistencyLevel), "Unsupported consistency level for read: %s", cassandraConsistencyLevel);
```
##########
src/java/org/apache/cassandra/service/accord/txn/TxnRead.java:
##########
@@ -19,75 +19,271 @@
package org.apache.cassandra.service.accord.txn;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import com.google.common.collect.ImmutableList;
+
+import accord.api.Data;
+import accord.api.DataStore;
import accord.api.Read;
+import accord.local.SafeCommandStore;
+import accord.primitives.Keys;
+import accord.primitives.Participants;
+import accord.primitives.Ranges;
+import accord.primitives.Routable.Domain;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.utils.ObjectSizes;
-import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.cassandra.service.accord.AccordSerializers.consistencyLevelSerializer;
+import static org.apache.cassandra.service.accord.IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS;
+import static org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.CAS_READ;
+import static org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.USER;
+import static org.apache.cassandra.service.accord.txn.TxnData.txnDataName;
+import static org.apache.cassandra.utils.ArraySerializers.deserializeArray;
+import static org.apache.cassandra.utils.ArraySerializers.serializeArray;
+import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
+import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
-public interface TxnRead extends Read
+public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
{
- ConsistencyLevel cassandraConsistencyLevel();
+ public static final TxnRead EMPTY = new TxnRead(new TxnNamedRead[0], null);
+ private static final long EMPTY_SIZE = ObjectSizes.measure(EMPTY);
+ private static final Comparator<TxnNamedRead> TXN_NAMED_READ_KEY_COMPARATOR = Comparator.comparing(a -> ((PartitionKey) a.keys().get(0)));
+ private static final Comparator<TxnNamedRead> TXN_NAMED_READ_RANGE_COMPARATOR = Comparator.comparing(a -> ((TokenRange) a.keys()).start());
- public interface TxnReadSerializer<T extends TxnRead> extends IVersionedSerializer<T> {}
+ // Cassandra's consistency level used by Accord to safely read data written outside of Accord
+ @Nullable
+ private final ConsistencyLevel cassandraConsistencyLevel;
- enum Kind
+ private TxnRead(@Nonnull TxnNamedRead[] items, @Nullable ConsistencyLevel cassandraConsistencyLevel)
{
- key(0),
- range(1);
+ super(items);
+ checkNotNull(items, "items is null");
+ checkArgument(cassandraConsistencyLevel == null || SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cassandraConsistencyLevel), "Unsupported consistency level for read");
+ this.cassandraConsistencyLevel = cassandraConsistencyLevel;
+ }
- int id;
+ private TxnRead(@Nonnull List<TxnNamedRead> items, @Nullable ConsistencyLevel cassandraConsistencyLevel)
+ {
+ super(items);
+ checkNotNull(items, "items is null");
+ checkArgument(cassandraConsistencyLevel == null || SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cassandraConsistencyLevel), "Unsupported consistency level for read");
Review Comment:
```suggestion
checkArgument(cassandraConsistencyLevel == null || SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cassandraConsistencyLevel), "Unsupported consistency level for read: %s", cassandraConsistencyLevel);
```
##########
src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java:
##########
@@ -156,14 +156,15 @@ private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, Replica
ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> handler =
new ReadCallback<>(resolver, cmd, replicaPlan, requestTime);
- if (source.isSelf())
+ if (source.isSelf() && coordinator.localReadSupported())
Review Comment:
note for myself: the only `false` here comes from `org.apache.cassandra.service.accord.interop.AccordInteropExecution#localReadSupported`
##########
src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java:
##########
@@ -283,6 +382,77 @@ PartitionIterator sendNextRequests()
return counter.applyTo(StorageProxy.concatAndBlockOnRepair(concurrentQueries, readRepairs));
}
+ // Wrap the iterator to retry if request routing is incorrect
+ private PartitionIterator retryingPartitionIterator(Function<ClusterMetadata, PartitionIterator> attempt, ConsistencyLevel cl)
+ {
+ return new PartitionIterator()
+ {
+ private ClusterMetadata lastClusterMetadata = ClusterMetadata.current();
+ private PartitionIterator delegate = attempt.apply(lastClusterMetadata);
+
+ @Override
+ public void close()
+ {
+ delegate.close();
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ while (true)
+ {
+ try
+ {
+ return delegate.hasNext();
+ }
+ catch (RetryOnDifferentSystemException e)
+ {
+ readMetrics.retryDifferentSystem.mark();
+ readMetricsForLevel(cl).retryDifferentSystem.mark();
+ logger.debug("Retrying range read on different system because some reads were misrouted according to Accord");
+ Tracing.trace("Got {} from range reads, will retry", e);
+ }
+ catch (CoordinatorBehindException e)
+ {
+ readMetrics.retryCoordinatorBehind.mark();
+ readMetricsForLevel(cl).retryCoordinatorBehind.mark();
+ logger.debug("Retrying range read now that coordinator has caught up to cluster metadata");
+ Tracing.trace("Got {} from range reads, will retry", e);
+ }
+ // Fetch the next epoch to retry
+ long timeout = requestTime.computeTimeout(nanoTime(), DatabaseDescriptor.getRangeRpcTimeout(NANOSECONDS));
+ Optional<Epoch> pending = ClusterMetadataService.instance().log().highestPending();
+ logger.debug("Pending highest epoch is {}", pending);
+ pending.ifPresent(epoch ->
+ {
+ try
+ {
+ ClusterMetadataService.instance().awaitAtLeast(epoch, timeout, NANOSECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
Review Comment:
```suggestion
Thread.currentThread().interrupt();
throw new UncheckedInterruptedException(e);
```
##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordWriteInteroperabilityTest.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.annotation.Nonnull;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.memtable.Memtable;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.io.sstable.SSTableReadsListener;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.consensus.TransactionalMode;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.Util.spinAssertEquals;
+import static org.apache.commons.collections.ListUtils.synchronizedList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class AccordWriteInteroperabilityTest extends AccordTestBase
+{
+ private static final Logger logger = LoggerFactory.getLogger(AccordInteroperabilityTest.class);
+
+ @Nonnull
+ private final TransactionalMode mode;
+
+ private final boolean migrated;
+
+ public AccordWriteInteroperabilityTest(@Nonnull TransactionalMode mode, boolean migrated)
+ {
+ this.mode = mode;
+ this.migrated = migrated;
+ }
+
+ @Parameterized.Parameters(name = "transactionalMode={0}, migrated={1}")
+ public static Collection<Object[]> data() {
+ List<Object[]> tests = new ArrayList<>(TransactionalMode.values().length * 2);
+ for (TransactionalMode mode : TransactionalMode.values())
+ {
+ if (mode.accordIsEnabled)
+ {
+ tests.add(new Object[]{ mode, true });
+ tests.add(new Object[]{ mode, false });
+ }
+ }
+ return tests;
+ }
+
+ @Override
+ protected Logger logger()
+ {
+ return logger;
+ }
+
+ @BeforeClass
+ public static void setupClass() throws IOException
+ {
+ AccordTestBase.setupCluster(builder -> builder.withConfig(config -> config.set("accord.range_migration", "auto")
+                                                                           .set("paxos_variant", "v2")),
+                             3);
+ }
+
+ @After
+ public void tearDown()
+ {
+ SHARED_CLUSTER.setMessageSink(null);
+ }
+
+
+ private String testTransactionInsert()
+ {
+ return "BEGIN TRANSACTION\n" +
+ " INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES (42, 2, 3);\n" +
+ "COMMIT TRANSACTION";
+ }
+
+ private String testInsert()
+ {
+ return "INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES (42, 2, 3)";
+ }
+
+ private String testBatchInsert()
+ {
+ return "BEGIN BATCH\n" +
+ "INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES (1, 2, 3);\n" +
+ "INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES (42, 43, 44);\n" +
+ "APPLY BATCH";
+ }
+
+ @Test
+ public void testTransactionStatementApplyIsInteropApply() throws Throwable
+ {
+ testApplyIsInteropApply(testTransactionInsert());
+ }
+
+ @Test
+ public void testNonSerialApplyIsInteropApply() throws Throwable
+ {
+ testApplyIsInteropApply(testInsert());
+ }
+
+ @Test
+ public void testBatchInsertApplyIsInteropApply() throws Throwable
+ {
+ testApplyIsInteropApply(testBatchInsert());
+ }
+
+ private void testApplyIsInteropApply(String query) throws Throwable
+ {
+ test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v int, PRIMARY KEY(k, c))" + (migrated ? " WITH " + transactionalMode.asCqlParam() : ""),
+ cluster -> {
+ MessageCountingSink messageCountingSink = new MessageCountingSink(SHARED_CLUSTER);
+ List<String> failures = synchronizedList(new ArrayList<>());
+ // Verify that the apply response is only sent after the row has been inserted
+ // TODO (required): Need to delay mutation stage/mutation to ensure this has time to catch it
+ SHARED_CLUSTER.setMessageSink((to, message) -> {
+ try
+ {
+ if (message.verb() == Verb.ACCORD_APPLY_RSP.id)
+ {
+ // It can be async if it's migrated
+ if (migrated)
+ return;
+ String currentThread = Thread.currentThread().getName();
+ char nodeIndexChar = currentThread.charAt(4);
+ int nodeIndex = Integer.parseInt(String.valueOf(nodeIndexChar));
+ try
+ {
+ String keyspace = KEYSPACE;
+ String tableName = accordTableName;
+ String fail = SHARED_CLUSTER.get(nodeIndex).callOnInstance(() -> {
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspace, tableName);
+ Memtable memtable = cfs.getCurrentMemtable();
+ int expectedPartitions = query.startsWith("BEGIN BATCH") ? 2 : 1;
+ assertEquals(expectedPartitions, memtable.partitionCount());
+ UnfilteredPartitionIterator partitions = memtable.partitionIterator(ColumnFilter.all(cfs.metadata()), DataRange.allData(cfs.getPartitioner()), SSTableReadsListener.NOOP_LISTENER);
+ assertTrue(partitions.hasNext());
+ for (int i = 0; i < expectedPartitions; i++)
+ {
+ UnfilteredRowIterator rows = partitions.next();
+ assertTrue(rows.partitionKey().equals(dk(42)) || rows.partitionKey().equals(dk(1)));
+ assertTrue(rows.hasNext());
+ Row row = (Row)rows.next();
+ assertFalse(rows.hasNext());
+ }
+ assertFalse(partitions.hasNext());
+ return null;
+ });
+ if (fail != null)
Review Comment:
this is always `null`... when an assertion fails, the call throws rather than returning a value
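Since failures surface as thrown exceptions rather than a return value, a sketch of what catching them could look like (reusing the test's existing `failures` list and `getStackTraceAsString` import):
```java
try
{
    SHARED_CLUSTER.get(nodeIndex).runOnInstance(() -> {
        // memtable assertions go here; any AssertionError propagates out
    });
}
catch (Throwable t)
{
    failures.add(getStackTraceAsString(t));
}
```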
##########
src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java:
##########
@@ -196,72 +252,115 @@ static int computeConcurrencyFactor(int totalRangeCount, int rangesQueried, int
* {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in
* that it's the query that "continues" whatever we're previously queried).
*/
- private IRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, ReadCoordinator readCoordinator, boolean isFirst)
+ private PartitionIterator query(ClusterMetadata cm, ReplicaPlan.ForRangeRead replicaPlan, ReadCoordinator readCoordinator, List<ReadRepair<?, ?>> readRepairs, boolean isFirst)
{
PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaPlan.range(), isFirst);
-
- // If enabled, request repaired data tracking info from full replicas, but
- // only if there are multiple full replicas to compare results from.
- boolean trackRepairedStatus = DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
- && replicaPlan.contacts().filter(Replica::isFull).size() > 1;
- ClusterMetadata cm = ClusterMetadata.current();
- TableMetadata metadata = command.metadata();
- TableParams tableParams = metadata.params;
- TransactionalMode transactionalMode = tableParams.transactionalMode;
- TransactionalMigrationFromMode transactionalMigrationFromMode = tableParams.transactionalMigrationFrom;
- if (transactionalMigrationFromMode != TransactionalMigrationFromMode.none && transactionalMode.nonSerialReadsThroughAccord && transactionalMigrationFromMode.nonSerialWritesThroughAccord() && transactionalMigrationFromMode.nonSerialReadsThroughAccord())
- throw new UnsupportedOperationException("Live migration is not supported, can't safely read when migrating from " + transactionalMigrationFromMode + " to " + transactionalMode);
- if (transactionalMode.nonSerialReadsThroughAccord && readCoordinator.isEventuallyConsistent())
+ // Accord interop execution should always be coordinated through the C* plumbing
+ if (!readCoordinator.isEventuallyConsistent())
{
- //TODO (nicetohave): This is very inefficient because it will not map the the command store owned ranges
- // so every command store will return results and most will be discarded due to the limit
- // Really we want to split the ranges by command stores owned ranges and then query one at a time
- AsyncTxnResult result = StorageProxy.readWithAccord(cm, rangeCommand, ImmutableList.of(rangeCommand.dataRange().keyRange()), replicaPlan.consistencyLevel(), requestTime);
- return new AccordRangeResponse(result, rangeCommand.isReversed(), replicaPlan.consistencyLevel(), requestTime);
+ SingleRangeResponse response = executeNormal(replicaPlan, rangeCommand, readCoordinator);
+ readRepairs.add(response.getReadRepair());
+ return response;
}
- else
+
+ List<RangeReadWithTarget> reads = ConsensusRequestRouter.splitReadIntoAccordAndNormal(cm, rangeCommand, readCoordinator, requestTime);
+ // Special case returning directly to avoid wrapping the iterator and applying the limits an extra time
+ if (reads.size() == 1)
{
- ReplicaPlan.SharedForRangeRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
- ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair =
- ReadRepair.create(ReadCoordinator.DEFAULT, command, sharedReplicaPlan, requestTime);
- DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver =
- new DataResolver<>(ReadCoordinator.DEFAULT, rangeCommand, sharedReplicaPlan, readRepair, requestTime, trackRepairedStatus);
- ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler =
- new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, requestTime);
-
- if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isSelf())
+ RangeReadWithTarget rangeReadWithTarget = reads.get(0);
+ checkState(rangeReadWithTarget.read.dataRange().keyRange().equals(rangeCommand.dataRange().keyRange()));
+ if (rangeReadWithTarget.target == RangeReadTarget.accord && readCoordinator.isEventuallyConsistent())
{
- Stage.READ.execute(new StorageProxy.LocalReadRunnable(rangeCommand, handler, requestTime, trackRepairedStatus));
+ return executeAccord(cm,
+ rangeReadWithTarget.read,
+ replicaPlan.consistencyLevel());
}
else
{
- for (Replica replica : replicaPlan.contacts())
- {
- Tracing.trace("Enqueuing request to {}", replica);
- ReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery(replica);
- Message<ReadCommand> message = command.createMessage(trackRepairedStatus && replica.isFull(), requestTime);
- MessagingService.instance().sendWithCallback(message, replica.endpoint(), handler);
- }
+ SingleRangeResponse response = executeNormal(replicaPlan, rangeReadWithTarget.read, readCoordinator);
+ readRepairs.add(response.getReadRepair());
+ return response;
}
- return new CassandraRangeResponse(resolver, handler, readRepair);
}
+
+ // TODO (review): Should this be reworked to execute the queries
serially from the iterator? It would respect
Review Comment:
I would expect that to time out more often, no?
##########
src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java:
##########
@@ -340,4 +512,410 @@ private static ConsensusRoutingDecision pickPaxos()
{
return Paxos.useV2() ? paxosV2 : paxosV1;
}
+
+ public static void validateSafeToReadNonTransactionally(ReadCommand command)
+ {
+ if (command.allowsPotentialTxnConflicts())
+ return;
+
+ String keyspace = command.metadata().keyspace;
+ // System keyspaces are never managed by Accord
+ if (SchemaConstants.isSystemKeyspace(keyspace))
+ return;
+
+ // Local keyspaces are never managed by Accord
+ if (Schema.instance.localKeyspaces().containsKeyspace(keyspace))
+ return;
+
+ ClusterMetadata cm = ClusterMetadata.current();
+ TableId tableId = command.metadata().id;
+ TableMetadata tableMetadata = getTableMetadata(cm, tableId);
+ // Null for local tables
+ if (tableMetadata == null)
+ return;
+
+ TransactionalMode transactionalMode = tableMetadata.params.transactionalMode;
+ TransactionalMigrationFromMode transactionalMigrationFromMode = tableMetadata.params.transactionalMigrationFrom;
+ if (!transactionalMode.nonSerialReadsThroughAccord && !transactionalMigrationFromMode.nonSerialReadsThroughAccord())
+ return;
+
+ TableMigrationState tms = cm.consensusMigrationState.tableStates.get(tableId);
+
+ // Null with a transaction mode that reads through Accord indicates a completed migration or table created
+ // to use Accord initially
+ if (tms == null)
+ {
+ checkState(transactionalMigrationFromMode == TransactionalMigrationFromMode.none);
+ if (transactionalMode.nonSerialReadsThroughAccord)
+ {
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+ if (cfs != null)
+ cfs.metric.readsRejectedOnWrongSystem.mark();
+ throw new RetryOnDifferentSystemException();
+ }
+ }
+
+ boolean isExclusivelyReadableFromAccord;
+ if (command.isRangeRequest())
+ isExclusivelyReadableFromAccord = isBoundsExclusivelyManagedByAccordForRead(transactionalMode, transactionalMigrationFromMode, tms, command.dataRange().keyRange());
+ else
+ isExclusivelyReadableFromAccord = isTokenExclusivelyManagedByAccordForRead(transactionalMode, transactionalMigrationFromMode, tms, ((SinglePartitionReadCommand)command).partitionKey().getToken());
+
+ if (isExclusivelyReadableFromAccord)
+ {
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+ if (cfs != null)
+ cfs.metric.readsRejectedOnWrongSystem.mark();
+ throw new RetryOnDifferentSystemException();
+ }
+ }
+
+ private static boolean isTokenExclusivelyManagedByAccordForRead(@Nonnull TransactionalMode transactionalMode,
+                                                                 @Nonnull TransactionalMigrationFromMode migrationFrom,
+                                                                 @Nonnull TableMigrationState tms,
+                                                                 @Nonnull Token token)
+ {
+ checkNotNull(transactionalMode, "transactionalMode is null");
+ checkNotNull(migrationFrom, "migrationFrom is null");
+ checkNotNull(tms, "tms (TableMigrationState) is null");
+ checkNotNull(token, "bounds is null");
+
+ if (transactionalMode.accordIsEnabled)
+ {
+ if (!migrationFrom.isMigrating())
+ return true;
+ if (migrationFrom.migratingFromAccord())
+ return true;
+
+ // Accord is exclusive once the range is fully migrated to Accord, but possible to read from safely
+ // when accordSafeToReadRanges covers the entire bound
+ if (tms.migratedRanges.intersects(token))
+ return true;
+ }
+ else
+ {
+ // Once the migration starts only barriers are allowed to run for the key in Accord
+ if (migrationFrom.migratingFromAccord() && !tms.migratingAndMigratedRanges.intersects(token))
+ return true;
+ }
+
+ return false;
+ }
+
+ // Returns true if any part of the bound
+ private static boolean isBoundsExclusivelyManagedByAccordForRead(@Nonnull TransactionalMode transactionalMode,
+                                                                  @Nonnull TransactionalMigrationFromMode migrationFrom,
+                                                                  @Nonnull TableMigrationState tms,
+                                                                  @Nonnull AbstractBounds<PartitionPosition> bounds)
+ {
+ checkNotNull(transactionalMode, "transactionalMode is null");
+ checkNotNull(migrationFrom, "migrationFrom is null");
+ checkNotNull(tms, "tms (TableMigrationState) is null");
+ checkNotNull(bounds, "bounds is null");
+
+ BiPredicate<AbstractBounds<PartitionPosition>, NormalizedRanges<Token>> intersects = (testBounds, testRanges) -> {
+ // TODO (nicetohave): Efficiency of this intersection
+ for (org.apache.cassandra.dht.Range<Token> range : testRanges)
+ {
+ Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> intersectionAndRemainder = Range.intersectionAndRemainder(testBounds, range);
+ return intersectionAndRemainder.left != null;
+ }
+ return false;
+ };
+
+ if (bounds.left.getToken().equals(bounds.right.getToken()) && !bounds.inclusiveLeft() && bounds.inclusiveRight())
+ {
+ return isTokenExclusivelyManagedByAccordForRead(transactionalMode, migrationFrom, tms, bounds.left.getToken());
+ }
+
+ if (transactionalMode.accordIsEnabled)
+ {
+ if (!migrationFrom.isMigrating())
+ return true;
+ if (migrationFrom.migratingFromAccord())
+ return true;
+
+ // Accord is exclusive once the range is fully migrated to Accord, but possible to read from safely
+ // when accordSafeToReadRanges covers the entire bound
+ if (intersects.test(bounds, tms.migratedRanges))
+ return true;
+ }
+ else
+ {
+ // Once the migration starts only barriers are allowed to run for the key in Accord
+ if (migrationFrom.migratingFromAccord() && !intersects.test(bounds, tms.migratingAndMigratedRanges))
+ return true;
+ }
+
+ return false;
+ }
+
+ public enum RangeReadTarget
+ {
+ accord,
+ normal
+ }
+
+ public static class RangeReadWithTarget
+ {
+ public final PartitionRangeReadCommand read;
+ public final RangeReadTarget target;
+
+ private RangeReadWithTarget(PartitionRangeReadCommand read, RangeReadTarget target)
+ {
+ this.read = read;
+ this.target = target;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "RangeReadWithTarget{" +
+ "read=" + read +
+ ", target=" + target +
+ '}';
+ }
+ }
+
+ /**
+ * While it's possible to map the Accord read to a single txn it doesn't seem worth it since it's a pretty unusual
+ * scenario where we do this during migration and have a lot of different read commands.
+ */
+ public static List<RangeReadWithTarget> splitReadIntoAccordAndNormal(ClusterMetadata cm, PartitionRangeReadCommand read, ReadCoordinator readCoordinator, Dispatcher.RequestTime requestTime)
+ {
+ if (!readCoordinator.isEventuallyConsistent())
+ return ImmutableList.of(new RangeReadWithTarget(read, RangeReadTarget.normal));
+ TableMetadata tm = getTableMetadata(cm, read.metadata().id);
+ if (tm == null || (!tm.params.transactionalMode.nonSerialReadsThroughAccord && !tm.params.transactionalMigrationFrom.nonSerialReadsThroughAccord()))
+ return ImmutableList.of(new RangeReadWithTarget(read, RangeReadTarget.normal));
+
+ List<RangeReadWithTarget> result = null;
+ TransactionalMode transactionalMode = tm.params.transactionalMode;
+ TransactionalMigrationFromMode transactionalMigrationFromMode = tm.params.transactionalMigrationFrom;
+ boolean transactionalModeReadsThroughAccord = transactionalMode.nonSerialReadsThroughAccord;
+ RangeReadTarget migrationToTarget = transactionalModeReadsThroughAccord ? RangeReadTarget.accord : RangeReadTarget.normal;
+ boolean migrationFromReadsThroughAccord = transactionalMigrationFromMode.nonSerialReadsThroughAccord();
+ RangeReadTarget migrationFromTarget = migrationFromReadsThroughAccord ? RangeReadTarget.accord : RangeReadTarget.normal;
+ TableMigrationState tms = cm.consensusMigrationState.tableStates.get(tm.id);
+ if (tms == null)
+ {
+ if (transactionalMigrationFromMode == TransactionalMigrationFromMode.none)
+ // There is no migration and no TMS so do what the schema says since no migration should be required
+ return ImmutableList.of(new RangeReadWithTarget(read, transactionalModeReadsThroughAccord ? RangeReadTarget.accord : RangeReadTarget.normal));
+ else
+ // If we are migrating from something and there is no migration state the migration hasn't begun
+ // so continue to do what we are migrating from does until the range is marked as migrating
+ return ImmutableList.of(new RangeReadWithTarget(read, migrationFromReadsThroughAccord ? RangeReadTarget.accord : RangeReadTarget.normal));
+ }
+
+
+ // AbstractBounds can potentially be left/right inclusive while Range used to track migration is only right inclusive
+ // The right way to tackle this seems to be to find the tokens that intersect the key range and then split until until nothing intersects
+ AbstractBounds<PartitionPosition> keyRange = read.dataRange().keyRange();
+ AbstractBounds<PartitionPosition> remainder = keyRange;
+
+ // Migrating to Accord we only read through Accord when the range is fully migrated, but migrating back
+ // we stop reading from Accord as soon as the range is marked migrating and do key migration on read
+ NormalizedRanges<Token> migratedRanges = transactionalModeReadsThroughAccord ? tms.migratedRanges : tms.migratingAndMigratedRanges;
+
+ // Add the preceding range if any
+ if (!migratedRanges.isEmpty())
+ {
+ Token firstMigratingToken = migratedRanges.get(0).left.getToken();
+ int leftCmp = keyRange.left.getToken().compareTo(firstMigratingToken);
+ int rightCmp = compareRightToken(keyRange.right.getToken(), firstMigratingToken);
+ if (leftCmp <= 0)
+ {
+ if (rightCmp <= 0)
+ return ImmutableList.of(new RangeReadWithTarget(read, migrationFromTarget));
+ result = new ArrayList<>();
+ AbstractBounds<PartitionPosition> precedingRange = keyRange.withNewRight(rightCmp <= 0 ? keyRange.right : firstMigratingToken.maxKeyBound());
+ // Could be an empty bound, it's fine to let a min KeyBound and max KeyBound through as that isn't empty
+ if (!precedingRange.left.equals(precedingRange.right))
+ result.add(new RangeReadWithTarget(read.forSubRange(precedingRange, false), migrationFromTarget));
+ }
+ }
+
+ boolean hadAccordReads = false;
+ for (Range<Token> r : migratedRanges)
+ {
+ Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> intersectionAndRemainder = Range.intersectionAndRemainder(remainder, r);
+ if (intersectionAndRemainder.left != null)
+ {
+ if (result == null)
+ result = new ArrayList<>();
+ PartitionRangeReadCommand subRead = read.forSubRange(intersectionAndRemainder.left, result.isEmpty() ? true : false);
+ result.add(new RangeReadWithTarget(subRead, migrationToTarget));
+ hadAccordReads = true;
+ }
+ remainder = intersectionAndRemainder.right;
+ if (remainder == null)
+ break;
+ }
+
+ if (remainder != null)
+ {
+ if (result != null)
+ result.add(new RangeReadWithTarget(read.forSubRange(remainder, true), migrationFromTarget));
+ else
+ return ImmutableList.of(new RangeReadWithTarget(read.forSubRange(remainder, false), migrationFromTarget));
+ }
+
+ checkState(result != null && !result.isEmpty(), "Shouldn't have null or empty result");
+ checkState(result.get(0).read.dataRange().startKey().equals(read.dataRange().startKey()), "Split reads should encompass entire range");
+ checkState(result.get(result.size() - 1).read.dataRange().stopKey().equals(read.dataRange().stopKey()), "Split reads should encompass entire range");
+ if (result.size() > 1)
+ {
+ for (int i = 0; i < result.size() - 1; i++)
+ {
+ checkState(result.get(i).read.dataRange().stopKey().equals(result.get(i + 1).read.dataRange().startKey()), "Split reads should all be adjacent");
+ checkState(result.get(i).target != result.get(i + 1).target, "Split reads should be for different targets");
+ }
+ }
+
+ //TODO (later): The range reads need a barrier for now only going to provide READ_COMMITTED
Review Comment:
wondering how we should better expose this? A `TODO` doesn't seem like the nicest place.
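One hedged alternative (not in the PR): surface the weaker isolation to clients via the existing `ClientWarn` hook instead of a code comment, e.g.:
```java
// Sketch: warn clients that migrating range reads only get READ_COMMITTED
// isolation until barrier support for range reads lands.
ClientWarn.instance.warn("Range read during consensus migration provides only READ_COMMITTED isolation");
```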
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]