dcapwell commented on code in PR #3694:
URL: https://github.com/apache/cassandra/pull/3694#discussion_r1847295190


##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -2181,18 +2183,18 @@ private static ConsistencyLevel consistencyLevelForAccordRead(ClusterMetadata cm
         return null;
     }
 
-
-    public static AsyncTxnResult readWithAccord(ClusterMetadata cm, PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime)
-    {
+    public static AsyncTxnResult readWithAccord(ClusterMetadata cm, PartitionRangeReadCommand command, List<AbstractBounds<PartitionPosition>> ranges, ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime)
+    {
         if (consistencyLevel != null && !IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(consistencyLevel))
             throw new InvalidRequestException(consistencyLevel + " is not supported by Accord");
 
         TableMetadata tableMetadata = getTableMetadata(cm, command.metadata().id);
         TableParams tableParams = tableMetadata.params;
         Range<Token> readRange = new Range<>(command.dataRange().startKey().getToken(), command.dataRange().stopKey().getToken());
         consistencyLevel = tableParams.transactionalMode.readCLForStrategy(tableParams.transactionalMigrationFrom, consistencyLevel, cm, tableMetadata.id, readRange);
-        TxnRead read = new TxnRangeRead(command, consistencyLevel);
-        Txn.Kind kind = EphemeralRead;
+        TxnRead read = new TxnRangeRead(command, ranges, consistencyLevel);
+        Txn.Kind kind = Read;
+        if (tableParams.transactionalMode == TransactionalMode.full && getAccordEphemeralReadEnabledEnabled() && tableParams.transactionalMigrationFrom == none)

Review Comment:
   `tableParams.transactionalMode == TransactionalMode.full && getAccordEphemeralReadEnabledEnabled() && tableParams.transactionalMigrationFrom == none`
   
   This really isn't the only place that deals with this, so I wonder if it's best to make a reusable method? `allowEphemeralReads(tableMetadata)`?
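   A minimal sketch of what such a helper might look like. Note this is illustrative only: `EphemeralReads`, the simplified enums, and the `ephemeralReadsEnabled` field are stand-ins for the real `TableParams` fields and the `getAccordEphemeralReadEnabledEnabled()` config accessor, not the actual API:
   
   ```java
   // Stand-in enums; the real values live on TableParams / TransactionalMode.
   enum TransactionalMode { off, full }
   enum MigrationFrom { none, migrating }
   
   final class EphemeralReads
   {
       // Stand-in for the getAccordEphemeralReadEnabledEnabled() config accessor
       static boolean ephemeralReadsEnabled = true;
   
       // Centralizes the three-part condition repeated at the call sites
       static boolean allowEphemeralReads(TransactionalMode mode, MigrationFrom migrationFrom)
       {
           return mode == TransactionalMode.full
                  && ephemeralReadsEnabled
                  && migrationFrom == MigrationFrom.none;
       }
   }
   ```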



##########
src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java:
##########
@@ -79,20 +80,31 @@ public final int compareTo(AccordRoutableKey that)
         if (cmp != 0)
             return cmp;
 
-        if (this.getClass() == SentinelKey.class || that.getClass() == SentinelKey.class)
+        Class<?> thisClass = this.getClass();
+        Class<?> thatClass = that.getClass();
+        if (thisClass== SentinelKey.class || thatClass == SentinelKey.class)

Review Comment:
   ```suggestion
           if (thisClass == SentinelKey.class || thatClass == SentinelKey.class)
   ```



##########
src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java:
##########
@@ -318,6 +319,119 @@ public long serializedSize(TokenKey key, int version)
         }
     }
 
+    // Allows the creation of a Range that is begin inclusive or end exclusive for a given Token
+    public static final class MinTokenKey extends TokenKey
+    {
+        private static final long EMPTY_SIZE;
+
+        @Override
+        public Range asRange()
+        {
+            AccordRoutingKey before = token.isMinimum()
+                                      ? new SentinelKey(table, true)
+                                      : new MinTokenKey(table, token.decreaseSlightly());
+
+            return new TokenRange(before, this);
+        }
+
+        static
+        {
+            EMPTY_SIZE = ObjectSizes.measure(new MinTokenKey(null, null));
+        }

Review Comment:
   shouldn't this be higher? before `asRange`?



##########
src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java:
##########
@@ -318,6 +319,119 @@ public long serializedSize(TokenKey key, int version)
         }
     }
 
+    // Allows the creation of a Range that is begin inclusive or end exclusive for a given Token
+    public static final class MinTokenKey extends TokenKey
+    {
+        private static final long EMPTY_SIZE;
+
+        @Override
+        public Range asRange()
+        {
+            AccordRoutingKey before = token.isMinimum()
+                                      ? new SentinelKey(table, true)
+                                      : new MinTokenKey(table, token.decreaseSlightly());

Review Comment:
   shouldn't this be a `TokenKey`, since the start is exclusive?
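   For reference, a minimal model of the half-open interval semantics under discussion, using stand-in types rather than the real `TokenRange`/`AccordRoutingKey` API: the range `(before, this]` contains tokens strictly greater than the start bound, which is why the type chosen for the exclusive start matters.
   
   ```java
   // Stand-in for a (startExclusive, endInclusive] token range; the real
   // TokenRange compares AccordRoutingKeys, not raw longs.
   record TokenSpan(long startExclusive, long endInclusive)
   {
       boolean contains(long token)
       {
           // Start is excluded, end is included
           return token > startExclusive && token <= endInclusive;
       }
   }
   ```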



##########
src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java:
##########
@@ -290,6 +291,66 @@ private AsyncChain<Data> readChains()
                 }));
             });
         });
+        return results;
+    }
+
+    private List<AsyncChain<Data>> rangeReadChains(long nowInSeconds, Dispatcher.RequestTime requestTime)
+    {
+        TxnRangeRead read = (TxnRangeRead) txn.read();
+        Seekables<?, ?> keys = txn.read().keys();

Review Comment:
   nit: `ranges`?



##########
src/java/org/apache/cassandra/service/consensus/TransactionalMode.java:
##########
@@ -112,7 +112,10 @@ public enum TransactionalMode
     * Execute writes through Accord skipping StorageProxy's normal write path. Ignores the provided consistency level
     * which makes Accord commit writes at ANY similar to Paxos with commit consistency level ANY.
      */
-    full(true, true, true, true, true);
+    full(true, true, true, true, true),
+
+    // For tests, Accord will read and be forced to do interop reads
+    test_interop_read(true, false, false, true, true);

Review Comment:
   More states worry me... and even at Community over Code the feedback was to limit this enum...



##########
test/harry/main/org/apache/cassandra/harry/ddl/SchemaSpec.java:
##########
@@ -337,7 +337,7 @@ public long[] deflateRegularColumns(Object[] regulars)
 
     public boolean isWriteTimeFromAccord()
     {
-        return transactionalMode.isPresent() && transactionalMode.get().nonSerialWritesThroughAccord;
+        return writeTimeFromAccord;

Review Comment:
   I was so confused by this, then looked at the git history; looks like a race condition between the merge and not running CI with the most recent rebase...



##########
src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java:
##########
@@ -290,6 +291,66 @@ private AsyncChain<Data> readChains()
                 }));
             });
         });
+        return results;
+    }
+
+    private List<AsyncChain<Data>> rangeReadChains(long nowInSeconds, Dispatcher.RequestTime requestTime)
+    {
+        TxnRangeRead read = (TxnRangeRead) txn.read();
+        Seekables<?, ?> keys = txn.read().keys();
+        List<AsyncChain<Data>> results = new ArrayList<>();
+        keys.forEach(key -> {
+            TokenRange range = (TokenRange)key;
+            PartitionRangeReadCommand command = read.commandForSubrange(range, nowInSeconds);
+
+            // This should only rarely occur when coordinators start a transaction in a migrating range
+            // because they haven't yet updated their cluster metadata.
+            // It would be harmless to do the read, because it will be rejected in `TxnQuery` anyways,
+            // but it's faster to skip the read
+            // TODO (required): To make migration work we need to validate that the range is all on Accord
+            // if any part isn't we should reject the read
+//                TableMigrationState tms = ConsensusTableMigration.getTableMigrationState(command.metadata().id);
+//                AccordClientRequestMetrics metrics = txn.kind().isWrite() ? accordWriteMetrics : accordReadMetrics;
+//                if (ConsensusRequestRouter.instance.isKeyInMigratingOrMigratedRangeFromAccord(command.metadata(), tms, command.partitionKey()))
+//                {
+//                    metrics.migrationSkippedReads.mark();
+//                    results.add(AsyncChains.success(TxnData.emptyPartition(fragment.txnDataName(), command)));
+//                    return;
+//                }
+
+            results.add(AsyncChains.ofCallable(Stage.ACCORD_MIGRATION.executor(), () -> {
+                TxnData result = new TxnData();
+                try (PartitionIterator iterator = StorageProxy.getRangeSlice(command, consistencyLevel, this, requestTime))

Review Comment:
   Is this supposed to avoid going through Accord?  I thought the integration is in `StorageProxy` in several cases, so it feels unclear while reading this.



##########
src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java:
##########
@@ -290,6 +291,66 @@ private AsyncChain<Data> readChains()
                 }));
             });
         });
+        return results;
+    }
+
+    private List<AsyncChain<Data>> rangeReadChains(long nowInSeconds, Dispatcher.RequestTime requestTime)
+    {
+        TxnRangeRead read = (TxnRangeRead) txn.read();
+        Seekables<?, ?> keys = txn.read().keys();
+        List<AsyncChain<Data>> results = new ArrayList<>();
+        keys.forEach(key -> {
+            TokenRange range = (TokenRange)key;
+            PartitionRangeReadCommand command = read.commandForSubrange(range, nowInSeconds);
+
+            // This should only rarely occur when coordinators start a transaction in a migrating range
+            // because they haven't yet updated their cluster metadata.
+            // It would be harmless to do the read, because it will be rejected in `TxnQuery` anyways,
+            // but it's faster to skip the read
+            // TODO (required): To make migration work we need to validate that the range is all on Accord
+            // if any part isn't we should reject the read

Review Comment:
   So if part of the range isn't on Accord, then we don't start the migration and sequence the read after the migration?  Rejecting the read would cause the user to see an error, so we would break the SLO?



##########
src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java:
##########
@@ -318,6 +319,119 @@ public long serializedSize(TokenKey key, int version)
         }
     }
 
+    // Allows the creation of a Range that is begin inclusive or end exclusive for a given Token
+    public static final class MinTokenKey extends TokenKey
+    {
+        private static final long EMPTY_SIZE;
+
+        @Override
+        public Range asRange()
+        {
+            AccordRoutingKey before = token.isMinimum()
+                                      ? new SentinelKey(table, true)
+                                      : new MinTokenKey(table, token.decreaseSlightly());
+
+            return new TokenRange(before, this);
+        }
+
+        static
+        {
+            EMPTY_SIZE = ObjectSizes.measure(new MinTokenKey(null, null));
+        }
+
+        public MinTokenKey(TableId tableId, Token token)
+        {
+            super(tableId, token);
+        }
+
+        public MinTokenKey withToken(Token token)
+        {
+            return new MinTokenKey(table, token);
+        }
+
+        @Override
+        public Token token()
+        {
+            return token;
+        }
+
+        @Override
+        public RoutingKeyKind kindOfRoutingKey()
+        {
+            return RoutingKeyKind.MIN_TOKEN;
+        }
+
+        @Override
+        public String suffix()
+        {
+            return token.toString();
+        }
+
+        public long estimatedSizeOnHeap()
+        {
+            return EMPTY_SIZE + token().getHeapSize();
+        }
+
+        public AccordRoutingKey withTable(TableId table)
+        {
+            return new MinTokenKey(table, token);
+        }
+
+        public static final MinTokenKey.Serializer serializer = new MinTokenKey.Serializer();
+        public static class Serializer implements AccordKeySerializer<MinTokenKey>

Review Comment:
   this is a copy/paste of `TokenKey.Serializer`, so why not just extend that to avoid the duplication?
   
   ```
   public static class ExtendedTokenKey extends TokenKey
   {
       public static class Serializer extends TokenKey.Serializer implements AccordKeySerializer<ExtendedTokenKey>
       {
           @Override
           public void serialize(ExtendedTokenKey key, DataOutputPlus out, int version) throws IOException
           {
               super.serialize(key, out, version);
           }

           @Override
           public ExtendedTokenKey deserialize(DataInputPlus in, int version) throws IOException
           {
               return super.deserialize(in, version);
           }

           @Override
           public ExtendedTokenKey fromBytes(ByteBuffer bytes, IPartitioner partitioner)
           {
               return super.fromBytes(bytes, partitioner);
           }

           @Override
           public long serializedSize(ExtendedTokenKey key, int version)
           {
               return super.serializedSize(key, version);
           }
       }

       public ExtendedTokenKey(TableId tableId, Token token)
       {
           super(tableId, token);
       }
   }
   ```
   
   The other option would be to make the `new` a factory so both can define their own type.
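   A hedged sketch of that factory alternative, with stand-in types (`TableId`, `Token`, `KeySerializer` here are simplified placeholders, and the real serializer also reads the table id and token off the stream): the shared serializer takes a constructor reference, so each key type gets a correctly typed `deserialize` without a copied serializer class.
   
   ```java
   import java.util.function.BiFunction;
   
   // Stand-ins for the real TableId/Token types.
   record TableId(String id) {}
   record Token(long value) {}
   
   class TokenKey
   {
       final TableId table;
       final Token token;
       TokenKey(TableId table, Token token) { this.table = table; this.token = token; }
   }
   
   class MinTokenKey extends TokenKey
   {
       MinTokenKey(TableId table, Token token) { super(table, token); }
   }
   
   // One serializer implementation, parameterized by how to construct the key,
   // so callers pass TokenKey::new or MinTokenKey::new instead of duplicating it.
   class KeySerializer<K extends TokenKey>
   {
       private final BiFunction<TableId, Token, K> factory;
   
       KeySerializer(BiFunction<TableId, Token, K> factory) { this.factory = factory; }
   
       K deserialize(TableId table, Token token) // stream-reading elided
       {
           return factory.apply(table, token);
       }
   }
   ```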



##########
test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITestBase.java:
##########
@@ -95,52 +97,60 @@ public static void before() throws Throwable
         sut = new InJvmSut(cluster);
     }
 
-    public SingleNodeSAITestBase(boolean withAccord)
+    public SingleNodeSAITestBase(TransactionalMode transactionalMode)
     {
-        this.withAccord = withAccord;
+        this.transactionalMode = transactionalMode;
     }
 
     @Test
     public void basicSaiTest()
+    {

Review Comment:
   FYI @ifesdjeen is rewriting this to use a random schema, so there is a conflict. I feel that your patch will likely go in first... but just wanted to point out this pending conflict.



##########
test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITestBase.java:
##########
@@ -287,6 +294,15 @@ else if (i % FLUSH_SKIP == 0)
                     {
                        logger.debug("Partition index = {}, run = {}, j = {}, i = {}", partitionIndex, run, j, i);
 
+                        // Much more succinct output for debugging
+//                        logger.info(query.toSelectStatement(columns, !query.schemaSpec.isWriteTimeFromAccord()).toString());
+//                        logger.info("Expected:");
+//                        modelState.rows().values().stream().forEach(row -> logger.info(row.toString(schema)));
+//                        List<ResultSetRow> rows = SelectHelper.execute(sut, history.clock(), query);
+//                        logger.info("Found:");
+//                        rows.stream().forEach(row -> logger.info(row.toString(schema)));
+//                        fail();

Review Comment:
   remove?



##########
test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java:
##########
@@ -173,6 +173,11 @@ public Object[][] execute(String statement, ConsistencyLevel cl, int coordinator
         {
             try
             {
+                // Easy way to get a lot of statements being run
+//                String bound = statement;
+//                for (Object bind : bindings)
+//                    bound = bound.replaceFirst("\\?", bind.toString());
+//                logger.info(bound);

Review Comment:
   remove?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

