belliottsmith commented on code in PR #226: URL: https://github.com/apache/cassandra-accord/pull/226#discussion_r2222725273
########## accord-core/src/main/java/accord/impl/InMemoryCommandStore.java: ########## @@ -1128,88 +1197,75 @@ public static void logDependencyGraph(CommandStore commandStore, TxnId txnId) // redundantBefore, durableBefore, newBootstrapBeganAt, safeToRead, rangesForEpoch are // not replayed here. It is assumed that persistence on the application side will ensure // they are brought up to latest values _before_ replay. - public void clearForTesting() + public void clear() { Invariants.require(current == null); progressLog.clear(); commands.clear(); commandsByExecuteAt.clear(); commandsForKey.clear(); rangeCommands.clear(); + listeners.clear(); unsafeSetRejectBefore(new RejectBefore()); } - public Journal.Loader loader() + public Journal.Replayer replayer() { - return new CommandLoader(this); + return new CommandReplayer(this); } - private static class CommandLoader extends AbstractLoader + private static class CommandReplayer extends AbstractReplayer { private final InMemoryCommandStore commandStore; - private CommandLoader(InMemoryCommandStore commandStore) + private CommandReplayer(InMemoryCommandStore commandStore) { + super(commandStore.unsafeGetRedundantBefore()); this.commandStore = commandStore; } - private PreLoadContext context(Command command, LoadKeys loadKeys) - { - TxnId txnId = command.txnId(); - AbstractUnseekableKeys keys = null; - - if (CommandsForKey.manages(txnId)) - keys = (AbstractUnseekableKeys) command.participants().hasTouched(); - else if (!CommandsForKey.managesExecution(txnId) && command.hasBeen(Status.Stable) && !command.hasBeen(Status.Truncated)) - keys = command.asCommitted().waitingOn.keys; - - if (keys != null) - return PreLoadContext.contextFor(txnId, keys, loadKeys, WRITE, "Replay"); - - return PreLoadContext.contextFor(txnId, "Replay"); - } - - protected TxnId loadInternal(Command command, SafeCommandStore safeStore) - { - TxnId txnId = command.txnId(); - Cleanup cleanup = Cleanup.shouldCleanup(FULL, safeStore, command, 
command.participants()); - if (cleanup != Cleanup.NO) - command = Commands.purge(safeStore, command, cleanup); - - safeStore.unsafeGetNoCleanup(txnId).update(safeStore, command); - return txnId; - } - - private AsyncChain<TxnId> load(Command command) + private AsyncChain<Void> apply(Command command) { return AsyncChains.success(commandStore.executeInContext(commandStore, - context(command, ASYNC), - (SafeCommandStore safeStore) -> loadInternal(command, safeStore))); - } - - private AsyncChain<Void> apply(TxnId txnId) - { - return AsyncChains.success(commandStore.executeInContext(commandStore, - PreLoadContext.contextFor(txnId, "Replay"), + PreLoadContext.contextFor(command.txnId(), "Replay"), (SafeCommandStore safeStore) -> { - initialiseState(safeStore, txnId); + initialiseState(safeStore, command.txnId()); return null; })); } @Override - public AsyncChain<Void> load(TxnId txnId) + public AsyncChain<Void> replay(TxnId txnId) { // TODO (required): consider this race condition some more: // - can we avoid double-applying? // - is this definitely safe? 
+ Command command = null; if (commandStore.hasCommand(txnId)) - return apply(txnId); + command = commandStore.commands.get(txnId).value(); - Command command = commandStore.journal.loadCommand(commandStore.id, txnId, commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore()); if (command == null) + { + command = commandStore.journal.loadCommand(commandStore.id, txnId, commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore()); + if (command != null) + { + Cleanup cleanup = Cleanup.shouldCleanup(FULL, command, commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore()); + if (cleanup != Cleanup.NO) + command = Commands.purgeUnsafe(commandStore, command, cleanup); + + // initialise basic state, but don't call safeStore.update so we don't initialise listeners etc + GlobalCommand global = commandStore.commands.computeIfAbsent(txnId, GlobalCommand::new); + global.value(command); + Timestamp executeAt = command.executeAtIfKnown(); + if (executeAt != null) + commandStore.commandsByExecuteAt.put(executeAt, global); + } + } + + if (command == null || !maybeShouldReplay(txnId) || !shouldReplay(txnId, command.participants())) Review Comment: This is emulating the behaviour in C*: we don't read from the journal during active processing with InMemoryCommandStore, so we have to restore the state of every transaction from the journal into memory. Some of these could be filtered here, but that is not how the C* journal works; there we would filter on load. We then filter all of the later steps, which effectively means skipping apply when appropriate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org