dcapwell commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1905801566


##########
src/java/org/apache/cassandra/utils/Collectors3.java:
##########
@@ -46,6 +48,8 @@ public class Collectors3
 
     private static final Collector.Characteristics[] SET_CHARACTERISTICS = new Collector.Characteristics[]{ Collector.Characteristics.UNORDERED };
 
+    private static final Collector.Characteristics[] SORTED_SET_CHARACTERISTICS = new Collector.Characteristics[]{};

Review Comment:
   shouldn't this include `UNORDERED`?
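
For reference, a `TreeSet` orders its elements by its comparator, so the encounter order of the source stream cannot affect the result, which is exactly what `UNORDERED` declares. A minimal standalone sketch of such a collector (the `SortedSetCollectors` class is hypothetical, not the actual `Collectors3` code):

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collector;

// Hypothetical helper mirroring the shape of Collectors3; not Cassandra code.
final class SortedSetCollectors
{
    static <T extends Comparable<? super T>> Collector<T, ?, SortedSet<T>> toSortedSet()
    {
        return toSortedSet(Comparator.<T>naturalOrder());
    }

    static <T> Collector<T, ?, SortedSet<T>> toSortedSet(Comparator<? super T> comparator)
    {
        // The result order comes from the comparator, never from encounter order,
        // so UNORDERED is safe. This Collector.of overload adds IDENTITY_FINISH itself.
        return Collector.<T, SortedSet<T>>of(
            () -> new TreeSet<>(comparator),
            SortedSet::add,
            (left, right) -> { left.addAll(right); return left; },
            Collector.Characteristics.UNORDERED);
    }
}
```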



##########
src/java/org/apache/cassandra/service/consensus/migration/TransactionalMigrationFromMode.java:
##########
@@ -50,8 +50,8 @@ public static TransactionalMigrationFromMode fromMode(TransactionalMode prev, Tr
         {
             default: throw new IllegalArgumentException();
             case off: return off;
-            case unsafe: return unsafe;
-            case unsafe_writes: return unsafe_writes;
+            case test_unsafe: return unsafe;
+            case test_unsafe_writes: return unsafe_writes;
             case mixed_reads: return mixed_reads;
             case full: return full;
             case test_interop_read: return test_interop_read;

Review Comment:
   can this be moved with the other `test_` values?



##########
src/java/org/apache/cassandra/tcm/ClusterMetadataService.java:
##########
@@ -756,7 +756,12 @@ public ClusterMetadata fetchLogFromPeerOrCMS(ClusterMetadata metadata, InetAddre
 
     public ClusterMetadata awaitAtLeast(Epoch epoch) throws InterruptedException, TimeoutException
     {
-        return log.awaitAtLeast(epoch);
+        return awaitAtLeast(epoch, -1, null);
+    }
+
+    public ClusterMetadata awaitAtLeast(Epoch epoch, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

Review Comment:
   ```suggestion
       public ClusterMetadata awaitAtLeast(Epoch epoch, long timeout, @Nullable TimeUnit unit) throws InterruptedException, TimeoutException
   ```



##########
test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java:
##########
@@ -267,6 +267,16 @@ public InstanceConfig forceSet(String fieldName, Object value)
         return this;
     }
 
+    public InstanceConfig extendTimeouts(int seconds)

Review Comment:
   why do you take an `int seconds` that is ignored?



##########
src/java/org/apache/cassandra/tcm/log/LocalLog.java:
##########
@@ -707,15 +710,21 @@ private Async(LogSpec logSpec)
 
         @Override
         public ClusterMetadata awaitAtLeast(Epoch epoch) throws InterruptedException, TimeoutException
+        {
+            return awaitAtLeast(epoch, -1, null);
+        }
+
+        @Override
+        public ClusterMetadata awaitAtLeast(Epoch epoch, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
         {
             ClusterMetadata lastSeen = committed.get();
             return lastSeen.epoch.compareTo(epoch) >= 0
                    ? lastSeen
-                   : new AwaitCommit(epoch).get();
+                   : new AwaitCommit(epoch).get(timeout, unit);
         }
 
         @Override
-        public void runOnce(DurationSpec duration) throws TimeoutException
+        public void runOnce(long timeout, TimeUnit unit) throws TimeoutException

Review Comment:
   why this change?  please sync with @beobal.
   
   with duration there are only 2 states we could be in, but with `long, TimeUnit` there are more
   
   ```
   timeout >= 0, unit != null
   timeout < 0, unit != null
   timeout >= 0, unit == null -- bug
   timeout < 0, unit == null
   ```
   
   I wouldn't try to change the style of this logic without first syncing with Sam, and even then I feel duration is better than `long, TimeUnit`
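
To make the four combinations concrete, here is a sketch of what a `long, TimeUnit` signature forces every implementation to interpret, compared with a single nullable duration. It uses `java.time.Duration` as a stand-in for Cassandra's `DurationSpec`; `TimeoutArgs`, `normalize` and `describe` are hypothetical names, and treating a negative timeout with a non-null unit as unbounded is a choice this sketch makes, not established behavior.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

// Hypothetical helpers illustrating the review point; not part of the PR.
final class TimeoutArgs
{
    // With (long, TimeUnit), every implementation has to handle all four combinations.
    // Returns Optional.empty() for "no timeout" and a Duration for a bounded wait.
    static Optional<Duration> normalize(long timeout, TimeUnit unit)
    {
        if (unit == null)
        {
            // timeout >= 0 with no unit: the caller asked for a bound that cannot be interpreted.
            if (timeout >= 0)
                throw new IllegalArgumentException("timeout " + timeout + " given without a unit");
            // timeout < 0, unit == null: the (-1, null) sentinel meaning "wait forever".
            return Optional.empty();
        }
        // timeout < 0 with a unit is ambiguous; this sketch chooses to treat it as unbounded.
        if (timeout < 0)
            return Optional.empty();
        return Optional.of(Duration.ofNanos(unit.toNanos(timeout)));
    }

    // With a single nullable Duration there are only two states: null (no timeout) or a value.
    static String describe(Duration timeout)
    {
        return timeout == null ? "unbounded" : "bounded by " + timeout.toMillis() + "ms";
    }
}
```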



##########
src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java:
##########
@@ -324,39 +337,130 @@ List<TxnWrite.Fragment> createWriteFragments(ClientState state, QueryOptions opt
         return fragments;
     }
 
-    AccordUpdate createUpdate(ClientState state, QueryOptions options, Map<Integer, NamedSelect> autoReads, Consumer<Key> keyConsumer)
+    AccordUpdate createUpdate(ClusterMetadata cm, ClientState state, QueryOptions options, Map<Integer, NamedSelect> autoReads, Set<Key> keys)
     {
-        return new TxnUpdate(createWriteFragments(state, options, autoReads, keyConsumer), createCondition(options), null, false);
+        checkArgument(keys.isEmpty(), "Construct update before reads so the key set can be used to determine commit consistency level");
+        List<TxnWrite.Fragment> writeFragments = createWriteFragments(state, options, autoReads, keys);
+        ConsistencyLevel commitCL = consistencyLevelForAccordCommit(cm, keys, options.getConsistency());
+        return new TxnUpdate(writeFragments, createCondition(options), commitCL, false);
     }
 
     Keys toKeys(SortedSet<Key> keySet)
     {
         return new Keys(keySet);
     }
 
+    private static TransactionalMode transactionalModeForSingleKey(Keys keys)
+    {
+        return Schema.instance.getTableMetadata(((AccordRoutableKey) keys.get(0)).table()).params.transactionalMode;
+    }
+
+    private ConsistencyLevel consistencyLevelForAccordRead(ClusterMetadata cm, Set<Key> keys, ConsistencyLevel consistencyLevel)
+    {
+        // Write transactions are read/write so it creates a read and ends up needing a consistency level

Review Comment:
   is that always true?
   
   ```
   INSERT INTO tbl ...
   ```
   
   that doesn't require a read



##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -768,7 +775,17 @@ public TopologyManager topology()
     public @Nonnull TxnResult coordinate(long minEpoch, @Nonnull Txn txn, @Nonnull ConsistencyLevel consistencyLevel, @Nonnull Dispatcher.RequestTime requestTime)
     {
         AsyncTxnResult asyncTxnResult = coordinateAsync(minEpoch, txn, consistencyLevel, requestTime);
-        return getTxnResult(asyncTxnResult, txn.isWrite(), consistencyLevel, requestTime);
+        try
+        {
+            return getTxnResult(asyncTxnResult);
+        }
+        catch (TopologyMismatch e)
+        {
+            // For now assuming topology mismatch is caused by a race misrouting

Review Comment:
   there are 2 clear cases where this happens:
   
   1) make a schema change that enables accord, then query right away; the coordinator may not have seen this epoch yet
   2) remove accord or drop the table, then query; this should never make forward progress in accord
   
   From this code's point of view it's hard to tell which case we are in, so this logic could be safe in #1, but I worry about the impact on #2... I'll try to keep this in mind
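
One way the two cases could be told apart, sketched under assumptions: after a mismatch the coordinator first catches up to the latest epoch it can learn about, then checks whether the table is still Accord-managed. `MismatchPolicy`, `onTopologyMismatch` and its parameters are hypothetical; none of this is existing Cassandra or Accord API.

```java
// Hypothetical decision helper; none of these names exist in Cassandra or Accord.
final class MismatchPolicy
{
    enum Action { RETRY, FAIL }

    /**
     * @param routedEpoch      epoch the coordinator used when it routed the transaction
     * @param latestEpoch      epoch the coordinator has caught up to after seeing the mismatch
     * @param stillAccordTable whether the table still exists and is Accord-managed at latestEpoch
     */
    static Action onTopologyMismatch(long routedEpoch, long latestEpoch, boolean stillAccordTable)
    {
        // Case 2: table dropped or Accord removed; retrying can never make progress.
        if (!stillAccordTable)
            return Action.FAIL;
        // Case 1: the coordinator routed with a stale epoch and has since caught up.
        if (latestEpoch > routedEpoch)
            return Action.RETRY;
        // Same epoch and still mismatched: nothing new was learned, so avoid a retry loop.
        return Action.FAIL;
    }
}
```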



##########
src/java/org/apache/cassandra/hints/HintsDispatcher.java:
##########
@@ -248,7 +252,9 @@ else if (outcome == RETRY_DIFFERENT_SYSTEM)
                 failedRetryDifferentSystem = true;
         }
 
-        if (failures > 0 || timeouts > 0 || failedRetryDifferentSystem)
+        // The batchlog Accord hints need to return abort if any hint needs to be retried and retry the whole page
+        // since we don't want hints to ping pong back and forth vai hintsNeedingRehinting

Review Comment:
   ```suggestion
           // since we don't want hints to ping pong back and forth via hintsNeedingRehinting
   ```



##########
src/java/org/apache/cassandra/tcm/ClusterMetadataService.java:
##########
@@ -756,7 +756,12 @@ public ClusterMetadata fetchLogFromPeerOrCMS(ClusterMetadata metadata, InetAddre
 
     public ClusterMetadata awaitAtLeast(Epoch epoch) throws InterruptedException, TimeoutException
     {
-        return log.awaitAtLeast(epoch);
+        return awaitAtLeast(epoch, -1, null);
+    }
+
+    public ClusterMetadata awaitAtLeast(Epoch epoch, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

Review Comment:
   said this in another comment, but sync with @beobal as TCM uses `DurationSpec`, so this change isn't in sync with the existing style



##########
src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java:
##########
@@ -72,7 +72,7 @@ protected PartitionRangeReadCommand(Epoch serializedAtEpoch,
                                         boolean isDigest,
                                         int digestVersion,
                                         boolean acceptsTransient,
-                                        boolean allowOutOfRangeReads,
+                                        boolean allowsPotentialTxnConflicts,

Review Comment:
   spoke with Ariel about this, and it turns out this logic comes from live migration; the fact that the name matches a feature developed for trunk should not cause confusion.  This logic isn't on trunk and is part of live migration, so renaming it and changing its behavior doesn't impact the out-of-range read/write logic from Caleb



##########
src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java:
##########
@@ -324,39 +337,130 @@ List<TxnWrite.Fragment> createWriteFragments(ClientState state, QueryOptions opt
         return fragments;
     }
 
-    AccordUpdate createUpdate(ClientState state, QueryOptions options, Map<Integer, NamedSelect> autoReads, Consumer<Key> keyConsumer)
+    AccordUpdate createUpdate(ClusterMetadata cm, ClientState state, QueryOptions options, Map<Integer, NamedSelect> autoReads, Set<Key> keys)
     {
-        return new TxnUpdate(createWriteFragments(state, options, autoReads, keyConsumer), createCondition(options), null, false);
+        checkArgument(keys.isEmpty(), "Construct update before reads so the key set can be used to determine commit consistency level");
+        List<TxnWrite.Fragment> writeFragments = createWriteFragments(state, options, autoReads, keys);
+        ConsistencyLevel commitCL = consistencyLevelForAccordCommit(cm, keys, options.getConsistency());
+        return new TxnUpdate(writeFragments, createCondition(options), commitCL, false);
     }
 
     Keys toKeys(SortedSet<Key> keySet)
     {
         return new Keys(keySet);
     }
 
+    private static TransactionalMode transactionalModeForSingleKey(Keys keys)
+    {
+        return Schema.instance.getTableMetadata(((AccordRoutableKey) keys.get(0)).table()).params.transactionalMode;
+    }
+
+    private ConsistencyLevel consistencyLevelForAccordRead(ClusterMetadata cm, Set<Key> keys, ConsistencyLevel consistencyLevel)

Review Comment:
   ```suggestion
       private ConsistencyLevel consistencyLevelForAccordRead(ClusterMetadata cm, Set<Key> keys, @Nullable ConsistencyLevel consistencyLevel)
   ```
   
   our style says everything is `@NonNull` by default, so you should be 
explicit when `null` is supported



##########
src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java:
##########
@@ -324,39 +337,130 @@ List<TxnWrite.Fragment> createWriteFragments(ClientState state, QueryOptions opt
         return fragments;
     }
 
-    AccordUpdate createUpdate(ClientState state, QueryOptions options, Map<Integer, NamedSelect> autoReads, Consumer<Key> keyConsumer)
+    AccordUpdate createUpdate(ClusterMetadata cm, ClientState state, QueryOptions options, Map<Integer, NamedSelect> autoReads, Set<Key> keys)
     {
-        return new TxnUpdate(createWriteFragments(state, options, autoReads, keyConsumer), createCondition(options), null, false);
+        checkArgument(keys.isEmpty(), "Construct update before reads so the key set can be used to determine commit consistency level");
+        List<TxnWrite.Fragment> writeFragments = createWriteFragments(state, options, autoReads, keys);
+        ConsistencyLevel commitCL = consistencyLevelForAccordCommit(cm, keys, options.getConsistency());
+        return new TxnUpdate(writeFragments, createCondition(options), commitCL, false);
     }
 
     Keys toKeys(SortedSet<Key> keySet)
     {
         return new Keys(keySet);
     }
 
+    private static TransactionalMode transactionalModeForSingleKey(Keys keys)

Review Comment:
   this looks like dead code?  it's private and nothing in this file is calling it?



##########
src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java:
##########
@@ -392,7 +396,7 @@ public AsyncChain<Void> apply(Seekable key, SafeCommandStore safeStore, TxnId tx
 
         long timestamp = safeCfk.current().uniqueHlc(safeStore, txnId, executeAt);
         // TODO (low priority - do we need to compute nowInSeconds, or can we just use executeAt?)
-        int nowInSeconds = (int) TimeUnit.MICROSECONDS.toSeconds(executeAt.hlc());
+        long nowInSeconds = (long) TimeUnit.MICROSECONDS.toSeconds(executeAt.hlc());

Review Comment:
   don't think you need this cast?
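
`TimeUnit.toSeconds(long)` already returns `long`, so the `(long)` cast is a no-op. The sketch below also shows why dropping the old `(int)` narrowing matters, assuming `hlc()` is microseconds since the Unix epoch (`NowInSeconds` and its methods are hypothetical):

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helpers; assumes the HLC value is microseconds since the Unix epoch.
final class NowInSeconds
{
    // TimeUnit.toSeconds(long) already returns long, so no cast is needed.
    static long fromHlcMicros(long hlcMicros)
    {
        return TimeUnit.MICROSECONDS.toSeconds(hlcMicros);
    }

    // The previous form: the (int) narrowing wraps negative once the seconds value
    // exceeds Integer.MAX_VALUE, i.e. after 2038-01-19T03:14:07Z.
    static int legacyFromHlcMicros(long hlcMicros)
    {
        return (int) TimeUnit.MICROSECONDS.toSeconds(hlcMicros);
    }
}
```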



##########
src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java:
##########
@@ -72,7 +72,7 @@ protected PartitionRangeReadCommand(Epoch serializedAtEpoch,
                                         boolean isDigest,
                                         int digestVersion,
                                         boolean acceptsTransient,
-                                        boolean allowOutOfRangeReads,
+                                        boolean allowsPotentialTxnConflicts,

Review Comment:
   is this a rebase issue?  if accord isn't enabled won't 
`allowsPotentialTxnConflicts` be ignored which means we still need 
`allowOutOfRangeReads`?



##########
src/java/org/apache/cassandra/service/consensus/migration/TransactionalMigrationFromMode.java:
##########
@@ -28,8 +28,8 @@ public enum TransactionalMigrationFromMode
 {
     none(null),  // No migration is in progress. The currently active transaction system could be either Accord or Paxos.
     off(TransactionalMode.off),
-    unsafe(TransactionalMode.unsafe),
-    unsafe_writes(TransactionalMode.unsafe_writes),
+    unsafe(TransactionalMode.test_unsafe),

Review Comment:
   shouldn't this be renamed as well?



##########
src/java/org/apache/cassandra/dht/NormalizedRanges.java:
##########
@@ -201,8 +204,7 @@ else if (!bRMin && aRange.left.compareTo(bRange.right) >= 0)
     @VisibleForTesting
     public NormalizedRanges<T> invert()
     {
-        if (isEmpty())
-            return this;
+        checkState(!isEmpty());

Review Comment:
   why fail when empty?  The only caller is `org.apache.cassandra.dht.NormalizedRanges#subtract`, and logically, if you subtract an empty list you should return `this` rather than fail, no?
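
A simplified model of that guard: `subtract` returns `this` when there is nothing to subtract, so `invert()` never sees an empty input and its `checkState` can stay. `SimpleRanges` is a hypothetical stand-in using sorted, disjoint, half-open `int` ranges over a bounded domain, not Cassandra's token-ring `NormalizedRanges`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in: sorted, disjoint, half-open [start, end) ranges within [MIN, MAX).
final class SimpleRanges
{
    static final int MIN = 0, MAX = 100;
    final List<int[]> ranges; // each element is {start, end}

    SimpleRanges(List<int[]> ranges) { this.ranges = ranges; }

    boolean isEmpty() { return ranges.isEmpty(); }

    // Mirrors the PR's checkState(!isEmpty()): invert() refuses an empty input.
    SimpleRanges invert()
    {
        if (isEmpty())
            throw new IllegalStateException("invert() on empty ranges");
        List<int[]> out = new ArrayList<>();
        int cursor = MIN;
        for (int[] r : ranges)
        {
            if (r[0] > cursor)
                out.add(new int[]{ cursor, r[0] });
            cursor = r[1];
        }
        if (cursor < MAX)
            out.add(new int[]{ cursor, MAX });
        return new SimpleRanges(out);
    }

    // Two-pointer intersection of two sorted, disjoint range lists.
    SimpleRanges intersect(SimpleRanges other)
    {
        List<int[]> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < ranges.size() && j < other.ranges.size())
        {
            int[] a = ranges.get(i), b = other.ranges.get(j);
            int start = Math.max(a[0], b[0]), end = Math.min(a[1], b[1]);
            if (start < end)
                out.add(new int[]{ start, end });
            if (a[1] < b[1]) i++; else j++;
        }
        return new SimpleRanges(out);
    }

    // Subtracting nothing leaves the ranges unchanged, so short-circuit before invert().
    SimpleRanges subtract(SimpleRanges other)
    {
        if (other.isEmpty())
            return this;
        return intersect(other.invert());
    }
}
```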



##########
src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java:
##########
@@ -320,6 +330,17 @@ public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
         return Transformation.apply(iterator, new Logging());
     }
 
+    public static void log(UnfilteredPartitionIterator partitions, String id, boolean fullDetails)

Review Comment:
   should this really be blocking and consume the iterator?  In the usage below this causes double reads, so wouldn't it be far better to just add the transformer?
   
   ```
   if (LOG_READ_RESULTS && !readCommandFinal.isDigestQuery())
   {
       try (UnfilteredPartitionIterator i = ReadCommandVerbHandler.instance.doRead(readCommandFinal, false).makeIterator(readCommandFinal))
       {
           UnfilteredPartitionIterators.log(i, safeStore.commandStore().toString(), true);
       }
   }
   return new LocalReadData(routingKeyFinal, ReadCommandVerbHandler.instance.doRead(readCommandFinal, false), readCommand);
   ```
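
The pass-through idea, sketched with a plain `java.util.Iterator` rather than Cassandra's `Transformation` machinery (the existing `Transformation.apply(iterator, new Logging())` in the hunk above is the real-code counterpart). `LoggingIterator` is hypothetical: each element is logged only as the caller consumes it, so the data is read once instead of being drained for logging and then read again.

```java
import java.util.Iterator;
import java.util.function.Consumer;

// Hypothetical pass-through wrapper; logs lazily instead of consuming the source up front.
final class LoggingIterator<T> implements Iterator<T>
{
    private final Iterator<T> delegate;
    private final Consumer<? super T> logger;

    LoggingIterator(Iterator<T> delegate, Consumer<? super T> logger)
    {
        this.delegate = delegate;
        this.logger = logger;
    }

    @Override
    public boolean hasNext()
    {
        return delegate.hasNext();
    }

    @Override
    public T next()
    {
        T next = delegate.next();
        logger.accept(next); // side effect only; the element is returned unchanged
        return next;
    }
}
```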



##########
src/java/org/apache/cassandra/hints/HintsDispatcher.java:
##########
@@ -534,16 +540,26 @@ public void run()
             try
             {
                 IAccordService accord = AccordService.instance();
-                TxnResult.Kind kind = accord.getTxnResult(accordTxnResult, true, null, requestTime).kind();
+                TxnResult.Kind kind = accord.getTxnResult(accordTxnResult).kind();
                 if (kind == retry_new_protocol)
                     accordOutcome = RETRY_DIFFERENT_SYSTEM;
                 else
                     accordOutcome = SUCCESS;
             }
+            catch (WriteTimeoutException | WriteFailureException | RetryOnDifferentSystemException | TopologyMismatch e)
+            {
+                if (e instanceof TopologyMismatch || e instanceof RetryOnDifferentSystemException)
+                    accordOutcome = RETRY_DIFFERENT_SYSTEM;

Review Comment:
   `TopologyMismatch` also happens when the table is dropped... I need to check 
the retry logic to see if it will revalidate that case...



##########
src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java:
##########
@@ -141,20 +141,24 @@ private IndexTermType(ColumnMetadata columnMetadata, List<ColumnMetadata> partit
         this.indexTargetType = indexTargetType;
         this.capabilities = calculateCapabilities(columnMetadata, partitionColumns, indexTargetType);
         this.indexType = calculateIndexType(columnMetadata.type, capabilities, indexTargetType);
-        if (indexType.subTypes().isEmpty())
+
+        AbstractType<?> baseType = indexType.unwrap();

Review Comment:
   did you pull Caleb's trunk patch into this one?  why is that?



##########
src/java/org/apache/cassandra/batchlog/BatchlogManager.java:
##########
@@ -454,12 +454,12 @@ public void finish(Set<UUID> hintedNodes)
                 if (accordResult != null)
                 {
                     IAccordService accord = AccordService.instance();
-                    TxnResult.Kind kind = accord.getTxnResult(accordResult, true, ConsistencyLevel.QUORUM, accordTxnStart).kind();
+                    TxnResult.Kind kind = accord.getTxnResult(accordResult).kind();
                     if (kind == retry_new_protocol)
                         throw new RetryOnDifferentSystemException();
                 }
             }
-            catch (WriteTimeoutException|WriteFailureException|RetryOnDifferentSystemException  e)
+            catch (WriteTimeoutException | WriteFailureException | RetryOnDifferentSystemException | TopologyMismatch e)

Review Comment:
   should we write hints if we get `TopologyMismatch`?  if we drop the table or 
remove it from accord we really shouldn't create a hint.  If we hit a race 
where we don't know about the txn yet (tcm further along than accord) then we 
*should* reject and not take the hint?
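
A sketch of the classification this question implies, with stand-in exception types (the real `WriteTimeoutException`, `WriteFailureException`, `RetryOnDifferentSystemException` and `TopologyMismatch` are not reproduced, and `BatchReplayOutcome` is hypothetical): timeouts, failures and retry-on-other-system still lead to hints, while `TopologyMismatch` is rejected.

```java
// Stand-ins for the real exception types; they only exist so the sketch compiles.
class WriteTimeoutException extends RuntimeException {}
class WriteFailureException extends RuntimeException {}
class RetryOnDifferentSystemException extends RuntimeException {}
class TopologyMismatch extends RuntimeException {}

// Hypothetical classifier for a failed batchlog replay; not existing Cassandra code.
final class BatchReplayOutcome
{
    enum Outcome { SUCCESS, WRITE_HINTS, REJECT }

    static Outcome classify(RuntimeException failure)
    {
        if (failure == null)
            return Outcome.SUCCESS;
        // Table dropped, Accord disabled, or TCM ahead of Accord: a hint would not help.
        if (failure instanceof TopologyMismatch)
            return Outcome.REJECT;
        if (failure instanceof WriteTimeoutException
            || failure instanceof WriteFailureException
            || failure instanceof RetryOnDifferentSystemException)
            return Outcome.WRITE_HINTS;
        // Anything else is unexpected here, so propagate it unchanged.
        throw failure;
    }
}
```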



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
