ifesdjeen commented on code in PR #205:
URL: https://github.com/apache/cassandra-accord/pull/205#discussion_r2163325664


##########
accord-core/src/main/java/accord/coordinate/ExecuteFlag.java:
##########
@@ -38,10 +43,107 @@ public static final class ExecuteFlags extends 
TinyEnumSet<ExecuteFlag>
         public static ExecuteFlags get(ExecuteFlag a) { return 
LOOKUP[encode(a)]; }
         public static ExecuteFlags get(ExecuteFlag a, ExecuteFlag b) { return 
LOOKUP[encode(a) | encode(b)]; }
         public ExecuteFlags with(ExecuteFlag a) { return LOOKUP[bitset | 
encode(a)]; }
+        public ExecuteFlags without(ExecuteFlag a) { return LOOKUP[bitset & 
~encode(a)]; }
         public ExecuteFlags or(ExecuteFlags that) { return LOOKUP[this.bitset 
| that.bitset]; }
         public ExecuteFlags and(ExecuteFlags that) { return LOOKUP[this.bitset 
& that.bitset]; }
         public boolean isEmpty() { return bitset == 0; }
         public int bits() { return bitset; }
         private ExecuteFlags(int bits) { super(bits); }
+
+        @Override
+        public String toString()
+        {
+            return toString(UNIVERSE);
+        }
+    }
+
+    public interface CoordinationFlags
+    {
+        boolean isReadyToExecute(Node.Id node);
+        boolean hasUniqueHlc();
+        void add(Node.Id node, ExecuteFlags flags);
+
+        default ExecuteFlags get(Node.Id node)
+        {
+            ExecuteFlags result = ExecuteFlags.none();
+            if (hasUniqueHlc()) result = result.with(HAS_UNIQUE_HLC);
+            if (isReadyToExecute(node)) result = result.with(READY_TO_EXECUTE);
+            return result;
+        }
+
+        static CoordinationFlags none()
+        {
+            return ALWAYS_EMPTY;
+        }
+
+        static CoordinationFlags empty(SortedList<Node.Id> list)
+        {
+            return list.size() <= 64 ? new SmallCoordinationFlags(list) : new 
LargeCoordinationFlags(list);
+        }
+    }
+
+    private static final SmallCoordinationFlags ALWAYS_EMPTY = new 
SmallCoordinationFlags(new SortedArrays.SortedArrayList<>(new Node.Id[0]));
+    static
+    {
+        ALWAYS_EMPTY.hasUniqueHlc = false;
+    }
+
+    static class SmallCoordinationFlags extends 
SortedListSet.SmallSortedListSet<Node.Id> implements CoordinationFlags
+    {
+        boolean hasUniqueHlc = true;
+        private SmallCoordinationFlags(SortedList<Node.Id> list)
+        {
+            super(list);
+        }
+
+        @Override
+        public boolean isReadyToExecute(Node.Id node)
+        {
+            return contains(node);
+        }
+
+        @Override
+        public boolean hasUniqueHlc()
+        {
+            return hasUniqueHlc;
+        }
+
+        @Override
+        public void add(Node.Id node, ExecuteFlags flags)
+        {
+            if (hasUniqueHlc && !flags.contains(HAS_UNIQUE_HLC))

Review Comment:
   nit: 
   
   ```
               hasUniqueHlc &= flags.contains(HAS_UNIQUE_HLC);
   ```



##########
accord-core/src/main/java/accord/coordinate/ExecuteTxn.java:
##########
@@ -262,28 +247,21 @@ public String toString()
                '}';
     }
 
-    @Override
-    public void timeout()
-    {
-        onSlowResponse(node.id());
-        localTimeout = null;
-    }
-
-    @Override
-    public int stripe()
+    boolean mayFastExecute(ExecuteFlags flags)
     {
-        return txnId.hashCode();
+        return flags.contains(READY_TO_EXECUTE) && 
(!txnId.hasPrivilegedCoordinator() || path != FAST) && 
fastReadsMayBypassSafeStore(txnId);

Review Comment:
   I am very likely misunderstanding something, but 
`(!txnId.hasPrivilegedCoordinator() || path != FAST)`: if there is no privileged 
coordinator, or the path isn't fast, we allow fast execute?



##########
accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java:
##########
@@ -111,11 +113,18 @@ void onPreAccepted(Topologies topologies, Timestamp 
executeAt, SortedListMap<Nod
             Deps deps = mergeFastOrMediumDeps(oks);
             if (deps != null)
             {
-                ExecuteFlags executeFlags = 
Functions.foldl(oks.valuesAsNullableList(), (ok, v) -> ok == null ? v : 
v.and(ok.flags), ExecuteFlags.all());
+                CoordinationFlags executeFlags = oks.foldlNonNull((d, k, v, 
out) -> {

Review Comment:
   Took me a bit to understand this; maybe this warrants a comment. Also, 
should we deduplicate this with `CoordinateEphemeralRead`, in case this logic 
gets changed in future?



##########
accord-core/src/main/java/accord/local/Commands.java:
##########
@@ -519,19 +527,55 @@ else if (command.hasBeen(PreCommitted) && 
!executeAt.equals(command.executeAt())
         PartialDeps partialDeps = prepareDeps(validated, participants, 
command, deps);
         participants = prepareParticipants(validated, participants, command);
 
-        WaitingOn waitingOn = !command.hasBeen(Stable) ? 
initialiseWaitingOn(safeStore, txnId,  executeAt, participants, partialDeps)
-                                                       : 
command.asCommitted().waitingOn();
+        WaitingOn waitingOn = newSaveStatus != SaveStatus.PreApplied
+                              ? WaitingOn.none(txnId.domain(), partialDeps)
+                              : command.hasBeen(Stable)
+                                ? nonNull(command.asCommitted().waitingOn())
+                                : initialiseWaitingOn(safeStore, txnId,  
executeAt, participants, partialDeps);
+
+        Invariants.require(newSaveStatus == SaveStatus.PreApplied || 
validateSafeToFastApply(safeStore, safeCommand));
 
         Ballot promised = command.promised();
         if (promised.compareTo(ballot) <= 0)
             promised = ballot;
-        safeCommand.preapplied(safeStore, participants, promised, executeAt, 
partialTxn, partialDeps, waitingOn, writes, result);
-        if (logger.isTraceEnabled())
-            logger.trace("{}: apply, status set to Executed with executeAt: 
{}, deps: {}", txnId, executeAt, partialDeps);
 
-        // must signal preapplied first, else we may be applied (and have 
cleared progress log state) already before maybeExecute exits
-        maybeExecute(safeStore, safeCommand, true, true);
-        safeStore.agent().eventListener().onExecuted(command);
+        if (newSaveStatus == SaveStatus.PreApplied && !waitingOn.isWaiting())
+            newSaveStatus = Applying;
+        if (newSaveStatus == Applying && (!txnId.is(Write) || writes == null 
|| !writes.keys.intersects(participants.stillExecutes())))
+            newSaveStatus = SaveStatus.Applied;
+
+        switch (newSaveStatus)
+        {
+            default: throw UnhandledEnum.invalid(newSaveStatus);
+            case PreApplied:
+            {
+                Command.Executed executed = safeCommand.preapplied(safeStore, 
participants, ballot, executeAt, partialTxn, partialDeps, waitingOn, writes, 
result);
+                logger.trace("{}: preapplied", executed.txnId());
+                // must signal preapplied first, else we may be applied (and 
have cleared progress log state) already before maybeExecute exits
+                safeStore.agent().eventListener().onPreApplied(executed);
+                maybeExecute(safeStore, safeCommand, true, true);
+                break;
+            }
+            case Applying:
+            {
+                Invariants.require(!waitingOn.isWaiting());
+                Command.Executed executed = safeCommand.applying(safeStore, 
participants, executeAt, partialTxn, partialDeps, waitingOn, writes, result);
+                safeStore.agent().eventListener().onPreApplied(executed);
+                safeStore.notifyListeners(safeCommand, command);
+                logger.trace("{}: applying", executed.txnId());
+                applyChain(safeStore, executed).begin(safeStore.agent());
+                break;
+            }
+            case Applied:
+            {
+                Command.Executed executed = safeCommand.applied(safeStore, 
participants, executeAt, partialTxn, partialDeps, waitingOn, writes, result);
+                safeStore.agent().eventListener().onPreApplied(executed);
+                safeStore.agent().eventListener().onApplied(executed, -1);
+                safeStore.notifyListeners(safeCommand, command);

Review Comment:
   nit: I would still add `break` here just for clarity / in case we move 
things around



##########
accord-core/src/main/java/accord/local/Commands.java:
##########
@@ -963,6 +1017,12 @@ private static Command purge(Command command, @Nonnull 
StoreParticipants newPart
         return result;
     }
 
+    private static boolean validateSafeToFastApply(SafeCommandStore safeStore, 
SafeCommand safeCommand)
+    {
+        // TODO (now): implement validation!!
+        return true;

Review Comment:
   Just to make sure: did `now` mean in this patch, or are we OK to leave it 
out for now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to