aweisberg commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1915648188


##########
src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java:
##########
@@ -340,4 +512,410 @@ private static ConsensusRoutingDecision pickPaxos()
     {
         return Paxos.useV2() ? paxosV2 : paxosV1;
     }
+
+    /**
+     * Rejects a read that is about to run outside of Accord when the data it targets has to be read through Accord.
+     * Returns normally when the read is allowed to proceed.
+     *
+     * @param command the read that is about to be executed non-transactionally
+     * @throws RetryOnDifferentSystemException when the table (with no migration state) reads through Accord, or when
+     *         the token/bounds of the read are reported as exclusively managed by Accord; the table's
+     *         readsRejectedOnWrongSystem metric is marked first if the ColumnFamilyStore exists
+     */
+    public static void validateSafeToReadNonTransactionally(ReadCommand command)
+    {
+        if (command.allowsPotentialTxnConflicts())
+            return;
+
+        // Neither system keyspaces nor local keyspaces are ever managed by Accord
+        String keyspace = command.metadata().keyspace;
+        if (SchemaConstants.isSystemKeyspace(keyspace) || Schema.instance.localKeyspaces().containsKeyspace(keyspace))
+            return;
+
+        ClusterMetadata cm = ClusterMetadata.current();
+        TableId tableId = command.metadata().id;
+        TableMetadata tableMetadata = getTableMetadata(cm, tableId);
+        // Null for local tables
+        if (tableMetadata == null)
+            return;
+
+        TransactionalMode mode = tableMetadata.params.transactionalMode;
+        TransactionalMigrationFromMode migrationFrom = tableMetadata.params.transactionalMigrationFrom;
+        boolean someModeReadsThroughAccord = mode.nonSerialReadsThroughAccord || migrationFrom.nonSerialReadsThroughAccord();
+        if (!someModeReadsThroughAccord)
+            return;
+
+        TableMigrationState tms = cm.consensusMigrationState.tableStates.get(tableId);
+
+        boolean mustReadThroughAccord = false;
+        if (tms == null)
+        {
+            // No migration state together with a mode that reads through Accord indicates a completed migration
+            // or a table that was created to use Accord initially
+            checkState(migrationFrom == TransactionalMigrationFromMode.none);
+            mustReadThroughAccord = mode.nonSerialReadsThroughAccord;
+        }
+
+        if (!mustReadThroughAccord)
+        {
+            if (command.isRangeRequest())
+            {
+                AbstractBounds<PartitionPosition> keyRange = command.dataRange().keyRange();
+                mustReadThroughAccord = isBoundsExclusivelyManagedByAccordForRead(mode, migrationFrom, tms, keyRange);
+            }
+            else
+            {
+                Token token = ((SinglePartitionReadCommand) command).partitionKey().getToken();
+                mustReadThroughAccord = isTokenExclusivelyManagedByAccordForRead(mode, migrationFrom, tms, token);
+            }
+        }
+
+        if (!mustReadThroughAccord)
+            return;
+
+        // Single rejection path: record the rejection (when the table is still present locally) and ask for a retry
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+        if (cfs != null)
+            cfs.metric.readsRejectedOnWrongSystem.mark();
+        throw new RetryOnDifferentSystemException();
+    }
+
+    /**
+     * Decides whether a read of a single token has to go through Accord.
+     *
+     * @param transactionalMode the table's current transactional mode
+     * @param migrationFrom the mode the table is migrating from, if any
+     * @param tms the migration state of the table, must not be null
+     * @param token the token being read
+     * @return true when Accord is enabled and the table is either not migrating, migrating from Accord, or the token
+     *         intersects the migrated ranges; also true when Accord is not enabled, the table is migrating from
+     *         Accord and the token does not intersect the migrating-and-migrated ranges; false otherwise
+     */
+    private static boolean isTokenExclusivelyManagedByAccordForRead(@Nonnull TransactionalMode transactionalMode,
+                                                                     @Nonnull TransactionalMigrationFromMode migrationFrom,
+                                                                     @Nonnull TableMigrationState tms,
+                                                                     @Nonnull Token token)
+    {
+        checkNotNull(transactionalMode, "transactionalMode is null");
+        checkNotNull(migrationFrom, "migrationFrom is null");
+        checkNotNull(tms, "tms (TableMigrationState) is null");
+        // The message used to say "bounds is null", which pointed at the wrong argument
+        checkNotNull(token, "token is null");
+
+        if (transactionalMode.accordIsEnabled)
+        {
+            if (!migrationFrom.isMigrating())
+                return true;
+            if (migrationFrom.migratingFromAccord())
+                return true;
+
+            // Accord is exclusive once the range containing the token is fully migrated to Accord
+            if (tms.migratedRanges.intersects(token))
+                return true;
+        }
+        else
+        {
+            // Once the migration starts only barriers are allowed to run for the key in Accord
+            if (migrationFrom.migratingFromAccord() && !tms.migratingAndMigratedRanges.intersects(token))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Decides whether a range read over {@code bounds} has to go through Accord.
+     *
+     * Bounds that are left-exclusive, right-inclusive and whose two ends share the same token are delegated to
+     * {@link #isTokenExclusivelyManagedByAccordForRead}.
+     *
+     * @param transactionalMode the table's current transactional mode
+     * @param migrationFrom the mode the table is migrating from, if any
+     * @param tms the migration state of the table, must not be null
+     * @param bounds the key range being read
+     * @return true when Accord is enabled and the table is either not migrating, migrating from Accord, or any part
+     *         of the bounds intersects the migrated ranges; also true when Accord is not enabled, the table is
+     *         migrating from Accord and no part of the bounds intersects the migrating-and-migrated ranges
+     */
+    private static boolean isBoundsExclusivelyManagedByAccordForRead(@Nonnull TransactionalMode transactionalMode,
+                                                                     @Nonnull TransactionalMigrationFromMode migrationFrom,
+                                                                     @Nonnull TableMigrationState tms,
+                                                                     @Nonnull AbstractBounds<PartitionPosition> bounds)
+    {
+        checkNotNull(transactionalMode, "transactionalMode is null");
+        checkNotNull(migrationFrom, "migrationFrom is null");
+        checkNotNull(tms, "tms (TableMigrationState) is null");
+        checkNotNull(bounds, "bounds is null");
+
+        BiPredicate<AbstractBounds<PartitionPosition>, NormalizedRanges<Token>> intersects = (testBounds, testRanges) -> {
+            // TODO (nicetohave): Efficiency of this intersection
+            for (org.apache.cassandra.dht.Range<Token> range : testRanges)
+            {
+                // Only stop on a hit: returning the first range's result unconditionally would ignore every
+                // later range, so bounds that only overlap a later range would be reported as not intersecting
+                Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> intersectionAndRemainder = Range.intersectionAndRemainder(testBounds, range);
+                if (intersectionAndRemainder.left != null)
+                    return true;
+            }
+            return false;
+        };
+
+        if (bounds.left.getToken().equals(bounds.right.getToken()) && !bounds.inclusiveLeft() && bounds.inclusiveRight())
+        {
+            return isTokenExclusivelyManagedByAccordForRead(transactionalMode, migrationFrom, tms, bounds.left.getToken());
+        }
+
+        if (transactionalMode.accordIsEnabled)
+        {
+            if (!migrationFrom.isMigrating())
+                return true;
+            if (migrationFrom.migratingFromAccord())
+                return true;
+
+            // Accord is exclusive once the range is fully migrated to Accord, so any overlap with the migrated
+            // ranges means part of the bounds has to be read through Accord
+            if (intersects.test(bounds, tms.migratedRanges))
+                return true;
+        }
+        else
+        {
+            // Once the migration starts only barriers are allowed to run for the key in Accord
+            if (migrationFrom.migratingFromAccord() && !intersects.test(bounds, tms.migratingAndMigratedRanges))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * The system a range read (or a sub-range of one) is routed to.
+     */
+    public enum RangeReadTarget
+    {
+        // Chosen when the applicable mode performs non-serial reads through Accord
+        accord,
+        // Chosen otherwise
+        normal
+    }
+
+    /**
+     * Immutable pairing of a partition range read with the system it should be executed on.
+     */
+    public static class RangeReadWithTarget
+    {
+        // The (sub-)range read to execute
+        public final PartitionRangeReadCommand read;
+        // Where the read should be executed
+        public final RangeReadTarget target;
+
+        private RangeReadWithTarget(PartitionRangeReadCommand read, RangeReadTarget target)
+        {
+            this.read = read;
+            this.target = target;
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder description = new StringBuilder("RangeReadWithTarget{");
+            description.append("read=").append(read);
+            description.append(", target=").append(target);
+            description.append('}');
+            return description.toString();
+        }
+    }
+
+    /**
+     * While it's possible to map the Accord read to a single txn it doesn't 
seem worth it since it's a pretty unusual
+     * scenario where we do this during migration and have a lot of different 
read commands.
+     */
+    public static List<RangeReadWithTarget> 
splitReadIntoAccordAndNormal(ClusterMetadata cm, PartitionRangeReadCommand 
read, ReadCoordinator readCoordinator, Dispatcher.RequestTime requestTime)
+    {
+        if (!readCoordinator.isEventuallyConsistent())
+            return ImmutableList.of(new RangeReadWithTarget(read, 
RangeReadTarget.normal));
+        TableMetadata tm = getTableMetadata(cm, read.metadata().id);
+        if (tm == null || 
(!tm.params.transactionalMode.nonSerialReadsThroughAccord && 
!tm.params.transactionalMigrationFrom.nonSerialReadsThroughAccord()))
+            return ImmutableList.of(new RangeReadWithTarget(read, 
RangeReadTarget.normal));
+
+        List<RangeReadWithTarget> result = null;
+        TransactionalMode transactionalMode = tm.params.transactionalMode;
+        TransactionalMigrationFromMode transactionalMigrationFromMode = 
tm.params.transactionalMigrationFrom;
+        boolean transactionalModeReadsThroughAccord = 
transactionalMode.nonSerialReadsThroughAccord;
+        RangeReadTarget migrationToTarget = 
transactionalModeReadsThroughAccord ? RangeReadTarget.accord : 
RangeReadTarget.normal;
+        boolean migrationFromReadsThroughAccord = 
transactionalMigrationFromMode.nonSerialReadsThroughAccord();
+        RangeReadTarget migrationFromTarget = migrationFromReadsThroughAccord 
? RangeReadTarget.accord : RangeReadTarget.normal;
+        TableMigrationState tms = 
cm.consensusMigrationState.tableStates.get(tm.id);
+        if (tms == null)
+        {
+            if (transactionalMigrationFromMode == 
TransactionalMigrationFromMode.none)
+                // There is no migration and no TMS so do what the schema says 
since no migration should be required
+                return ImmutableList.of(new RangeReadWithTarget(read, 
transactionalModeReadsThroughAccord ? RangeReadTarget.accord : 
RangeReadTarget.normal));
+            else
+                // If we are migrating from something and there is no 
migration state the migration hasn't begun
+                // so continue to do what we are migrating from does until the 
range is marked as migrating
+                return ImmutableList.of(new RangeReadWithTarget(read, 
migrationFromReadsThroughAccord ? RangeReadTarget.accord : 
RangeReadTarget.normal));
+        }
+
+
+        // AbstractBounds can potentially be left/right inclusive while Range used to track migration is only right inclusive.
+        // The right way to tackle this seems to be to find the tokens that intersect the key range and then split
+        // until nothing intersects
+        AbstractBounds<PartitionPosition> keyRange = 
read.dataRange().keyRange();
+        AbstractBounds<PartitionPosition> remainder = keyRange;
+
+        // Migrating to Accord we only read through Accord when the range is 
fully migrated, but migrating back
+        // we stop reading from Accord as soon as the range is marked 
migrating and do key migration on read
+        NormalizedRanges<Token> migratedRanges = 
transactionalModeReadsThroughAccord ? tms.migratedRanges : 
tms.migratingAndMigratedRanges;
+
+        // Add the preceding range if any
+        if (!migratedRanges.isEmpty())
+        {
+            Token firstMigratingToken = migratedRanges.get(0).left.getToken();
+            int leftCmp = 
keyRange.left.getToken().compareTo(firstMigratingToken);
+            int rightCmp = compareRightToken(keyRange.right.getToken(), 
firstMigratingToken);
+            if (leftCmp <= 0)
+            {
+                if (rightCmp <= 0)
+                    return ImmutableList.of(new RangeReadWithTarget(read, 
migrationFromTarget));
+                result = new ArrayList<>();
+                AbstractBounds<PartitionPosition> precedingRange = 
keyRange.withNewRight(rightCmp <= 0 ? keyRange.right : 
firstMigratingToken.maxKeyBound());
+                // Could be an empty bound, it's fine to let a min KeyBound 
and max KeyBound through as that isn't empty
+                if (!precedingRange.left.equals(precedingRange.right))
+                    result.add(new 
RangeReadWithTarget(read.forSubRange(precedingRange, false), 
migrationFromTarget));
+            }
+        }
+
+        boolean hadAccordReads = false;
+        for (Range<Token> r : migratedRanges)
+        {
+            Pair<AbstractBounds<PartitionPosition>, 
AbstractBounds<PartitionPosition>> intersectionAndRemainder = 
Range.intersectionAndRemainder(remainder, r);
+            if (intersectionAndRemainder.left != null)
+            {
+                if (result == null)
+                    result = new ArrayList<>();
+                PartitionRangeReadCommand subRead = 
read.forSubRange(intersectionAndRemainder.left, result.isEmpty() ? true : 
false);
+                result.add(new RangeReadWithTarget(subRead, 
migrationToTarget));
+                hadAccordReads = true;
+            }
+            remainder = intersectionAndRemainder.right;
+            if (remainder == null)
+                break;
+        }
+
+        if (remainder != null)
+        {
+            if (result != null)
+                result.add(new RangeReadWithTarget(read.forSubRange(remainder, 
true), migrationFromTarget));
+            else
+                return ImmutableList.of(new 
RangeReadWithTarget(read.forSubRange(remainder, false), migrationFromTarget));
+        }
+
+        checkState(result != null && !result.isEmpty(), "Shouldn't have null 
or empty result");
+        
checkState(result.get(0).read.dataRange().startKey().equals(read.dataRange().startKey()),
 "Split reads should encompass entire range");
+        checkState(result.get(result.size() - 
1).read.dataRange().stopKey().equals(read.dataRange().stopKey()), "Split reads 
should encompass entire range");
+        if (result.size() > 1)
+        {
+            for (int i = 0; i < result.size() - 1; i++)
+            {
+                
checkState(result.get(i).read.dataRange().stopKey().equals(result.get(i + 
1).read.dataRange().startKey()), "Split reads should all be adjacent");
+                checkState(result.get(i).target != result.get(i + 1).target, 
"Split reads should be for different targets");
+            }
+        }
+
+        //TODO (later): The range reads need a barrier for now only going to 
provide READ_COMMITTED

Review Comment:
   I will create a JIRA.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to