ifesdjeen commented on code in PR #205: URL: https://github.com/apache/cassandra-accord/pull/205#discussion_r2163325664
########## accord-core/src/main/java/accord/coordinate/ExecuteFlag.java: ########## @@ -38,10 +43,107 @@ public static final class ExecuteFlags extends TinyEnumSet<ExecuteFlag> public static ExecuteFlags get(ExecuteFlag a) { return LOOKUP[encode(a)]; } public static ExecuteFlags get(ExecuteFlag a, ExecuteFlag b) { return LOOKUP[encode(a) | encode(b)]; } public ExecuteFlags with(ExecuteFlag a) { return LOOKUP[bitset | encode(a)]; } + public ExecuteFlags without(ExecuteFlag a) { return LOOKUP[bitset & ~encode(a)]; } public ExecuteFlags or(ExecuteFlags that) { return LOOKUP[this.bitset | that.bitset]; } public ExecuteFlags and(ExecuteFlags that) { return LOOKUP[this.bitset & that.bitset]; } public boolean isEmpty() { return bitset == 0; } public int bits() { return bitset; } private ExecuteFlags(int bits) { super(bits); } + + @Override + public String toString() + { + return toString(UNIVERSE); + } + } + + public interface CoordinationFlags + { + boolean isReadyToExecute(Node.Id node); + boolean hasUniqueHlc(); + void add(Node.Id node, ExecuteFlags flags); + + default ExecuteFlags get(Node.Id node) + { + ExecuteFlags result = ExecuteFlags.none(); + if (hasUniqueHlc()) result = result.with(HAS_UNIQUE_HLC); + if (isReadyToExecute(node)) result = result.with(READY_TO_EXECUTE); + return result; + } + + static CoordinationFlags none() + { + return ALWAYS_EMPTY; + } + + static CoordinationFlags empty(SortedList<Node.Id> list) + { + return list.size() <= 64 ? 
new SmallCoordinationFlags(list) : new LargeCoordinationFlags(list); + } + } + + private static final SmallCoordinationFlags ALWAYS_EMPTY = new SmallCoordinationFlags(new SortedArrays.SortedArrayList<>(new Node.Id[0])); + static + { + ALWAYS_EMPTY.hasUniqueHlc = false; + } + + static class SmallCoordinationFlags extends SortedListSet.SmallSortedListSet<Node.Id> implements CoordinationFlags + { + boolean hasUniqueHlc = true; + private SmallCoordinationFlags(SortedList<Node.Id> list) + { + super(list); + } + + @Override + public boolean isReadyToExecute(Node.Id node) + { + return contains(node); + } + + @Override + public boolean hasUniqueHlc() + { + return hasUniqueHlc; + } + + @Override + public void add(Node.Id node, ExecuteFlags flags) + { + if (hasUniqueHlc && !flags.contains(HAS_UNIQUE_HLC)) Review Comment: nit: ``` hasUniqueHlc &= flags.contains(HAS_UNIQUE_HLC); ``` ########## accord-core/src/main/java/accord/coordinate/ExecuteTxn.java: ########## @@ -262,28 +247,21 @@ public String toString() '}'; } - @Override - public void timeout() - { - onSlowResponse(node.id()); - localTimeout = null; - } - - @Override - public int stripe() + boolean mayFastExecute(ExecuteFlags flags) { - return txnId.hashCode(); + return flags.contains(READY_TO_EXECUTE) && (!txnId.hasPrivilegedCoordinator() || path != FAST) && fastReadsMayBypassSafeStore(txnId); Review Comment: I am very likely misunderstanding something, but regarding `(!txnId.hasPrivilegedCoordinator() || path != FAST)`: if there is no privileged coordinator, or the path isn't fast, do we allow fast execute? ########## accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java: ########## @@ -111,11 +113,18 @@ void onPreAccepted(Topologies topologies, Timestamp executeAt, SortedListMap<Nod Deps deps = mergeFastOrMediumDeps(oks); if (deps != null) { - ExecuteFlags executeFlags = Functions.foldl(oks.valuesAsNullableList(), (ok, v) -> ok == null ? 
v : v.and(ok.flags), ExecuteFlags.all()); + CoordinationFlags executeFlags = oks.foldlNonNull((d, k, v, out) -> { Review Comment: Took me a bit to understand this; maybe this warrants a comment. Also, should we deduplicate this with `CoordinateEphemeralRead`, in case this logic gets changed in future? ########## accord-core/src/main/java/accord/local/Commands.java: ########## @@ -519,19 +527,55 @@ else if (command.hasBeen(PreCommitted) && !executeAt.equals(command.executeAt()) PartialDeps partialDeps = prepareDeps(validated, participants, command, deps); participants = prepareParticipants(validated, participants, command); - WaitingOn waitingOn = !command.hasBeen(Stable) ? initialiseWaitingOn(safeStore, txnId, executeAt, participants, partialDeps) - : command.asCommitted().waitingOn(); + WaitingOn waitingOn = newSaveStatus != SaveStatus.PreApplied + ? WaitingOn.none(txnId.domain(), partialDeps) + : command.hasBeen(Stable) + ? nonNull(command.asCommitted().waitingOn()) + : initialiseWaitingOn(safeStore, txnId, executeAt, participants, partialDeps); + + Invariants.require(newSaveStatus == SaveStatus.PreApplied || validateSafeToFastApply(safeStore, safeCommand)); Ballot promised = command.promised(); if (promised.compareTo(ballot) <= 0) promised = ballot; - safeCommand.preapplied(safeStore, participants, promised, executeAt, partialTxn, partialDeps, waitingOn, writes, result); - if (logger.isTraceEnabled()) - logger.trace("{}: apply, status set to Executed with executeAt: {}, deps: {}", txnId, executeAt, partialDeps); - // must signal preapplied first, else we may be applied (and have cleared progress log state) already before maybeExecute exits - maybeExecute(safeStore, safeCommand, true, true); - safeStore.agent().eventListener().onExecuted(command); + if (newSaveStatus == SaveStatus.PreApplied && !waitingOn.isWaiting()) + newSaveStatus = Applying; + if (newSaveStatus == Applying && (!txnId.is(Write) || writes == null || 
!writes.keys.intersects(participants.stillExecutes()))) + newSaveStatus = SaveStatus.Applied; + + switch (newSaveStatus) + { + default: throw UnhandledEnum.invalid(newSaveStatus); + case PreApplied: + { + Command.Executed executed = safeCommand.preapplied(safeStore, participants, ballot, executeAt, partialTxn, partialDeps, waitingOn, writes, result); + logger.trace("{}: preapplied", executed.txnId()); + // must signal preapplied first, else we may be applied (and have cleared progress log state) already before maybeExecute exits + safeStore.agent().eventListener().onPreApplied(executed); + maybeExecute(safeStore, safeCommand, true, true); + break; + } + case Applying: + { + Invariants.require(!waitingOn.isWaiting()); + Command.Executed executed = safeCommand.applying(safeStore, participants, executeAt, partialTxn, partialDeps, waitingOn, writes, result); + safeStore.agent().eventListener().onPreApplied(executed); + safeStore.notifyListeners(safeCommand, command); + logger.trace("{}: applying", executed.txnId()); + applyChain(safeStore, executed).begin(safeStore.agent()); + break; + } + case Applied: + { + Command.Executed executed = safeCommand.applied(safeStore, participants, executeAt, partialTxn, partialDeps, waitingOn, writes, result); + safeStore.agent().eventListener().onPreApplied(executed); + safeStore.agent().eventListener().onApplied(executed, -1); + safeStore.notifyListeners(safeCommand, command); Review Comment: nit: I would still add `break` here just for clarity / in case we move things around ########## accord-core/src/main/java/accord/local/Commands.java: ########## @@ -963,6 +1017,12 @@ private static Command purge(Command command, @Nonnull StoreParticipants newPart return result; } + private static boolean validateSafeToFastApply(SafeCommandStore safeStore, SafeCommand safeCommand) + { + // TODO (now): implement validation!! + return true; Review Comment: Just to make sure: did `now` in this TODO mean in this patch, or are we OK to leave it out for now? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org