belliottsmith commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1012635404


##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -399,158 +525,253 @@ private void onChangeInternal(PartialCommand command)
 
             case Committed:
             case ReadyToExecute:
-            case Executed:
+            case PreApplied:
             case Applied:
             case Invalidated:
-                if (isUnableToApply())
-                {
-                    updatePredecessor(command);
-                    if (isWaitingOnCommit())
-                    {
-                        removeWaitingOnCommit(command);
-                    }
-                }
-                else
-                {
-                    command.removeListener(this);
-                }
-                maybeExecute(false);
+                updatePredecessor(command, false);
+                maybeExecute(safeStore, progressShard(safeStore), false, true);
                 break;
         }
     }
 
-    @Override
-    public void onChange(Command command)
-    {
-        onChangeInternal(command);
-    }
-
-    protected void postApply()
+    protected void postApply(SafeCommandStore safeStore)
     {
         logger.trace("{} applied, setting status to Applied and notifying 
listeners", txnId());
-        status(Applied);
-        notifyListeners();
+        setStatus(Applied);
+        notifyListeners(safeStore);
     }
 
-    private static Function<CommandStore, Void> callPostApply(TxnId txnId)
+    private static Function<SafeCommandStore, Void> callPostApply(TxnId txnId)
     {
-        return commandStore -> {
-            commandStore.command(txnId).postApply();
+        return safeStore -> {
+            safeStore.command(txnId).postApply(safeStore);
             return null;
         };
     }
 
-    protected Future<Void> apply()
+    protected Future<Void> apply(SafeCommandStore safeStore)
     {
         // important: we can't include a reference to *this* in the lambda, 
since the C* implementation may evict
         // the command instance from memory between now and the write 
completing (and post apply being called)
-        return writes().apply(commandStore()).flatMap(unused ->
-            commandStore().process(this, callPostApply(txnId()))
+        CommandStore unsafeStore = safeStore.commandStore();
+        return writes().apply(safeStore).flatMap(unused ->
+            unsafeStore.submit(this, callPostApply(txnId()))
         );
     }
 
-    public Future<Data> read(Keys scope)
+    public Future<Data> read(SafeCommandStore safeStore)
     {
-        return txn().read(this);
+        return partialTxn().read(safeStore, this);
     }
 
-    private Future<Void> maybeExecute(boolean notifyListenersOnNoop)
+    // TODO: maybe split into maybeExecute and maybeApply?
+    private boolean maybeExecute(SafeCommandStore safeStore, ProgressShard 
shard, boolean alwaysNotifyListeners, boolean notifyWaitingOn)
     {
         if (logger.isTraceEnabled())
-            logger.trace("{}: Maybe executing with status {}. Will notify 
listeners on noop: {}", txnId(), status(), notifyListenersOnNoop);
+            logger.trace("{}: Maybe executing with status {}. Will notify 
listeners on noop: {}", txnId(), status(), alwaysNotifyListeners);
 
-        if (status() != Committed && status() != Executed)
+        if (status() != Committed && status() != PreApplied)
         {
-            if (notifyListenersOnNoop) notifyListeners();
-            return Writes.SUCCESS;
+            if (alwaysNotifyListeners)
+                notifyListeners(safeStore);
+            return false;
         }
 
-        if (isUnableToApply())
+        if (isUnableToExecute())
         {
-            BlockedBy blockedBy = blockedBy();
-            if (blockedBy != null)
-            {
-                logger.trace("{}: not executing, blocked on {}", txnId(), 
blockedBy.command.txnId());
-                commandStore().progressLog().waiting(blockedBy.command, 
blockedBy.someKeys);
-                if (notifyListenersOnNoop) notifyListeners();
-                return Writes.SUCCESS;
-            }
-            assert !isWaitingOnApply();
+            if (alwaysNotifyListeners)
+                notifyListeners(safeStore);
+
+            if (notifyWaitingOn)
+                new NotifyWaitingOn(this).accept(safeStore);
+            return false;
         }
 
         switch (status())
         {
             case Committed:
                 // TODO: maintain distinct ReadyToRead and ReadyToWrite states
-                status(ReadyToExecute);
+                setStatus(ReadyToExecute);
                 logger.trace("{}: set to ReadyToExecute", txnId());
-                boolean isProgressShard = progressKey() != null && 
handles(txnId().epoch, progressKey());
-                commandStore().progressLog().readyToExecute(this, 
isProgressShard, isProgressShard && progressKey().equals(homeKey()));
-                notifyListeners();
+                safeStore.progressLog().readyToExecute(this, shard);
+                notifyListeners(safeStore);
                 break;
-            case Executed:
-                logger.trace("{}: applying", txnId());
-                if (notifyListenersOnNoop) notifyListeners();
-                return apply();
+
+            case PreApplied:
+                if (executeRanges(safeStore, 
executeAt()).intersects(writes().keys, 
safeStore.commandStore()::hashIntersects))
+                {
+                    logger.trace("{}: applying", txnId());
+                    apply(safeStore);
+                }
+                else
+                {
+                    logger.trace("{}: applying no-op", txnId());
+                    setStatus(Applied);
+                    notifyListeners(safeStore);
+                }
         }
-        return Writes.SUCCESS;
+        return true;
     }
 
     /**
      * @param dependency is either committed or invalidated
+     * @param isInsert true iff this is initial {@code populateWaitingOn} call
+     * @return true iff {@code maybeExecute} might now have a different outcome
      */
-    private void updatePredecessor(PartialCommand dependency)
+    private boolean updatePredecessor(Command dependency, boolean isInsert)
     {
         Preconditions.checkState(dependency.hasBeen(Committed));
         if (dependency.hasBeen(Invalidated))
         {
             logger.trace("{}: {} is invalidated. Stop listening and removing 
from waiting on commit set.", txnId(), dependency.txnId());
             dependency.removeListener(this);
-            removeWaitingOnCommit(dependency);
+            removeWaitingOnCommit(dependency.txnId()); // TODO (now): this was 
missing in partial-replication; might be redundant?
+            return true;
         }
         else if (dependency.executeAt().compareTo(executeAt()) > 0)
         {
             // cannot be a predecessor if we execute later
-            logger.trace("{}: {} executes after us. Stop listening.", txnId(), 
dependency.txnId());
+            logger.trace("{}: {} executes after us. Stop listening and 
removing from waiting on apply set.", txnId(), dependency.txnId());
+            removeWaitingOn(dependency.txnId(), dependency.executeAt());
             dependency.removeListener(this);
+            return true;
         }
         else if (dependency.hasBeen(Applied))
         {
             logger.trace("{}: {} has been applied. Stop listening and removing 
from waiting on apply set.", txnId(), dependency.txnId());
-            removeWaitingOnApply(dependency);
+            removeWaitingOn(dependency.txnId(), dependency.executeAt());
             dependency.removeListener(this);
+            return true;
         }
-        else
+        else if (isUnableToExecute())
         {
             logger.trace("{}: adding {} to waiting on apply set.", txnId(), 
dependency.txnId());
-            addWaitingOnApplyIfAbsent(dependency);
+            addWaitingOnApplyIfAbsent(dependency.txnId(), 
dependency.executeAt());
+            removeWaitingOnCommit(dependency.txnId());
+            return false;
+        }
+        else if (isInsert)
+        {
+            logger.trace("{}: adding {} to waiting on commit set.", txnId(), 
dependency.txnId());
+            addWaitingOnCommit(dependency.txnId());
+            return false;
+        }
+        else
+        {
+            throw new IllegalStateException();
         }
     }
 
-    // TEMPORARY: once we can invalidate commands that have not been witnessed 
on any shard, we do not need to know the home shard
-    static class BlockedBy
+    void updatePredecessorAndMaybeExecute(SafeCommandStore safeStore, Command 
predecessor, boolean notifyWaitingOn)
     {
-        final PartialCommand command;
-        final Keys someKeys;
+        if (hasBeen(Applied))
+            return;
 
-        BlockedBy(PartialCommand command, Keys someKeys)
-        {
-            this.command = command;
-            this.someKeys = someKeys;
-        }
+        if (updatePredecessor(predecessor, false))
+            maybeExecute(safeStore, progressShard(safeStore), false, 
notifyWaitingOn);
     }
 
-    public BlockedBy blockedBy()
+    static class NotifyWaitingOn implements PreLoadContext, 
Consumer<SafeCommandStore>
     {
-        Command prev = this;
-        PartialCommand cur = directlyBlockedBy();
-        if (cur == null)
-            return null;
+        ExecutionPhase[] blockedUntil = new ExecutionPhase[4];
+        TxnId[] txnIds = new TxnId[4];
+        int depth;
 
-        Keys someKeys = cur.someKeys();
-        if (someKeys == null)
-            someKeys = prev.savedDeps().someKeys(cur.txnId());
-        return new BlockedBy(cur, someKeys);
+        public NotifyWaitingOn(Command command)
+        {
+            txnIds[0] = command.txnId();
+            blockedUntil[0] = Done;
+        }
+
+        @Override
+        public void accept(SafeCommandStore safeStore)
+        {
+            Command prev = get(safeStore, depth - 1);
+            while (depth >= 0)
+            {
+                Command cur = safeStore.ifLoaded(txnIds[depth]);
+                ExecutionPhase until = blockedUntil[depth];
+                if (cur == null)
+                {
+                    // need to load; schedule execution for later
+                    safeStore.execute(this, this);
+                    return;
+                }
+
+                if (prev != null)
+                {
+                    if (cur.hasBeen(until) || (cur.hasBeen(Committed) && 
cur.executeAt().compareTo(prev.executeAt()) > 0))
+                    {
+                        prev.updatePredecessorAndMaybeExecute(safeStore, cur, 
false);
+                        --depth;
+                        prev = get(safeStore, depth - 1);
+                        continue;
+                    }
+                }
+                else if (cur.hasBeen(until))
+                {
+                    // we're done; have already applied
+                    Preconditions.checkState(depth == 0);
+                    break;
+                }
+
+                TxnId directlyBlockedBy = cur.firstWaitingOnCommit();
+                if (directlyBlockedBy != null)
+                {
+                    push(directlyBlockedBy, Decided);
+                }
+                else if (null != (directlyBlockedBy = 
cur.firstWaitingOnApply()))
+                {
+                    push(directlyBlockedBy, Done);
+                }
+                else
+                {
+                    if (cur.hasBeen(Committed) && !cur.hasBeen(ReadyToExecute) 
&& !cur.isUnableToExecute())
+                    {
+                        if (!cur.maybeExecute(safeStore, 
cur.progressShard(safeStore), false, false))
+                            throw new AssertionError("Is able to Apply, but 
has not done so");
+                        // loop and re-test the command's status; we may still 
want to notify blocking, esp. if not homeShard
+                        continue;
+                    }
+
+                    RoutingKeys someKeys = cur.someRoutingKeys();

Review Comment:
   When I use the verbiage `someKeys` I literally mean _any_ keys that we know 
are associated with the transaction. So, preferentially, we use the keys we know 
of directly from the `Command` - but if we don't know any, we fall back to the 
keys through which we reached it from its successor transaction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to