dcapwell commented on code in PR #3568:
URL: https://github.com/apache/cassandra/pull/3568#discussion_r1779062043


##########
src/java/org/apache/cassandra/service/consensus/TransactionalMode.java:
##########
@@ -112,14 +121,25 @@ public ConsistencyLevel 
commitCLForStrategy(ConsistencyLevel consistencyLevel)
         return consistencyLevel;
     }
 
-    // TODO (required): This won't work for migration directly from none to 
full because there is no safe system to read from
-    // during the first phase (repair). Accord won't read correctly beacuse it 
won't honor the CL and miss non-transactional writes that haven't been repaired 
and non-transactional
-    // reads will miss all the writes being routed through Accord since they 
occur asynchronously. Something has to give here where either writes routed 
through are Accord are synchronous at CL
-    // or reads are routed through Accord and read at quorum as long as the 
range has not completed the first phase (repair).
-    public ConsistencyLevel readCLForStrategy(ConsistencyLevel 
consistencyLevel)
+    private boolean ignoresSuppliedReadCL()
     {
-        if (ignoresSuppliedConsistencyLevel)
-            return null;
+        return writesThroughAccord && blockingReadRepairThroughAccord;

Review Comment:
   With this change, do you still need `public final boolean 
ignoresSuppliedConsistencyLevel`? Also, wouldn't you need reads to go 
through Accord as well (i.e., I'm just reading whatever is in the enum)?



##########
src/java/org/apache/cassandra/service/consensus/TransactionalMode.java:
##########
@@ -112,14 +121,25 @@ public ConsistencyLevel 
commitCLForStrategy(ConsistencyLevel consistencyLevel)
         return consistencyLevel;
     }
 
-    // TODO (required): This won't work for migration directly from none to 
full because there is no safe system to read from
-    // during the first phase (repair). Accord won't read correctly beacuse it 
won't honor the CL and miss non-transactional writes that haven't been repaired 
and non-transactional
-    // reads will miss all the writes being routed through Accord since they 
occur asynchronously. Something has to give here where either writes routed 
through are Accord are synchronous at CL
-    // or reads are routed through Accord and read at quorum as long as the 
range has not completed the first phase (repair).
-    public ConsistencyLevel readCLForStrategy(ConsistencyLevel 
consistencyLevel)
+    private boolean ignoresSuppliedReadCL()
     {
-        if (ignoresSuppliedConsistencyLevel)
-            return null;
+        return writesThroughAccord && blockingReadRepairThroughAccord;
+    }
+
+    public ConsistencyLevel readCLForStrategy(TransactionalMigrationFromMode 
fromMode, ConsistencyLevel consistencyLevel, ClusterMetadata cm, TableId 
tableId, Token token)
+    {
+        if (ignoresSuppliedReadCL())
+        {
+            TableMigrationState tms = 
cm.consensusMigrationState.tableStates.get(tableId);
+            checkState(tms != null || fromMode == 
TransactionalMigrationFromMode.none);

Review Comment:
   Is this true? If you enable migration and this is the first time it's 
touched, does a `TableMigrationState` exist?



##########
src/java/org/apache/cassandra/service/consensus/TransactionalMode.java:
##########
@@ -112,14 +121,25 @@ public ConsistencyLevel 
commitCLForStrategy(ConsistencyLevel consistencyLevel)
         return consistencyLevel;
     }
 
-    // TODO (required): This won't work for migration directly from none to 
full because there is no safe system to read from
-    // during the first phase (repair). Accord won't read correctly beacuse it 
won't honor the CL and miss non-transactional writes that haven't been repaired 
and non-transactional
-    // reads will miss all the writes being routed through Accord since they 
occur asynchronously. Something has to give here where either writes routed 
through are Accord are synchronous at CL
-    // or reads are routed through Accord and read at quorum as long as the 
range has not completed the first phase (repair).
-    public ConsistencyLevel readCLForStrategy(ConsistencyLevel 
consistencyLevel)
+    private boolean ignoresSuppliedReadCL()
     {
-        if (ignoresSuppliedConsistencyLevel)
-            return null;
+        return writesThroughAccord && blockingReadRepairThroughAccord;
+    }
+
+    public ConsistencyLevel readCLForStrategy(TransactionalMigrationFromMode 
fromMode, ConsistencyLevel consistencyLevel, ClusterMetadata cm, TableId 
tableId, Token token)
+    {
+        if (ignoresSuppliedReadCL())
+        {
+            TableMigrationState tms = 
cm.consensusMigrationState.tableStates.get(tableId);
+            checkState(tms != null || fromMode == 
TransactionalMigrationFromMode.none);

Review Comment:
   ```suggestion
               checkState(tms != null || fromMode == 
TransactionalMigrationFromMode.none, "%s == null or %s != none", tms, fromMode);
   ```
   
   If this happens, it would be good to know why.
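
   As a rough, self-contained sketch of why the message helps (not the PR's code): `checkState` below is a minimal stand-in modelled on Guava's `Preconditions.checkState(boolean, String, Object...)`, and `FromMode.other` is a hypothetical placeholder for any non-`none` migration mode. When the check fails, both values show up in the exception message.

   ```java
   final class CheckStateSketch
   {
       // Hypothetical stand-in for TransactionalMigrationFromMode; only `none` is taken from the diff.
       enum FromMode { none, other }

       // Minimal stand-in for a checkState that formats its message only on failure.
       static void checkState(boolean condition, String template, Object... args)
       {
           if (!condition)
               throw new IllegalStateException(String.format(template, args));
       }

       // Runs the same condition as the diff and returns "ok" or the failure message.
       static String describe(Object tms, FromMode fromMode)
       {
           try
           {
               checkState(tms != null || fromMode == FromMode.none,
                          "tms=%s, fromMode=%s: expected a migration state when migrating", tms, fromMode);
               return "ok";
           }
           catch (IllegalStateException e)
           {
               return e.getMessage();
           }
       }

       public static void main(String[] args)
       {
           System.out.println(describe(null, FromMode.none));
           System.out.println(describe(null, FromMode.other));
       }
   }
   ```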



##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -356,6 +361,7 @@ public static RowIterator cas(String keyspaceName,
         ConsensusAttemptResult lastAttemptResult;
         do
         {
+            ClusterMetadata cm = ClusterMetadata.current();

Review Comment:
   Should we fetch the `TableMetadata` again from this CM?



##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -2267,7 +2324,28 @@ public static PartitionIterator 
readRegular(SinglePartitionReadCommand.Group gro
         long start = nanoTime();
         try
         {
-            PartitionIterator result = fetchRows(group.queries, 
consistencyLevel, coordinator, requestTime);
+            ClusterMetadata cm = ClusterMetadata.current();
+            TableId tableId = group.queries.get(0).metadata().id;
+            // Returns null for local tables
+            TableMetadata tableMetadata = getTableMetadata(cm, tableId);
+            if (tableMetadata == null)
+                tableMetadata = 
Schema.instance.localKeyspaces().getTableOrViewNullable(tableId);
+            TableParams tableParams = tableMetadata.params;
+
+            TransactionalMode transactionalMode = 
tableParams.transactionalMode;
+            TransactionalMigrationFromMode transactionalMigrationFromMode = 
tableParams.transactionalMigrationFrom;
+            if (transactionalMigrationFromMode != 
TransactionalMigrationFromMode.none && transactionalMode.readsThroughAccord && 
transactionalMigrationFromMode.writesThroughAccord() && 
transactionalMigrationFromMode.readsThroughAccord())

Review Comment:
   What case is this? If the reads/writes *from* go through Accord, then are 
we migrating from `full` to `full`?



##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -2141,28 +2149,77 @@ private static PartitionIterator 
readWithConsensus(SinglePartitionReadCommand.Gr
         return lastResult.serialReadResult;
     }
 
-    private static ConsensusAttemptResult 
readWithAccord(SinglePartitionReadCommand.Group group, ConsistencyLevel 
consistencyLevel, Dispatcher.RequestTime requestTime)
+    private static ConsistencyLevel 
consistencyLevelForAccordRead(ClusterMetadata cm, 
SinglePartitionReadCommand.Group group, @Nullable ConsistencyLevel 
consistencyLevel)
+    {
+        // Null means no specific consistency behavior is required from 
Accord, it's functionally similar to
+        // reading at ONE if you are reading data that wasn't written via 
Accord
+        if (consistencyLevel == null)
+            return null;
+
+        TableId tableId = group.queries.get(0).metadata().id;
+        TableParams tableParams = getTableMetadata(cm, tableId).params;
+        TransactionalMode mode = tableParams.transactionalMode;
+        TransactionalMigrationFromMode migrationFromMode = 
tableParams.transactionalMigrationFrom;
+        for (SinglePartitionReadCommand command : group.queries)
+        {
+            // readCLForStrategy should return either null or the supplied 
consistency level
+            // in which case we will read everything at that CL since Accord 
doesn't support per table
+            // read consistency
+            ConsistencyLevel commitCL = 
mode.readCLForStrategy(migrationFromMode, consistencyLevel, cm, tableId, 
command.partitionKey().getToken());
+            if (commitCL != null)
+                return commitCL;
+        }
+        return null;
+    }
+
+    private static ConsensusAttemptResult readWithAccord(ClusterMetadata cm, 
SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, 
Dispatcher.RequestTime requestTime)
     {
-        if (group.queries.size() > 1)
-            throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency 
may only be requested for one partition at a time");
-        SinglePartitionReadCommand readCommand = group.queries.get(0);
         // If the non-SERIAL write strategy is sending all writes through 
Accord there is no need to use the supplied consistency
         // level since Accord will manage reading safely
         TransactionalMode transactionalMode = 
group.metadata().params.transactionalMode;
-        consistencyLevel = 
transactionalMode.readCLForStrategy(consistencyLevel);
-        TxnRead read = TxnRead.createSerialRead(readCommand, consistencyLevel);
-        Invariants.checkState(read.keys().size() == 1, "Ephemeral reads are 
only strict-serializable for single partition reads");
-        Txn txn = new Txn.InMemory(transactionalMode == TransactionalMode.full 
&& DatabaseDescriptor.getAccordEphemeralReadEnabledEnabled() ? EphemeralRead : 
Read, read.keys(), read, TxnQuery.ALL, null);
-        IAccordService accordService = AccordService.instance();
-        TxnResult txnResult = accordService.coordinate(txn, consistencyLevel, 
requestTime);
+        consistencyLevel = consistencyLevelForAccordRead(cm, group, 
consistencyLevel);
+        TxnRead read = TxnRead.createSerialRead(group.queries, 
consistencyLevel);

Review Comment:
   If `consistencyLevel == null`, can't we do a normal Accord read rather 
than a serial read?
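
   For reference, the null handling in `consistencyLevelForAccordRead` as I read it from the hunk, reduced to a standalone sketch. `CL`, `clForRead`, and the `perKey` function are hypothetical stand-ins (the real code calls `readCLForStrategy` per partition key): a null supplied CL short-circuits to null, otherwise the first non-null per-key answer wins.

   ```java
   import java.util.List;
   import java.util.function.Function;

   final class ReadClSketch
   {
       // Hypothetical stand-in for ConsistencyLevel.
       enum CL { ONE, QUORUM }

       // perKey stands in for readCLForStrategy: it returns null or the supplied CL for a token.
       static CL clForRead(List<Long> tokens, CL supplied, Function<Long, CL> perKey)
       {
           // Null means the caller needs no specific consistency behaviour.
           if (supplied == null)
               return null;
           for (Long token : tokens)
           {
               CL cl = perKey.apply(token);
               // One key needing the CL means the whole read uses it.
               if (cl != null)
                   return cl;
           }
           return null;
       }
   }
   ```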



##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -2141,28 +2149,77 @@ private static PartitionIterator 
readWithConsensus(SinglePartitionReadCommand.Gr
         return lastResult.serialReadResult;
     }
 
-    private static ConsensusAttemptResult 
readWithAccord(SinglePartitionReadCommand.Group group, ConsistencyLevel 
consistencyLevel, Dispatcher.RequestTime requestTime)
+    private static ConsistencyLevel 
consistencyLevelForAccordRead(ClusterMetadata cm, 
SinglePartitionReadCommand.Group group, @Nullable ConsistencyLevel 
consistencyLevel)
+    {
+        // Null means no specific consistency behavior is required from 
Accord, it's functionally similar to
+        // reading at ONE if you are reading data that wasn't written via 
Accord
+        if (consistencyLevel == null)
+            return null;
+
+        TableId tableId = group.queries.get(0).metadata().id;
+        TableParams tableParams = getTableMetadata(cm, tableId).params;
+        TransactionalMode mode = tableParams.transactionalMode;
+        TransactionalMigrationFromMode migrationFromMode = 
tableParams.transactionalMigrationFrom;
+        for (SinglePartitionReadCommand command : group.queries)
+        {
+            // readCLForStrategy should return either null or the supplied 
consistency level
+            // in which case we will read everything at that CL since Accord 
doesn't support per table
+            // read consistency
+            ConsistencyLevel commitCL = 
mode.readCLForStrategy(migrationFromMode, consistencyLevel, cm, tableId, 
command.partitionKey().getToken());
+            if (commitCL != null)
+                return commitCL;
+        }
+        return null;
+    }
+
+    private static ConsensusAttemptResult readWithAccord(ClusterMetadata cm, 
SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, 
Dispatcher.RequestTime requestTime)
     {
-        if (group.queries.size() > 1)
-            throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency 
may only be requested for one partition at a time");
-        SinglePartitionReadCommand readCommand = group.queries.get(0);
         // If the non-SERIAL write strategy is sending all writes through 
Accord there is no need to use the supplied consistency
         // level since Accord will manage reading safely
         TransactionalMode transactionalMode = 
group.metadata().params.transactionalMode;
-        consistencyLevel = 
transactionalMode.readCLForStrategy(consistencyLevel);
-        TxnRead read = TxnRead.createSerialRead(readCommand, consistencyLevel);
-        Invariants.checkState(read.keys().size() == 1, "Ephemeral reads are 
only strict-serializable for single partition reads");
-        Txn txn = new Txn.InMemory(transactionalMode == TransactionalMode.full 
&& DatabaseDescriptor.getAccordEphemeralReadEnabledEnabled() ? EphemeralRead : 
Read, read.keys(), read, TxnQuery.ALL, null);
-        IAccordService accordService = AccordService.instance();
-        TxnResult txnResult = accordService.coordinate(txn, consistencyLevel, 
requestTime);
+        consistencyLevel = consistencyLevelForAccordRead(cm, group, 
consistencyLevel);
+        TxnRead read = TxnRead.createSerialRead(group.queries, 
consistencyLevel);
+        Txn.Kind kind = Read;
+        if (transactionalMode == TransactionalMode.full && 
DatabaseDescriptor.getAccordEphemeralReadEnabledEnabled() && 
group.queries.size() == 1)
+            kind = EphemeralRead;
+        Txn txn = new Txn.InMemory(kind, read.keys(), read, TxnQuery.ALL, 
null);
+        AsyncTxnResult asyncTxnResult = 
AccordService.instance().coordinateAsync(txn, consistencyLevel, requestTime);
+        return getConsensusAttemptResultFromAsyncTxnResult(asyncTxnResult, 
group.queries.size(), index -> group.queries.get(index).isReversed(), 
consistencyLevel, requestTime);
+    }
+
+    /*
+     * Used for both the SERIAL and non-SERIAL read path into Accord
+     */
+    public static ConsensusAttemptResult 
getConsensusAttemptResultFromAsyncTxnResult(AsyncTxnResult asyncTxnResult, int 
numQueries, IntPredicate isQueryReversed, ConsistencyLevel cl, 
Dispatcher.RequestTime requestTime)
+    {
+        TxnResult txnResult = 
AccordService.instance().getTxnResult(asyncTxnResult, false, cl, requestTime);
+        // TODO (required): Converge on a single approach to 
RETRY_NEW_PROTOCOL, this works for now because reads don't support it anyways
         if (txnResult.kind() == retry_new_protocol)
             return RETRY_NEW_PROTOCOL;
-        TxnData data = (TxnData)txnResult;
-        FilteredPartition partition = data.get(TxnRead.SERIAL_READ);
-        if (partition != null)
-            return 
serialReadResult(PartitionIterators.singletonIterator(partition.rowIterator(readCommand.isReversed())));
-        else
+        TxnData data = (TxnData) txnResult;
+
+        if (data.isEmpty())
+        {
             return serialReadResult(EmptyIterators.partition());
+        }
+        else if (data.size() == 1)
+        {
+            FilteredPartition value = data.values().iterator().next();
+            return 
serialReadResult(PartitionIterators.singletonIterator(value.rowIterator(isQueryReversed.test(0))));
+        }
+        else
+        {
+            // TODO (review): 95% sure this isn't actually needed and the 
consumer is going consume these by DecoratedKey not iteration order, but the 
non-transactional path does preserve the order of the iterators

Review Comment:
   ```
   SELECT * FROM ... WHERE pk IN (?, ?);
   ```
   
   I would expect the rows to be returned in token order, so I do think this is needed.
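
   A minimal sketch of the ordering I have in mind, under the assumption that a token can be modelled as a plain `long`. `Partition` and `keysInTokenOrder` are hypothetical names for illustration, not Cassandra types; the real code would compare `DecoratedKey`s.

   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;

   final class TokenOrderSketch
   {
       // Hypothetical stand-in for a partition result: a key plus its token.
       static final class Partition
       {
           final String key;
           final long token;

           Partition(String key, long token)
           {
               this.key = key;
               this.token = token;
           }
       }

       // Returns the partition keys sorted by ascending token, leaving the input untouched.
       static List<String> keysInTokenOrder(List<Partition> partitions)
       {
           List<Partition> sorted = new ArrayList<>(partitions);
           sorted.sort(Comparator.comparingLong((Partition p) -> p.token));
           List<String> keys = new ArrayList<>();
           for (Partition p : sorted)
               keys.add(p.key);
           return keys;
       }
   }
   ```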



