belliottsmith commented on code in PR #226:
URL: https://github.com/apache/cassandra-accord/pull/226#discussion_r2222725273


##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -1128,88 +1197,75 @@ public static void logDependencyGraph(CommandStore 
commandStore, TxnId txnId)
     // redundantBefore, durableBefore, newBootstrapBeganAt, safeToRead, 
rangesForEpoch are
     // not replayed here. It is assumed that persistence on the application 
side will ensure
     // they are brought up to latest values _before_ replay.
-    public void clearForTesting()
+    public void clear()
     {
         Invariants.require(current == null);
         progressLog.clear();
         commands.clear();
         commandsByExecuteAt.clear();
         commandsForKey.clear();
         rangeCommands.clear();
+        listeners.clear();
         unsafeSetRejectBefore(new RejectBefore());
     }
 
-    public Journal.Loader loader()
+    public Journal.Replayer replayer()
     {
-        return new CommandLoader(this);
+        return new CommandReplayer(this);
     }
 
-    private static class CommandLoader extends AbstractLoader
+    private static class CommandReplayer extends AbstractReplayer
     {
         private final InMemoryCommandStore commandStore;
 
-        private CommandLoader(InMemoryCommandStore commandStore)
+        private CommandReplayer(InMemoryCommandStore commandStore)
         {
+            super(commandStore.unsafeGetRedundantBefore());
             this.commandStore = commandStore;
         }
 
-        private PreLoadContext context(Command command, LoadKeys loadKeys)
-        {
-            TxnId txnId = command.txnId();
-            AbstractUnseekableKeys keys = null;
-
-            if (CommandsForKey.manages(txnId))
-                keys = (AbstractUnseekableKeys) 
command.participants().hasTouched();
-            else if (!CommandsForKey.managesExecution(txnId) && 
command.hasBeen(Status.Stable) && !command.hasBeen(Status.Truncated))
-                keys = command.asCommitted().waitingOn.keys;
-
-            if (keys != null)
-                return PreLoadContext.contextFor(txnId, keys, loadKeys, WRITE, 
"Replay");
-
-            return PreLoadContext.contextFor(txnId, "Replay");
-        }
-
-        protected TxnId loadInternal(Command command, SafeCommandStore 
safeStore)
-        {
-            TxnId txnId = command.txnId();
-            Cleanup cleanup = Cleanup.shouldCleanup(FULL, safeStore, command, 
command.participants());
-            if (cleanup != Cleanup.NO)
-                command = Commands.purge(safeStore, command, cleanup);
-
-            safeStore.unsafeGetNoCleanup(txnId).update(safeStore, command);
-            return txnId;
-        }
-
-        private AsyncChain<TxnId> load(Command command)
+        private AsyncChain<Void> apply(Command command)
         {
             return 
AsyncChains.success(commandStore.executeInContext(commandStore,
-                                                                     
context(command, ASYNC),
-                                                                     
(SafeCommandStore safeStore) -> loadInternal(command, safeStore)));
-        }
-
-        private AsyncChain<Void> apply(TxnId txnId)
-        {
-            return 
AsyncChains.success(commandStore.executeInContext(commandStore,
-                                                                     
PreLoadContext.contextFor(txnId, "Replay"),
+                                                                     
PreLoadContext.contextFor(command.txnId(), "Replay"),
                                                                      
(SafeCommandStore safeStore) -> {
-                                                                         
initialiseState(safeStore, txnId);
+                                                                         
initialiseState(safeStore, command.txnId());
                                                                          
return null;
                                                                      }));
         }
 
         @Override
-        public AsyncChain<Void> load(TxnId txnId)
+        public AsyncChain<Void> replay(TxnId txnId)
         {
             // TODO (required): consider this race condition some more:
             //      - can we avoid double-applying?
             //      - is this definitely safe?
+            Command command = null;
             if (commandStore.hasCommand(txnId))
-                return apply(txnId);
+                command = commandStore.commands.get(txnId).value();
 
-            Command command = 
commandStore.journal.loadCommand(commandStore.id, txnId, 
commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore());
             if (command == null)
+            {
+                command = commandStore.journal.loadCommand(commandStore.id, 
txnId, commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore());
+                if (command != null)
+                {
+                    Cleanup cleanup = Cleanup.shouldCleanup(FULL, command, 
commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore());
+                    if (cleanup != Cleanup.NO)
+                        command = Commands.purgeUnsafe(commandStore, command, 
cleanup);
+
+                    // initialise basic state, but don't call safeStore.update 
so we don't initialise listeners etc
+                    GlobalCommand global = 
commandStore.commands.computeIfAbsent(txnId, GlobalCommand::new);
+                    global.value(command);
+                    Timestamp executeAt = command.executeAtIfKnown();
+                    if (executeAt != null)
+                        commandStore.commandsByExecuteAt.put(executeAt, 
global);
+                }
+            }
+
+            if (command == null || !maybeShouldReplay(txnId) || 
!shouldReplay(txnId, command.participants()))

Review Comment:
   This is emulating the behaviour in C*: we don't read from the journal during 
active processing with InMemoryCommandStore, so we have to restore the state of 
every transaction from the journal into memory. Some of these could be filtered 
here, but that is not how the C* journal works; we would filter on load. 
   
   We then filter all of the later steps, which effectively means not calling 
apply where appropriate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to