belliottsmith commented on code in PR #50:
URL: https://github.com/apache/cassandra-accord/pull/50#discussion_r1254355652


##########
accord-core/src/main/java/accord/local/Commands.java:
##########
@@ -610,247 +665,236 @@ private static boolean maybeExecute(SafeCommandStore 
safeStore, SafeCommand safe
         }
     }
 
-    protected static WaitingOn populateWaitingOn(SafeCommandStore safeStore, 
TxnId txnId, Timestamp executeAt, PartialDeps partialDeps)
+    protected static WaitingOn initialiseWaitingOn(SafeCommandStore safeStore, 
TxnId waitingId, Timestamp executeWaitingAt, PartialDeps partialDeps, Route<?> 
route)
     {
-        Ranges ranges = applyRanges(safeStore, executeAt);
-        if (ranges.isEmpty())
-            return WaitingOn.EMPTY;
-
-        return populateWaitingOn(safeStore, ranges, txnId, executeAt, 
partialDeps);
+        Unseekables<?> executionParticipants = 
route.participants().slice(safeStore.ranges().allAt(executeWaitingAt));
+        WaitingOn.Update update = new WaitingOn.Update(executionParticipants, 
partialDeps);
+        return updateWaitingOn(safeStore, waitingId, executeWaitingAt, update, 
route.participants()).build();
     }
 
-    protected static <P> void visitWaitingOn(SafeCommandStore safeStore, TxnId 
txnId, Timestamp waitUntil, Deps waitOn, Timestamp executeAt, 
WaitingOnVisitor<P> visitor, P param)
+    protected static WaitingOn.Update updateWaitingOn(SafeCommandStore 
safeStore, TxnId txnId, Timestamp executeAt, WaitingOn.Update update, 
Participants<?> participants)
     {
-        Ranges ranges = applyRanges(safeStore, executeAt);
-        if (ranges.isEmpty())
-            return;
+        CommandStore commandStore = safeStore.commandStore();
+        TxnId minWaitingOnTxnId = update.minWaitingOnTxnId();
+        if (minWaitingOnTxnId != null && 
commandStore.hasRedundantDependencies(update.minWaitingOnTxnId(), executeAt, 
participants))
+            safeStore.commandStore().removeRedundantDependencies(participants, 
update);
+
+        update.forEachWaitingOnCommit(safeStore, update, txnId, executeAt, 
(store, upd, id, exec, i) -> {
+            // TODO (expected): load read-only to reduce overhead; upgrade 
only if we need to remove listener
+            SafeCommand dep = store.ifLoadedAndInitialised(upd.deps.txnId(i));
+            if (dep == null || !dep.current().hasBeen(PreCommitted))
+                return;
+            updateWaitingOn(id, exec, upd, dep);
+        });
 
-        visitWaitingOn(safeStore, ranges, txnId, waitUntil, waitOn, visitor, 
param);
-    }
+        update.forEachWaitingOnApply(safeStore, update, txnId, executeAt, 
(store, upd, id, exec, i) -> {
+            SafeCommand dep = store.ifLoadedAndInitialised(upd.deps.txnId(i));
+            if (dep == null || !dep.current().hasBeen(PreCommitted))
+                return;
+            updateWaitingOn(id, exec, upd, dep);
+        });
 
-    protected static WaitingOn populateWaitingOn(SafeCommandStore safeStore, 
Ranges ranges, TxnId waitingId, Timestamp executeAt, PartialDeps partialDeps)
-    {
-        WaitingOn.Update update = new WaitingOn.Update();
-        visitWaitingOn(safeStore, ranges, waitingId, executeAt, partialDeps, 
Commands::populateWaitingOn, update);
-        return update.build();
+        return update;
     }
 
-    public interface WaitingOnVisitor<P>
-    {
-        void visit(SafeCommandStore safeStore, TxnId waitingId, Timestamp 
executeAt, TxnId dependencyId, P param);
-    }
-
-    protected static <P> void visitWaitingOn(SafeCommandStore safeStore, 
Ranges ranges, TxnId waitingId, Timestamp waitUntil, Deps deps, 
WaitingOnVisitor<P> visitor, P param)
+    /**
+     * @param dependencySafeCommand is either committed truncated, or 
invalidated
+     * @return true iff {@code maybeExecute} might now have a different outcome
+     */
+    private static boolean updateWaitingOn(TxnId waitingId, Timestamp 
executeWaitingAt, WaitingOn.Update waitingOn, SafeCommand dependencySafeCommand)
     {
-        boolean isAffectedByBootstrap = 
safeStore.commandStore().isAffectedByBootstrap(deps);
-        if (!isAffectedByBootstrap)
+        Command dependency = dependencySafeCommand.current();
+        Invariants.checkState(dependency.hasBeen(PreCommitted));
+        if (dependency.is(Truncated))
         {
-            deps.forEachUniqueTxnId(ranges, dependencyId -> 
visitor.visit(safeStore, waitingId, waitUntil, dependencyId, param));
-            return;
+            logger.trace("{}: {} is truncated. Stop listening and removing 
from waiting on commit set.", waitingId, dependency.txnId());
+            dependencySafeCommand.removeListener(new ProxyListener(waitingId));
+            return waitingOn.removeInvalidatedOrTruncated(dependency.txnId());
         }
-
-        BitSet bits = new BitSet(Math.max(deps.keyDeps.txnIdCount(), 
deps.rangeDeps.txnIdCount()));
-        if (!deps.keyDeps.isEmpty())
+        else if (dependency.is(Invalidated))
         {
-            SortedArrays.SortedArrayList<TxnId> dependencyIds = 
deps.keyDeps.txnIds();
-            // process each interval of TxnId we have a different incomplete 
range for in a batch,
-            safeStore.commandStore().forEachBootstrapRange(dependencyIds, 
(kdeps, bootstrapId, rs, start, end) -> {
-                // We do not need to depend on the bootstrap transaction for 
writes, as we have a timestamp store
-                // (so any write we perform will persist past the bootstrap 
completing).
-                // For most reads we can rely on safeToRead, but this is not 
the case for reads that depend on
-                // writes that started before our ExclusiveSyncPoint but 
execute afterwards.
-                // We must retain a dependency on these transactions.
-
-                rs = ranges.slice(rs, Minimal);
-                kdeps.forEach(rs, (bs, txnId, i) -> {
-                    if (i < end && i >= start)
-                        bs.set(i);
-                }, bits);
-            }, deps.keyDeps);
-            int i = -1;
-            while ((i = bits.nextSetBit(i + 1)) >= 0)
-                visitor.visit(safeStore, waitingId, waitUntil, 
dependencyIds.get(i), param);
+            logger.trace("{}: {} is invalidated. Stop listening and removing 
from waiting on commit set.", waitingId, dependency.txnId());
+            dependencySafeCommand.removeListener(new ProxyListener(waitingId));
+            return waitingOn.removeInvalidatedOrTruncated(dependency.txnId());
         }
-
-        if (!deps.rangeDeps.isEmpty())
+        else if (dependency.executeAt().compareTo(executeWaitingAt) > 0 && 
!waitingId.rw().awaitsFutureDeps())
         {
-            bits.clear();
-            SortedArrays.SortedArrayList<TxnId> dependencyIds = 
deps.rangeDeps.txnIds();
-            safeStore.commandStore().forEachBootstrapRange(dependencyIds, 
(rdeps, bootstrapId, rs, start, end) -> {
-                rs = ranges.slice(rs, Minimal);
-                rdeps.forEach(rs, (bs, i) -> {
-                    if (i >= start && i < end)
-                        bs.set(i);
-                }, bits);
-            }, deps.rangeDeps);
-            int i = -1;
-            while ((i = bits.nextSetBit(i + 1)) >= 0)
-                visitor.visit(safeStore, waitingId, waitUntil, 
dependencyIds.get(i), param);
+            // dependency cannot be a predecessor if it executes later
+            logger.trace("{}: {} executes after us. Stop listening and 
removing from waiting on apply set.", waitingId, dependency.txnId());
+            dependencySafeCommand.removeListener(new ProxyListener(waitingId));
+            return waitingOn.removeWaitingOn(dependency.txnId());
         }
-    }
-
-    @Inline
-    private static void populateWaitingOn(SafeCommandStore safeStore, TxnId 
waitingId, Timestamp executeAt, TxnId dependencyId, WaitingOn.Update update)
-    {
-        SafeCommand dependencySafeCommand = safeStore.ifLoaded(dependencyId);
-        if (dependencySafeCommand == null)
+        else if (dependency.hasBeen(Applied))
         {
-            update.addWaitingOnCommit(dependencyId);
-            safeStore.addAndInvokeListener(dependencyId, waitingId);
+            logger.trace("{}: {} has been applied. Stop listening and removing 
from waiting on apply set.", waitingId, dependency.txnId());
+            dependencySafeCommand.removeListener(new ProxyListener(waitingId));
+            return waitingOn.removeApplied(dependency.txnId(), 
dependency.asCommitted().waitingOn());
         }
-        else
+        else if (!waitingOn.isEmpty())
         {
-            Command command = dependencySafeCommand.current();
-            switch (command.status())
+            logger.trace("{}: adding {} to waiting on apply set.", waitingId, 
dependency.txnId());
+            if (waitingOn.addWaitingOnApply(dependency.txnId()))
             {
-                default:
-                    throw new IllegalStateException();
-                case NotWitnessed:
-                case PreAccepted:
-                case Accepted:
-                case AcceptedInvalidate:
-                case PreCommitted:
-                    // we don't know when these dependencies will execute, and 
cannot execute until we do
-
-                    command = dependencySafeCommand.addListener(new 
Command.ProxyListener(waitingId));
-                    update.addWaitingOnCommit(command.txnId());
-                    break;
-                case Committed:
-                    // TODO (desired, efficiency): split into ReadyToRead and 
ReadyToWrite;
-                    //                             the distributed read can be 
performed as soon as those keys are ready,
-                    //                             and in parallel with any 
other reads. the client can even ACK immediately after;
-                    //                             only the write needs to be 
postponed until other in-progress reads complete
-                case ReadyToExecute:
-                case PreApplied:
-                    command = dependencySafeCommand.addListener(new 
Command.ProxyListener(waitingId));
-                    insertWaitingOn(waitingId, executeAt, update, command);
-                case Applied:
-                case Invalidated:
-                    break;
+                boolean removedWaitingOnCommit = 
waitingOn.removeWaitingOnCommit(dependency.txnId());
+                Invariants.checkState(removedWaitingOnCommit);
+                return true;
             }
+            return false;
+        }
+        else
+        {
+            throw new IllegalStateException("We have a dependency to wait on, 
but have already finished waiting");
         }
     }
 
-    private static void insertWaitingOn(TxnId txnId, Timestamp executeAt, 
WaitingOn.Update update, Command dependencyId)
+    static void updateDependencyAndMaybeExecute(SafeCommandStore safeStore, 
SafeCommand safeCommand, SafeCommand predecessor, boolean notifyWaitingOn)
     {
-        Invariants.checkState(dependencyId.hasBeen(Committed));
-        if (dependencyId.hasBeen(Invalidated))
-        {
-            logger.trace("{}: {} is invalidated. Do not insert.", txnId, 
dependencyId.txnId());
-        }
-        else if (dependencyId.executeAt().compareTo(executeAt) > 0)
-        {
-            // dependency cannot be a predecessor if it executes later
-            logger.trace("{}: {} executes after us. Do not insert.", txnId, 
dependencyId.txnId());
-        }
-        else if (dependencyId.hasBeen(Applied))
+        Command.Committed command = safeCommand.current().asCommitted();
+        if (command.hasBeen(Applied))
+            return;
+
+        WaitingOn.Update waitingOn = new WaitingOn.Update(command);
+        if (updateWaitingOn(command.txnId(), command.executeAt(), waitingOn, 
predecessor))
         {
-            logger.trace("{}: {} has been applied. Do not insert.", txnId, 
dependencyId.txnId());
+            command = safeCommand.updateWaitingOn(waitingOn);
+            maybeExecute(safeStore, safeCommand, command.progressShard(), 
false, notifyWaitingOn);
         }
         else
         {
-            logger.trace("{}: adding {} to waiting on apply set.", txnId, 
dependencyId.txnId());
-            update.addWaitingOnApply(dependencyId.txnId(), 
dependencyId.executeAt());
+            Command pred = predecessor.current();
+            if (pred.is(ReadyToExecute))
+            {
+                TxnId nextWaitingOn = command.waitingOn().nextWaitingOn();
+                if (nextWaitingOn != null && 
nextWaitingOn.equals(pred.txnId()))
+                    safeStore.progressLog().waiting(predecessor, Known.Done, 
pred.route(), null);
+            }
         }
     }
 
-    /**
-     * @param safeDependency is either committed or invalidated
-     * @return true iff {@code maybeExecute} might now have a different outcome
-     */
-    private static boolean updateWaitingOn(SafeCommand dependencySafeCommand, 
WaitingOn.Update waitingOn, SafeCommand safeDependency)
+    public enum Truncate { NO, TRUNCATE, ERASE }
+
+    public static Command setTruncated(SafeCommandStore safeStore, SafeCommand 
safeCommand)
     {
-        Command.Committed command = 
dependencySafeCommand.current().asCommitted();
-        Command dependency = safeDependency.current();
-        Invariants.checkState(dependency.hasBeen(PreCommitted));
-        if (dependency.hasBeen(Invalidated))
-        {
-            logger.trace("{}: {} is invalidated. Stop listening and removing 
from waiting on commit set.", command.txnId(), dependency.txnId());
-            safeDependency.removeListener(command.asListener());
-            waitingOn.removeWaitingOnCommit(dependency.txnId());
-            return true;
-        }
-        else if (dependency.executeAt().compareTo(command.executeAt()) > 0)
-        {
-            // dependency cannot be a predecessor if it executes later
-            logger.trace("{}: {} executes after us. Stop listening and 
removing from waiting on apply set.", command.txnId(), dependency.txnId());
-            waitingOn.removeWaitingOn(dependency.txnId(), 
dependency.executeAt());
-            safeDependency.removeListener(command.asListener());
-            return true;
-        }
-        else if (dependency.hasBeen(Applied))
-        {
-            logger.trace("{}: {} has been applied. Stop listening and removing 
from waiting on apply set.", command.txnId(), dependency.txnId());
-            waitingOn.removeWaitingOn(dependency.txnId(), 
dependency.executeAt());
-            safeDependency.removeListener(command.asListener());
-            return true;
-        }
-        else if (command.isWaitingOnDependency())
-        {
-            logger.trace("{}: adding {} to waiting on apply set.", 
command.txnId(), dependency.txnId());
-            waitingOn.addWaitingOnApply(dependency.txnId(), 
dependency.executeAt());
-            waitingOn.removeWaitingOnCommit(dependency.txnId());
-            return false;
-        }
-        else
+        return setTruncated(safeStore, safeCommand, Truncate.TRUNCATE, true);
+    }
+
+    public static Command setTruncated(SafeCommandStore safeStore, SafeCommand 
safeCommand, Truncate truncate, boolean notifyListeners)
+    {
+        Command command = safeCommand.current();
+        Invariants.checkState(!command.is(Truncated));
+
+        //   1) a command has been applied; or
+        //   2) has been coordinated but *will not* be applied (we just 
haven't witnessed the invalidation yet); or
+        //   3) a command is durably decided and this shard only hosts its 
home data, so no explicit truncation is necessary to remove it
+        // TODO (desired): consider if there are better invariants we can 
impose for undecided transactions, to verify they aren't later committed 
(should be detected already, but more is better)
+        Invariants.checkState(command.hasBeen(Applied) || 
!command.hasBeen(PreCommitted) || command.partialTxn().keys().isEmpty());
+
+        Command.Truncated result = truncated(command);
+        safeCommand.set(result);
+        safeStore.progressLog().clear(safeCommand.txnId());
+        if (notifyListeners)
+            safeStore.notifyListeners(safeCommand);
+
+        if (truncate == Truncate.ERASE)
+            safeStore.erase(safeCommand);
+        return result;
+    }
+
+    public static Truncate shouldTruncate(SafeCommandStore safeStore, Command 
command)
+    {
+        if (safeStore.commandStore().globalDurability(command.txnId()) == 
Universal)
+            return Truncate.ERASE;
+
+        if (!command.hasBeen(Applied) || !command.known().hasCompleteRoute())
+            return Truncate.NO;
+
+        TxnId txnId = command.txnId();
+        Timestamp executeAt = command.executeAt() != null && 
!command.executeAt().equals(Timestamp.NONE) ? command.executeAt() : txnId;

Review Comment:
   They're two different things you reference, but yes, we can name them in 
Command to make the distinction and call-sites clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to