dcapwell commented on code in PR #3568:
URL: https://github.com/apache/cassandra/pull/3568#discussion_r1779889761


##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -2141,28 +2149,77 @@ private static PartitionIterator 
readWithConsensus(SinglePartitionReadCommand.Gr
         return lastResult.serialReadResult;
     }
 
-    private static ConsensusAttemptResult 
readWithAccord(SinglePartitionReadCommand.Group group, ConsistencyLevel 
consistencyLevel, Dispatcher.RequestTime requestTime)
+    private static ConsistencyLevel 
consistencyLevelForAccordRead(ClusterMetadata cm, 
SinglePartitionReadCommand.Group group, @Nullable ConsistencyLevel 
consistencyLevel)
+    {
+        // Null means no specific consistency behavior is required from 
Accord, it's functionally similar to
+        // reading at ONE if you are reading data that wasn't written via 
Accord
+        if (consistencyLevel == null)
+            return null;
+
+        TableId tableId = group.queries.get(0).metadata().id;
+        TableParams tableParams = getTableMetadata(cm, tableId).params;
+        TransactionalMode mode = tableParams.transactionalMode;
+        TransactionalMigrationFromMode migrationFromMode = 
tableParams.transactionalMigrationFrom;
+        for (SinglePartitionReadCommand command : group.queries)
+        {
+            // readCLForStrategy should return either null or the supplied 
consistency level
+            // in which case we will read everything at that CL since Accord 
doesn't support per table
+            // read consistency
+            ConsistencyLevel commitCL = 
mode.readCLForStrategy(migrationFromMode, consistencyLevel, cm, tableId, 
command.partitionKey().getToken());
+            if (commitCL != null)
+                return commitCL;
+        }
+        return null;
+    }
+
+    private static ConsensusAttemptResult readWithAccord(ClusterMetadata cm, 
SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, 
Dispatcher.RequestTime requestTime)
     {
-        if (group.queries.size() > 1)
-            throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency 
may only be requested for one partition at a time");
-        SinglePartitionReadCommand readCommand = group.queries.get(0);
         // If the non-SERIAL write strategy is sending all writes through 
Accord there is no need to use the supplied consistency
         // level since Accord will manage reading safely
         TransactionalMode transactionalMode = 
group.metadata().params.transactionalMode;
-        consistencyLevel = 
transactionalMode.readCLForStrategy(consistencyLevel);
-        TxnRead read = TxnRead.createSerialRead(readCommand, consistencyLevel);
-        Invariants.checkState(read.keys().size() == 1, "Ephemeral reads are 
only strict-serializable for single partition reads");
-        Txn txn = new Txn.InMemory(transactionalMode == TransactionalMode.full 
&& DatabaseDescriptor.getAccordEphemeralReadEnabledEnabled() ? EphemeralRead : 
Read, read.keys(), read, TxnQuery.ALL, null);
-        IAccordService accordService = AccordService.instance();
-        TxnResult txnResult = accordService.coordinate(txn, consistencyLevel, 
requestTime);
+        consistencyLevel = consistencyLevelForAccordRead(cm, group, 
consistencyLevel);
+        TxnRead read = TxnRead.createSerialRead(group.queries, 
consistencyLevel);
+        Txn.Kind kind = Read;
+        if (transactionalMode == TransactionalMode.full && 
DatabaseDescriptor.getAccordEphemeralReadEnabledEnabled() && 
group.queries.size() == 1)
+            kind = EphemeralRead;
+        Txn txn = new Txn.InMemory(kind, read.keys(), read, TxnQuery.ALL, 
null);
+        AsyncTxnResult asyncTxnResult = 
AccordService.instance().coordinateAsync(txn, consistencyLevel, requestTime);
+        return getConsensusAttemptResultFromAsyncTxnResult(asyncTxnResult, 
group.queries.size(), index -> group.queries.get(index).isReversed(), 
consistencyLevel, requestTime);
+    }
+
+    /*
+     * Used for both the SERIAL and non-SERIAL read path into Accord
+     */
+    public static ConsensusAttemptResult 
getConsensusAttemptResultFromAsyncTxnResult(AsyncTxnResult asyncTxnResult, int 
numQueries, IntPredicate isQueryReversed, ConsistencyLevel cl, 
Dispatcher.RequestTime requestTime)
+    {
+        TxnResult txnResult = 
AccordService.instance().getTxnResult(asyncTxnResult, false, cl, requestTime);
+        // TODO (required): Converge on a single approach to 
RETRY_NEW_PROTOCOL, this works for now because reads don't support it anyways
         if (txnResult.kind() == retry_new_protocol)
             return RETRY_NEW_PROTOCOL;
-        TxnData data = (TxnData)txnResult;
-        FilteredPartition partition = data.get(TxnRead.SERIAL_READ);
-        if (partition != null)
-            return 
serialReadResult(PartitionIterators.singletonIterator(partition.rowIterator(readCommand.isReversed())));
-        else
+        TxnData data = (TxnData) txnResult;
+
+        if (data.isEmpty())
+        {
             return serialReadResult(EmptyIterators.partition());
+        }
+        else if (data.size() == 1)
+        {
+            FilteredPartition value = data.values().iterator().next();
+            return 
serialReadResult(PartitionIterators.singletonIterator(value.rowIterator(isQueryReversed.test(0))));
+        }
+        else
+        {
+            // TODO (review): 95% sure this isn't actually needed and the 
consumer is going consume these by DecoratedKey not iteration order, but the 
non-transactional path does preserve the order of the iterators

Review Comment:
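   Not a blocker for this patch, but we should find tests that hit this case
(multiple partitions returned) and make sure we maintain the existing behavior, i.e. the partition iteration order that the non-transactional path preserves.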



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to