dcapwell commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1915421597


##########
src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java:
##########
@@ -350,21 +351,19 @@ public static boolean 
tokenShouldBeWrittenThroughAccord(@Nonnull ClusterMetadata
             // with different results if Accord reads non-transactionally 
written data that could be seen differently by different coordinators
 
             // If the current mode writes through Accord then we should always 
write though Accord for ranges managed by Accord.
-            // Accord needs to do synchronous commit and respect the 
consistency level so that Accord will later be able to
-            // read its own writes
+            // Accord needs to do synchronous commit and respect the 
consistency level so non-SERIAL reads can read Accord's
+            // writes.
             if (transactionalModeWritesThroughAccord)
+            {
                 return tms.migratingAndMigratedRanges.intersects(token);
+            }
 
-            // If we are migrating from a mode that used to write to Accord 
then any range that isn't migrating/migrated
-            // should continue to write through Accord.
-            // It's not completely symmetrical because Paxos is able to read 
Accord's writes by performing a single key barrier
-            // and regular mutations will be able to do the same thing (needs 
to be added along with non-transactional reads)
-            // This means that migrating ranges don't need to be written 
through Accord because we are running Paxos now
-            // and not Accord. When migrating to Accord we need to do all the 
writes through Accord even if we aren't
-            // reading through Accord so that repair + Accord metadata is 
sufficient for Accord to be able to read
-            // safely and deterministically from any coordinator
+            // If we are migrating from a mode that used to write to Accord 
then any range that isn't migrated
+            // should continue to write through Accord. Accord might still be 
executing txns pre-migration so continue
+            // to route writes through Accord until migration is completed.
             if (migrationFromWritesThroughAccord)
-                return !tms.migratingAndMigratedRanges.intersects(token);
+//                return !tms.migratingAndMigratedRanges.intersects(token);

Review Comment:
   remove dead code



##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordWriteInteroperabilityTest.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.annotation.Nonnull;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.memtable.Memtable;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.io.sstable.SSTableReadsListener;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.consensus.TransactionalMode;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.Util.spinAssertEquals;
+import static org.apache.commons.collections.ListUtils.synchronizedList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class AccordWriteInteroperabilityTest extends AccordTestBase
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AccordInteroperabilityTest.class);
+
+    @Nonnull
+    private final TransactionalMode mode;
+
+    private final boolean migrated;
+
+    public AccordWriteInteroperabilityTest(@Nonnull TransactionalMode mode, 
boolean migrated)
+    {
+        this.mode = mode;
+        this.migrated = migrated;
+    }
+
+    @Parameterized.Parameters(name = "transactionalMode={0}, migrated={1}")
+    public static Collection<Object[]> data() {
+        List<Object[]> tests = new 
ArrayList<>(TransactionalMode.values().length * 2);
+        for (TransactionalMode mode : TransactionalMode.values())
+        {
+            if (mode.accordIsEnabled)
+            {
+                tests.add(new Object[]{ mode, true });
+                tests.add(new Object[]{ mode, false });
+            }
+        }
+        return tests;
+    }
+
+    @Override
+    protected Logger logger()
+    {
+        return logger;
+    }
+
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        AccordTestBase.setupCluster(builder -> builder.withConfig(config -> 
config.set("accord.range_migration", "auto")
+                                                                               
   .set("paxos_variant", "v2")),
+                                    3);
+    }
+
+    @After
+    public void tearDown()
+    {
+        SHARED_CLUSTER.setMessageSink(null);
+    }
+
+
+    private String testTransactionInsert()
+    {
+        return "BEGIN TRANSACTION\n" +
+               "  INSERT INTO " + qualifiedAccordTableName + " (k, c, v) 
VALUES (42, 2, 3);\n" +
+               "COMMIT TRANSACTION";
+    }
+
+    private String testInsert()
+    {
+        return "INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES 
(42, 2, 3)";
+    }
+
+    private String testBatchInsert()
+    {
+        return "BEGIN BATCH\n" +
+               "INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES 
(1, 2, 3);\n" +
+               "INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES 
(42, 43, 44);\n" +
+               "APPLY BATCH";
+    }
+
+    @Test
+    public void testTransactionStatementApplyIsInteropApply() throws Throwable
+    {
+        testApplyIsInteropApply(testTransactionInsert());
+    }
+
+    @Test
+    public void testNonSerialApplyIsInteropApply() throws Throwable
+    {
+        testApplyIsInteropApply(testInsert());
+    }
+
+    @Test
+    public void testBatchInsertApplyIsInteropApply() throws Throwable
+    {
+        testApplyIsInteropApply(testBatchInsert());
+    }
+
+    private void testApplyIsInteropApply(String query) throws Throwable
+    {
+        test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v 
int, PRIMARY KEY(k, c))" + (migrated ? " WITH " + 
transactionalMode.asCqlParam() : ""),
+             cluster -> {
+                 MessageCountingSink messageCountingSink = new 
MessageCountingSink(SHARED_CLUSTER);
+                 List<String> failures = synchronizedList(new ArrayList<>());
+                 // Verify that the apply response is only sent after the row 
has been inserted
+                 // TODO (required): Need to delay mutation stage/mutation to 
ensure this has time to catch it
+                 SHARED_CLUSTER.setMessageSink((to, message) -> {
+                     try
+                     {
+                         if (message.verb() == Verb.ACCORD_APPLY_RSP.id)
+                         {
+                             // It can be async if it's migrated
+                             if (migrated)
+                                 return;
+                             String currentThread = 
Thread.currentThread().getName();
+                             char nodeIndexChar = currentThread.charAt(4);
+                             int nodeIndex = 
Integer.parseInt(String.valueOf(nodeIndexChar));

Review Comment:
   this can fail in some cases leading to this test being flakey.  You can also 
pull this out from the class loader.  You would be 
`org.apache.cassandra.distributed.shared.InstanceClassLoader` which has 
`org.apache.cassandra.distributed.shared.InstanceClassLoader#getInstanceId`
   
   
   
   You can also query 
`org.apache.cassandra.concurrent.NamedThreadFactory#globalPrefix` which is what 
"should" be the prefix...
   
   both the cases above should be more stable than the thread name



##########
src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java:
##########
@@ -187,6 +199,50 @@ static int computeConcurrencyFactor(int totalRangeCount, 
int rangesQueried, int
         return concurrencyFactor;
     }
 
+    private PartitionIterator executeAccord(ClusterMetadata cm, 
PartitionRangeReadCommand rangeCommand, ConsistencyLevel cl)
+    {
+        //TODO (nicetohave): This is very inefficient because it will not map 
to the command store owned ranges
+        // so every command store will return results up to the limit and most 
could be discarded.
+        // Really we want to split the ranges by command stores owned ranges 
and then query one at a time
+        // For this to work well it really needs to integrated upwards where 
the ranges to query are being picked
+        AsyncTxnResult result = StorageProxy.readWithAccord(cm, rangeCommand, 
ImmutableList.of(rangeCommand.dataRange().keyRange()), cl, requestTime);

Review Comment:
   We spoke about this in slack, putting here for history.
   
   command stores will split the request into by each store and only run on the 
stores that intersect... this leads to the case where each store could return 
at-most `limit` rows, so you need to post process to limit again... Ariel 
argues you can run against each store sequentially and return early (though 
would likely cause timeouts), but i argue that isn't a safe assumption... 
queries like the following could be impacted
   
   
   ```
   select token(pk), pk, ck, value from ks.tbl where pk in (0, 1) order by ck 
desc limit 1;
   ```
   
   



##########
src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java:
##########
@@ -340,4 +512,410 @@ private static ConsensusRoutingDecision pickPaxos()
     {
         return Paxos.useV2() ? paxosV2 : paxosV1;
     }
+
+    public static void validateSafeToReadNonTransactionally(ReadCommand 
command)
+    {
+        if (command.allowsPotentialTxnConflicts())
+            return;
+
+        String keyspace = command.metadata().keyspace;
+        // System keyspaces are never managed by Accord
+        if (SchemaConstants.isSystemKeyspace(keyspace))
+            return;
+
+        // Local keyspaces are never managed by Accord
+        if (Schema.instance.localKeyspaces().containsKeyspace(keyspace))
+            return;
+
+        ClusterMetadata cm = ClusterMetadata.current();
+        TableId tableId = command.metadata().id;
+        TableMetadata tableMetadata = getTableMetadata(cm, tableId);
+        // Null for local tables
+        if (tableMetadata == null)

Review Comment:
   local or dropped tables



##########
src/java/org/apache/cassandra/service/accord/txn/TxnRead.java:
##########
@@ -19,75 +19,271 @@
 package org.apache.cassandra.service.accord.txn;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import com.google.common.collect.ImmutableList;
+
+import accord.api.Data;
+import accord.api.DataStore;
 import accord.api.Read;
+import accord.local.SafeCommandStore;
+import accord.primitives.Keys;
+import accord.primitives.Participants;
+import accord.primitives.Ranges;
+import accord.primitives.Routable.Domain;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.utils.ObjectSizes;
 
-import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.cassandra.service.accord.AccordSerializers.consistencyLevelSerializer;
+import static 
org.apache.cassandra.service.accord.IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS;
+import static 
org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.CAS_READ;
+import static 
org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.USER;
+import static org.apache.cassandra.service.accord.txn.TxnData.txnDataName;
+import static org.apache.cassandra.utils.ArraySerializers.deserializeArray;
+import static org.apache.cassandra.utils.ArraySerializers.serializeArray;
+import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
+import static 
org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
+import static 
org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
-public interface TxnRead extends Read
+public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
 {
-    ConsistencyLevel cassandraConsistencyLevel();
+    public static final TxnRead EMPTY = new TxnRead(new TxnNamedRead[0], null);
+    private static final long EMPTY_SIZE = ObjectSizes.measure(EMPTY);
+    private static final Comparator<TxnNamedRead> 
TXN_NAMED_READ_KEY_COMPARATOR = Comparator.comparing(a -> ((PartitionKey) 
a.keys().get(0)));
+    private static final Comparator<TxnNamedRead> 
TXN_NAMED_READ_RANGE_COMPARATOR = Comparator.comparing(a -> ((TokenRange) 
a.keys()).start());

Review Comment:
   this isn't safe right?  if you have 2 ranges with the same start then 
`Collection.sort` can blow up with `TimSort` errors as the order can be 
non-deterministic... don't we also need to check `end` in case start conflicts?



##########
test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java:
##########
@@ -147,7 +147,7 @@ private static boolean allowsMigration(TransactionalMode 
mode)
     {
         switch (mode)
         {
-            case unsafe_writes:
+            case test_unsafe_writes:

Review Comment:
   actually shouldn't we just remove?  The test wants to really only test prod 
modes



##########
src/java/org/apache/cassandra/service/accord/txn/TxnRead.java:
##########
@@ -19,75 +19,271 @@
 package org.apache.cassandra.service.accord.txn;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import com.google.common.collect.ImmutableList;
+
+import accord.api.Data;
+import accord.api.DataStore;
 import accord.api.Read;
+import accord.local.SafeCommandStore;
+import accord.primitives.Keys;
+import accord.primitives.Participants;
+import accord.primitives.Ranges;
+import accord.primitives.Routable.Domain;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.utils.ObjectSizes;
 
-import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.cassandra.service.accord.AccordSerializers.consistencyLevelSerializer;
+import static 
org.apache.cassandra.service.accord.IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS;
+import static 
org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.CAS_READ;
+import static 
org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.USER;
+import static org.apache.cassandra.service.accord.txn.TxnData.txnDataName;
+import static org.apache.cassandra.utils.ArraySerializers.deserializeArray;
+import static org.apache.cassandra.utils.ArraySerializers.serializeArray;
+import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
+import static 
org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
+import static 
org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
-public interface TxnRead extends Read
+public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
 {
-    ConsistencyLevel cassandraConsistencyLevel();
+    public static final TxnRead EMPTY = new TxnRead(new TxnNamedRead[0], null);
+    private static final long EMPTY_SIZE = ObjectSizes.measure(EMPTY);
+    private static final Comparator<TxnNamedRead> 
TXN_NAMED_READ_KEY_COMPARATOR = Comparator.comparing(a -> ((PartitionKey) 
a.keys().get(0)));
+    private static final Comparator<TxnNamedRead> 
TXN_NAMED_READ_RANGE_COMPARATOR = Comparator.comparing(a -> ((TokenRange) 
a.keys()).start());
 
-    public interface TxnReadSerializer<T extends TxnRead> extends 
IVersionedSerializer<T> {}
+    // Cassandra's consistency level used by Accord to safely read data 
written outside of Accord
+    @Nullable
+    private final ConsistencyLevel cassandraConsistencyLevel;
 
-    enum Kind
+    private TxnRead(@Nonnull TxnNamedRead[] items, @Nullable ConsistencyLevel 
cassandraConsistencyLevel)
     {
-        key(0),
-        range(1);
+        super(items);
+        checkNotNull(items, "items is null");
+        checkArgument(cassandraConsistencyLevel == null || 
SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cassandraConsistencyLevel), 
"Unsupported consistency level for read");

Review Comment:
   ```suggestion
           checkArgument(cassandraConsistencyLevel == null || 
SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cassandraConsistencyLevel), 
"Unsupported consistency level for read: %s", cassandraConsistencyLevel);
   ```



##########
src/java/org/apache/cassandra/service/accord/txn/TxnRead.java:
##########
@@ -19,75 +19,271 @@
 package org.apache.cassandra.service.accord.txn;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import com.google.common.collect.ImmutableList;
+
+import accord.api.Data;
+import accord.api.DataStore;
 import accord.api.Read;
+import accord.local.SafeCommandStore;
+import accord.primitives.Keys;
+import accord.primitives.Participants;
+import accord.primitives.Ranges;
+import accord.primitives.Routable.Domain;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.utils.ObjectSizes;
 
-import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.cassandra.service.accord.AccordSerializers.consistencyLevelSerializer;
+import static 
org.apache.cassandra.service.accord.IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS;
+import static 
org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.CAS_READ;
+import static 
org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind.USER;
+import static org.apache.cassandra.service.accord.txn.TxnData.txnDataName;
+import static org.apache.cassandra.utils.ArraySerializers.deserializeArray;
+import static org.apache.cassandra.utils.ArraySerializers.serializeArray;
+import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
+import static 
org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
+import static 
org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
-public interface TxnRead extends Read
+public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
 {
-    ConsistencyLevel cassandraConsistencyLevel();
+    public static final TxnRead EMPTY = new TxnRead(new TxnNamedRead[0], null);
+    private static final long EMPTY_SIZE = ObjectSizes.measure(EMPTY);
+    private static final Comparator<TxnNamedRead> 
TXN_NAMED_READ_KEY_COMPARATOR = Comparator.comparing(a -> ((PartitionKey) 
a.keys().get(0)));
+    private static final Comparator<TxnNamedRead> 
TXN_NAMED_READ_RANGE_COMPARATOR = Comparator.comparing(a -> ((TokenRange) 
a.keys()).start());
 
-    public interface TxnReadSerializer<T extends TxnRead> extends 
IVersionedSerializer<T> {}
+    // Cassandra's consistency level used by Accord to safely read data 
written outside of Accord
+    @Nullable
+    private final ConsistencyLevel cassandraConsistencyLevel;
 
-    enum Kind
+    private TxnRead(@Nonnull TxnNamedRead[] items, @Nullable ConsistencyLevel 
cassandraConsistencyLevel)
     {
-        key(0),
-        range(1);
+        super(items);
+        checkNotNull(items, "items is null");
+        checkArgument(cassandraConsistencyLevel == null || 
SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cassandraConsistencyLevel), 
"Unsupported consistency level for read");
+        this.cassandraConsistencyLevel = cassandraConsistencyLevel;
+    }
 
-        int id;
+    private TxnRead(@Nonnull List<TxnNamedRead> items, @Nullable 
ConsistencyLevel cassandraConsistencyLevel)
+    {
+        super(items);
+        checkNotNull(items, "items is null");
+        checkArgument(cassandraConsistencyLevel == null || 
SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cassandraConsistencyLevel), 
"Unsupported consistency level for read");

Review Comment:
   ```suggestion
           checkArgument(cassandraConsistencyLevel == null || 
SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cassandraConsistencyLevel), 
"Unsupported consistency level for read: %s", cassandraConsistencyLevel);
   ```



##########
src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java:
##########
@@ -156,14 +156,15 @@ private UnfilteredPartitionIterator 
executeReadCommand(ReadCommand cmd, Replica
 
         ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> handler = 
new ReadCallback<>(resolver, cmd, replicaPlan, requestTime);
 
-        if (source.isSelf())
+        if (source.isSelf() && coordinator.localReadSupported())

Review Comment:
   for me: only `false` here 
`org.apache.cassandra.service.accord.interop.AccordInteropExecution#localReadSupported`



##########
src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java:
##########
@@ -283,6 +382,77 @@ PartitionIterator sendNextRequests()
         return 
counter.applyTo(StorageProxy.concatAndBlockOnRepair(concurrentQueries, 
readRepairs));
     }
 
+    // Wrap the iterator to retry if request routing is incorrect
+    private PartitionIterator 
retryingPartitionIterator(Function<ClusterMetadata, PartitionIterator> attempt, 
ConsistencyLevel cl)
+    {
+        return new PartitionIterator()
+        {
+            private ClusterMetadata lastClusterMetadata = 
ClusterMetadata.current();
+            private PartitionIterator delegate = 
attempt.apply(lastClusterMetadata);
+
+            @Override
+            public void close()
+            {
+                delegate.close();
+            }
+
+            @Override
+            public boolean hasNext()
+            {
+                while (true)
+                {
+                    try
+                    {
+                        return delegate.hasNext();
+                    }
+                    catch (RetryOnDifferentSystemException e)
+                    {
+                        readMetrics.retryDifferentSystem.mark();
+                        readMetricsForLevel(cl).retryDifferentSystem.mark();
+                        logger.debug("Retrying range read on different system 
because some reads were misrouted according to Accord");
+                        Tracing.trace("Got {} from range reads, will retry", 
e);
+                    }
+                    catch (CoordinatorBehindException e)
+                    {
+                        readMetrics.retryCoordinatorBehind.mark();
+                        readMetricsForLevel(cl).retryCoordinatorBehind.mark();
+                        logger.debug("Retrying range read now that coordinator 
has caught up to cluster metadata");
+                        Tracing.trace("Got {} from range reads, will retry", 
e);
+                    }
+                    // Fetch the next epoch to retry
+                    long timeout = requestTime.computeTimeout(nanoTime(), 
DatabaseDescriptor.getRangeRpcTimeout(NANOSECONDS));
+                    Optional<Epoch> pending = 
ClusterMetadataService.instance().log().highestPending();
+                    logger.debug("Pending highest epoch is {}", pending);
+                    pending.ifPresent(epoch ->
+                           {
+                               try
+                               {
+                                   
ClusterMetadataService.instance().awaitAtLeast(epoch, timeout, NANOSECONDS);
+                               }
+                               catch (InterruptedException e)
+                               {
+                                   throw new RuntimeException(e);

Review Comment:
   ```suggestion
                                      Thread.currentThread().interrupt();
                                      throw new 
UncheckedInterruptedException(e);
   ```



##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordWriteInteroperabilityTest.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.annotation.Nonnull;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.memtable.Memtable;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.io.sstable.SSTableReadsListener;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.consensus.TransactionalMode;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.Util.spinAssertEquals;
+import static org.apache.commons.collections.ListUtils.synchronizedList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class AccordWriteInteroperabilityTest extends AccordTestBase
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AccordInteroperabilityTest.class);
+
+    @Nonnull
+    private final TransactionalMode mode;
+
+    private final boolean migrated;
+
+    public AccordWriteInteroperabilityTest(@Nonnull TransactionalMode mode, 
boolean migrated)
+    {
+        this.mode = mode;
+        this.migrated = migrated;
+    }
+
+    @Parameterized.Parameters(name = "transactionalMode={0}, migrated={1}")
+    public static Collection<Object[]> data() {
+        List<Object[]> tests = new 
ArrayList<>(TransactionalMode.values().length * 2);
+        for (TransactionalMode mode : TransactionalMode.values())
+        {
+            if (mode.accordIsEnabled)
+            {
+                tests.add(new Object[]{ mode, true });
+                tests.add(new Object[]{ mode, false });
+            }
+        }
+        return tests;
+    }
+
+    @Override
+    protected Logger logger()
+    {
+        return logger;
+    }
+
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        AccordTestBase.setupCluster(builder -> builder.withConfig(config -> 
config.set("accord.range_migration", "auto")
+                                                                               
   .set("paxos_variant", "v2")),
+                                    3);
+    }
+
+    @After
+    public void tearDown()
+    {
+        SHARED_CLUSTER.setMessageSink(null);
+    }
+
+
+    private String testTransactionInsert()
+    {
+        return "BEGIN TRANSACTION\n" +
+               "  INSERT INTO " + qualifiedAccordTableName + " (k, c, v) 
VALUES (42, 2, 3);\n" +
+               "COMMIT TRANSACTION";
+    }
+
+    private String testInsert()
+    {
+        return "INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES 
(42, 2, 3)";
+    }
+
+    private String testBatchInsert()
+    {
+        return "BEGIN BATCH\n" +
+               "INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES 
(1, 2, 3);\n" +
+               "INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES 
(42, 43, 44);\n" +
+               "APPLY BATCH";
+    }
+
+    @Test
+    public void testTransactionStatementApplyIsInteropApply() throws Throwable
+    {
+        testApplyIsInteropApply(testTransactionInsert());
+    }
+
+    @Test
+    public void testNonSerialApplyIsInteropApply() throws Throwable
+    {
+        testApplyIsInteropApply(testInsert());
+    }
+
+    @Test
+    public void testBatchInsertApplyIsInteropApply() throws Throwable
+    {
+        testApplyIsInteropApply(testBatchInsert());
+    }
+
+    private void testApplyIsInteropApply(String query) throws Throwable
+    {
+        test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v 
int, PRIMARY KEY(k, c))" + (migrated ? " WITH " + 
transactionalMode.asCqlParam() : ""),
+             cluster -> {
+                 MessageCountingSink messageCountingSink = new 
MessageCountingSink(SHARED_CLUSTER);
+                 List<String> failures = synchronizedList(new ArrayList<>());
+                 // Verify that the apply response is only sent after the row 
has been inserted
+                 // TODO (required): Need to delay mutation stage/mutation to 
ensure this has time to catch it
+                 SHARED_CLUSTER.setMessageSink((to, message) -> {
+                     try
+                     {
+                         if (message.verb() == Verb.ACCORD_APPLY_RSP.id)
+                         {
+                             // It can be async if it's migrated
+                             if (migrated)
+                                 return;
+                             String currentThread = 
Thread.currentThread().getName();
+                             char nodeIndexChar = currentThread.charAt(4);
+                             int nodeIndex = 
Integer.parseInt(String.valueOf(nodeIndexChar));
+                             try
+                             {
+                                 String keyspace = KEYSPACE;
+                                 String tableName = accordTableName;
+                                 String fail = 
SHARED_CLUSTER.get(nodeIndex).callOnInstance(() -> {
+                                     ColumnFamilyStore cfs = 
ColumnFamilyStore.getIfExists(keyspace, tableName);
+                                     Memtable memtable = 
cfs.getCurrentMemtable();
+                                     int expectedPartitions = 
query.startsWith("BEGIN BATCH") ? 2 : 1;
+                                     assertEquals(expectedPartitions, 
memtable.partitionCount());
+                                     UnfilteredPartitionIterator partitions = 
memtable.partitionIterator(ColumnFilter.all(cfs.metadata()), 
DataRange.allData(cfs.getPartitioner()), SSTableReadsListener.NOOP_LISTENER);
+                                     assertTrue(partitions.hasNext());
+                                     for (int i = 0; i < expectedPartitions; 
i++)
+                                     {
+                                         UnfilteredRowIterator rows = 
partitions.next();
+                                         
assertTrue(rows.partitionKey().equals(dk(42)) || 
rows.partitionKey().equals(dk(1)));
+                                         assertTrue(rows.hasNext());
+                                         Row row = (Row)rows.next();
+                                         assertFalse(rows.hasNext());
+                                     }
+                                     assertFalse(partitions.hasNext());
+                                     return null;
+                                 });
+                                 if (fail != null)

Review Comment:
   this is always `null`... when you fail the call throws



##########
src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java:
##########
@@ -196,72 +252,115 @@ static int computeConcurrencyFactor(int totalRangeCount, 
int rangesQueried, int
      * {@code DataLimits}) may have "state" information and that state may 
only be valid for the first query (in
      * that it's the query that "continues" whatever we're previously queried).
      */
-    private IRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, 
ReadCoordinator readCoordinator, boolean isFirst)
+    private PartitionIterator query(ClusterMetadata cm, 
ReplicaPlan.ForRangeRead replicaPlan, ReadCoordinator readCoordinator, 
List<ReadRepair<?, ?>> readRepairs, boolean isFirst)
     {
         PartitionRangeReadCommand rangeCommand = 
command.forSubRange(replicaPlan.range(), isFirst);
-        
-        // If enabled, request repaired data tracking info from full replicas, 
but
-        // only if there are multiple full replicas to compare results from.
-        boolean trackRepairedStatus = 
DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
-                                      && 
replicaPlan.contacts().filter(Replica::isFull).size() > 1;
 
-        ClusterMetadata cm = ClusterMetadata.current();
-        TableMetadata metadata = command.metadata();
-        TableParams tableParams = metadata.params;
-        TransactionalMode transactionalMode = tableParams.transactionalMode;
-        TransactionalMigrationFromMode transactionalMigrationFromMode = 
tableParams.transactionalMigrationFrom;
-        if (transactionalMigrationFromMode != 
TransactionalMigrationFromMode.none && 
transactionalMode.nonSerialReadsThroughAccord && 
transactionalMigrationFromMode.nonSerialWritesThroughAccord() && 
transactionalMigrationFromMode.nonSerialReadsThroughAccord())
-            throw new UnsupportedOperationException("Live migration is not 
supported, can't safely read when migrating from " + 
transactionalMigrationFromMode + " to " + transactionalMode);
-        if (transactionalMode.nonSerialReadsThroughAccord && 
readCoordinator.isEventuallyConsistent())
+        // Accord interop execution should always be coordinated through the 
C* plumbing
+        if (!readCoordinator.isEventuallyConsistent())
         {
-            //TODO (nicetohave): This is very inefficient because it will not 
map the the command store owned ranges
-            // so every command store will return results and most will be 
discarded due to the limit
-            // Really we want to split the ranges by command stores owned 
ranges and then query one at a time
-            AsyncTxnResult result = StorageProxy.readWithAccord(cm, 
rangeCommand, ImmutableList.of(rangeCommand.dataRange().keyRange()), 
replicaPlan.consistencyLevel(), requestTime);
-            return new AccordRangeResponse(result, rangeCommand.isReversed(), 
replicaPlan.consistencyLevel(), requestTime);
+            SingleRangeResponse response = executeNormal(replicaPlan, 
rangeCommand, readCoordinator);
+            readRepairs.add(response.getReadRepair());
+            return response;
         }
-        else
+
+        List<RangeReadWithTarget> reads = 
ConsensusRequestRouter.splitReadIntoAccordAndNormal(cm, rangeCommand, 
readCoordinator, requestTime);
+        // Special case returning directly to avoid wrapping the iterator and 
applying the limits an extra time
+        if (reads.size() == 1)
         {
-            ReplicaPlan.SharedForRangeRead sharedReplicaPlan = 
ReplicaPlan.shared(replicaPlan);
-            ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair 
=
-            ReadRepair.create(ReadCoordinator.DEFAULT, command, 
sharedReplicaPlan, requestTime);
-            DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver 
=
-            new DataResolver<>(ReadCoordinator.DEFAULT, rangeCommand, 
sharedReplicaPlan, readRepair, requestTime, trackRepairedStatus);
-            ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler =
-            new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, 
requestTime);
-
-            if (replicaPlan.contacts().size() == 1 && 
replicaPlan.contacts().get(0).isSelf())
+            RangeReadWithTarget rangeReadWithTarget = reads.get(0);
+            
checkState(rangeReadWithTarget.read.dataRange().keyRange().equals(rangeCommand.dataRange().keyRange()));
+            if (rangeReadWithTarget.target == RangeReadTarget.accord && 
readCoordinator.isEventuallyConsistent())
             {
-                Stage.READ.execute(new 
StorageProxy.LocalReadRunnable(rangeCommand, handler, requestTime, 
trackRepairedStatus));
+                return executeAccord(cm,
+                                     rangeReadWithTarget.read,
+                                     replicaPlan.consistencyLevel());
             }
             else
             {
-                for (Replica replica : replicaPlan.contacts())
-                {
-                    Tracing.trace("Enqueuing request to {}", replica);
-                    ReadCommand command = replica.isFull() ? rangeCommand : 
rangeCommand.copyAsTransientQuery(replica);
-                    Message<ReadCommand> message = 
command.createMessage(trackRepairedStatus && replica.isFull(), requestTime);
-                    MessagingService.instance().sendWithCallback(message, 
replica.endpoint(), handler);
-                }
+                SingleRangeResponse response = executeNormal(replicaPlan, 
rangeReadWithTarget.read, readCoordinator);
+                readRepairs.add(response.getReadRepair());
+                return response;
             }
-            return new CassandraRangeResponse(resolver, handler, readRepair);
         }
+
+        // TODO (review): Should this be reworked to execute the queries 
serially from the iterator? It would respect

Review Comment:
   i would expect that to timeout more often no?



##########
src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java:
##########
@@ -340,4 +512,410 @@ private static ConsensusRoutingDecision pickPaxos()
     {
         return Paxos.useV2() ? paxosV2 : paxosV1;
     }
+
+    public static void validateSafeToReadNonTransactionally(ReadCommand 
command)
+    {
+        if (command.allowsPotentialTxnConflicts())
+            return;
+
+        String keyspace = command.metadata().keyspace;
+        // System keyspaces are never managed by Accord
+        if (SchemaConstants.isSystemKeyspace(keyspace))
+            return;
+
+        // Local keyspaces are never managed by Accord
+        if (Schema.instance.localKeyspaces().containsKeyspace(keyspace))
+            return;
+
+        ClusterMetadata cm = ClusterMetadata.current();
+        TableId tableId = command.metadata().id;
+        TableMetadata tableMetadata = getTableMetadata(cm, tableId);
+        // Null for local tables
+        if (tableMetadata == null)
+            return;
+
+        TransactionalMode transactionalMode = 
tableMetadata.params.transactionalMode;
+        TransactionalMigrationFromMode transactionalMigrationFromMode = 
tableMetadata.params.transactionalMigrationFrom;
+        if (!transactionalMode.nonSerialReadsThroughAccord && 
!transactionalMigrationFromMode.nonSerialReadsThroughAccord())
+            return;
+
+        TableMigrationState tms = 
cm.consensusMigrationState.tableStates.get(tableId);
+
+        // Null with a transaction mode that reads through Accord indicates a 
completed migration or table created
+        // to use Accord initially
+        if (tms == null)
+        {
+            checkState(transactionalMigrationFromMode == 
TransactionalMigrationFromMode.none);
+            if (transactionalMode.nonSerialReadsThroughAccord)
+            {
+                ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+                if (cfs != null)
+                    cfs.metric.readsRejectedOnWrongSystem.mark();
+                throw new RetryOnDifferentSystemException();
+            }
+        }
+
+        boolean isExclusivelyReadableFromAccord;
+        if (command.isRangeRequest())
+            isExclusivelyReadableFromAccord = 
isBoundsExclusivelyManagedByAccordForRead(transactionalMode, 
transactionalMigrationFromMode, tms, command.dataRange().keyRange());
+        else
+            isExclusivelyReadableFromAccord = 
isTokenExclusivelyManagedByAccordForRead(transactionalMode, 
transactionalMigrationFromMode, tms, 
((SinglePartitionReadCommand)command).partitionKey().getToken());
+
+        if (isExclusivelyReadableFromAccord)
+        {
+            ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+            if (cfs != null)
+                cfs.metric.readsRejectedOnWrongSystem.mark();
+            throw new RetryOnDifferentSystemException();
+        }
+    }
+
+    private static boolean isTokenExclusivelyManagedByAccordForRead(@Nonnull 
TransactionalMode transactionalMode,
+                                                                     @Nonnull 
TransactionalMigrationFromMode migrationFrom,
+                                                                     @Nonnull 
TableMigrationState tms,
+                                                                     @Nonnull 
Token token)
+    {
+        checkNotNull(transactionalMode, "transactionalMode is null");
+        checkNotNull(migrationFrom, "migrationFrom is null");
+        checkNotNull(tms, "tms (TableMigrationState) is null");
+        checkNotNull(token, "bounds is null");
+
+        if (transactionalMode.accordIsEnabled)
+        {
+            if (!migrationFrom.isMigrating())
+                return true;
+            if (migrationFrom.migratingFromAccord())
+                return true;
+
+            // Accord is exclusive once the range is fully migrated to Accord, 
but possible to read from safely
+            // when accordSafeToReadRanges covers the entire bound
+            if (tms.migratedRanges.intersects(token))
+                return true;
+        }
+        else
+        {
+            // Once the migration starts only barriers are allowed to run for 
the key in Accord
+            if (migrationFrom.migratingFromAccord() && 
!tms.migratingAndMigratedRanges.intersects(token))
+                return true;
+        }
+
+        return false;
+    }
+
+    // Returns true if any part of the bound
+    private static boolean isBoundsExclusivelyManagedByAccordForRead(@Nonnull 
TransactionalMode transactionalMode,
+                                                                     @Nonnull 
TransactionalMigrationFromMode migrationFrom,
+                                                                     @Nonnull 
TableMigrationState tms,
+                                                                     @Nonnull 
AbstractBounds<PartitionPosition> bounds)
+    {
+        checkNotNull(transactionalMode, "transactionalMode is null");
+        checkNotNull(migrationFrom, "migrationFrom is null");
+        checkNotNull(tms, "tms (TableMigrationState) is null");
+        checkNotNull(bounds, "bounds is null");
+
+        BiPredicate<AbstractBounds<PartitionPosition>, 
NormalizedRanges<Token>> intersects = (testBounds, testRanges) -> {
+            // TODO (nicetohave): Efficiency of this intersection
+            for (org.apache.cassandra.dht.Range<Token> range : testRanges)
+            {
+                Pair<AbstractBounds<PartitionPosition>, 
AbstractBounds<PartitionPosition>> intersectionAndRemainder = 
Range.intersectionAndRemainder(testBounds, range);
+                return intersectionAndRemainder.left != null;
+            }
+            return false;
+        };
+
+        if (bounds.left.getToken().equals(bounds.right.getToken()) && 
!bounds.inclusiveLeft() && bounds.inclusiveRight())
+        {
+            return isTokenExclusivelyManagedByAccordForRead(transactionalMode, 
migrationFrom, tms, bounds.left.getToken());
+        }
+
+        if (transactionalMode.accordIsEnabled)
+        {
+            if (!migrationFrom.isMigrating())
+                return true;
+            if (migrationFrom.migratingFromAccord())
+                return true;
+
+            // Accord is exclusive once the range is fully migrated to Accord, 
but possible to read from safely
+            // when accordSafeToReadRanges covers the entire bound
+            if (intersects.test(bounds, tms.migratedRanges))
+                return true;
+        }
+        else
+        {
+            // Once the migration starts only barriers are allowed to run for 
the key in Accord
+            if (migrationFrom.migratingFromAccord() && 
!intersects.test(bounds, tms.migratingAndMigratedRanges))
+                return true;
+        }
+
+        return false;
+    }
+
+    public enum RangeReadTarget
+    {
+        accord,
+        normal
+    }
+
+    public static class RangeReadWithTarget
+    {
+        public final PartitionRangeReadCommand read;
+        public final RangeReadTarget target;
+
+        private RangeReadWithTarget(PartitionRangeReadCommand read, 
RangeReadTarget target)
+        {
+            this.read = read;
+            this.target = target;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "RangeReadWithTarget{" +
+                   "read=" + read +
+                   ", target=" + target +
+                   '}';
+        }
+    }
+
+    /**
+     * While it's possible to map the Accord read to a single txn it doesn't 
seem worth it since it's a pretty unusual
+     * scenario where we do this during migration and have a lot of different 
read commands.
+     */
+    public static List<RangeReadWithTarget> 
splitReadIntoAccordAndNormal(ClusterMetadata cm, PartitionRangeReadCommand 
read, ReadCoordinator readCoordinator, Dispatcher.RequestTime requestTime)
+    {
+        if (!readCoordinator.isEventuallyConsistent())
+            return ImmutableList.of(new RangeReadWithTarget(read, 
RangeReadTarget.normal));
+        TableMetadata tm = getTableMetadata(cm, read.metadata().id);
+        if (tm == null || 
(!tm.params.transactionalMode.nonSerialReadsThroughAccord && 
!tm.params.transactionalMigrationFrom.nonSerialReadsThroughAccord()))
+            return ImmutableList.of(new RangeReadWithTarget(read, 
RangeReadTarget.normal));
+
+        List<RangeReadWithTarget> result = null;
+        TransactionalMode transactionalMode = tm.params.transactionalMode;
+        TransactionalMigrationFromMode transactionalMigrationFromMode = 
tm.params.transactionalMigrationFrom;
+        boolean transactionalModeReadsThroughAccord = 
transactionalMode.nonSerialReadsThroughAccord;
+        RangeReadTarget migrationToTarget = 
transactionalModeReadsThroughAccord ? RangeReadTarget.accord : 
RangeReadTarget.normal;
+        boolean migrationFromReadsThroughAccord = 
transactionalMigrationFromMode.nonSerialReadsThroughAccord();
+        RangeReadTarget migrationFromTarget = migrationFromReadsThroughAccord 
? RangeReadTarget.accord : RangeReadTarget.normal;
+        TableMigrationState tms = 
cm.consensusMigrationState.tableStates.get(tm.id);
+        if (tms == null)
+        {
+            if (transactionalMigrationFromMode == 
TransactionalMigrationFromMode.none)
+                // There is no migration and no TMS so do what the schema says 
since no migration should be required
+                return ImmutableList.of(new RangeReadWithTarget(read, 
transactionalModeReadsThroughAccord ? RangeReadTarget.accord : 
RangeReadTarget.normal));
+            else
+                // If we are migrating from something and there is no 
migration state the migration hasn't begun
+                // so continue to do what we are migrating from does until the 
range is marked as migrating
+                return ImmutableList.of(new RangeReadWithTarget(read, 
migrationFromReadsThroughAccord ? RangeReadTarget.accord : 
RangeReadTarget.normal));
+        }
+
+
+        // AbstractBounds can potentially be left/right inclusive while Range 
used to track migration is only right inclusive
+        // The right way to tackle this seems to be to find the tokens that 
intersect the key range and then split until
+        // until nothing intersects
+        AbstractBounds<PartitionPosition> keyRange = 
read.dataRange().keyRange();
+        AbstractBounds<PartitionPosition> remainder = keyRange;
+
+        // Migrating to Accord we only read through Accord when the range is 
fully migrated, but migrating back
+        // we stop reading from Accord as soon as the range is marked 
migrating and do key migration on read
+        NormalizedRanges<Token> migratedRanges = 
transactionalModeReadsThroughAccord ? tms.migratedRanges : 
tms.migratingAndMigratedRanges;
+
+        // Add the preceding range if any
+        if (!migratedRanges.isEmpty())
+        {
+            Token firstMigratingToken = migratedRanges.get(0).left.getToken();
+            int leftCmp = 
keyRange.left.getToken().compareTo(firstMigratingToken);
+            int rightCmp = compareRightToken(keyRange.right.getToken(), 
firstMigratingToken);
+            if (leftCmp <= 0)
+            {
+                if (rightCmp <= 0)
+                    return ImmutableList.of(new RangeReadWithTarget(read, 
migrationFromTarget));
+                result = new ArrayList<>();
+                AbstractBounds<PartitionPosition> precedingRange = 
keyRange.withNewRight(rightCmp <= 0 ? keyRange.right : 
firstMigratingToken.maxKeyBound());
+                // Could be an empty bound, it's fine to let a min KeyBound 
and max KeyBound through as that isn't empty
+                if (!precedingRange.left.equals(precedingRange.right))
+                    result.add(new 
RangeReadWithTarget(read.forSubRange(precedingRange, false), 
migrationFromTarget));
+            }
+        }
+
+        boolean hadAccordReads = false;
+        for (Range<Token> r : migratedRanges)
+        {
+            Pair<AbstractBounds<PartitionPosition>, 
AbstractBounds<PartitionPosition>> intersectionAndRemainder = 
Range.intersectionAndRemainder(remainder, r);
+            if (intersectionAndRemainder.left != null)
+            {
+                if (result == null)
+                    result = new ArrayList<>();
+                PartitionRangeReadCommand subRead = 
read.forSubRange(intersectionAndRemainder.left, result.isEmpty() ? true : 
false);
+                result.add(new RangeReadWithTarget(subRead, 
migrationToTarget));
+                hadAccordReads = true;
+            }
+            remainder = intersectionAndRemainder.right;
+            if (remainder == null)
+                break;
+        }
+
+        if (remainder != null)
+        {
+            if (result != null)
+                result.add(new RangeReadWithTarget(read.forSubRange(remainder, 
true), migrationFromTarget));
+            else
+                return ImmutableList.of(new 
RangeReadWithTarget(read.forSubRange(remainder, false), migrationFromTarget));
+        }
+
+        checkState(result != null && !result.isEmpty(), "Shouldn't have null 
or empty result");
+        
checkState(result.get(0).read.dataRange().startKey().equals(read.dataRange().startKey()),
 "Split reads should encompass entire range");
+        checkState(result.get(result.size() - 
1).read.dataRange().stopKey().equals(read.dataRange().stopKey()), "Split reads 
should encompass entire range");
+        if (result.size() > 1)
+        {
+            for (int i = 0; i < result.size() - 1; i++)
+            {
+                
checkState(result.get(i).read.dataRange().stopKey().equals(result.get(i + 
1).read.dataRange().startKey()), "Split reads should all be adjacent");
+                checkState(result.get(i).target != result.get(i + 1).target, 
"Split reads should be for different targets");
+            }
+        }
+
+        //TODO (later): The range reads need a barrier for now only going to 
provide READ_COMMITTED

Review Comment:
   wondering how we should better expose this?  a `TODO` doesn't seem like the 
nicest place



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to