dcapwell commented on code in PR #2144:
URL: https://github.com/apache/cassandra/pull/2144#discussion_r1103088492


##########
src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java:
##########
@@ -123,55 +131,114 @@ protected void setState(State state)
         this.state = state;
     }
 
-    /**
-     * callback for loader and writer
-     */
-    @Override
-    public void accept(Object o, Throwable throwable)
+    private void callback(Object o, Throwable throwable)
     {
         if (throwable != null)
         {
             logger.error(String.format("Operation %s failed", this), 
throwable);
             state = State.FAILED;
-            tryFailure(throwable);
+            fail(throwable);
         }
         else
             run();
     }
 
+    private void finish(R result)
+    {
+        Preconditions.checkArgument(state == State.COMPLETING);
+        callback.accept(result, null);
+        state = State.FINISHED;
+    }
+
+    private void fail(Throwable throwable)
+    {
+        Preconditions.checkArgument(state != State.FINISHED && state != 
State.FAILED);
+        callback.accept(null, throwable);
+        state = State.FAILED;
+    }
+
+    private static PreExecuteContext preExecuteContext(PreLoadContext 
preLoadContext,
+                                                       
AccordStateCache.Instance<TxnId, Command> commandCache,
+                                                       
AccordStateCache.Instance<RoutableKey, CommandsForKey> cfkCache)
+    {
+        ImmutableMap<TxnId, Command> ctxCommands = 
commandCache.getActive(preLoadContext.txnIds());
+        ImmutableMap<RoutableKey, CommandsForKey> ctxCommandsForKey = 
cfkCache.getActive(toRoutableKeys(preLoadContext.keys()));
+        return new PreExecuteContext()
+        {
+            @Override
+            public ImmutableMap<TxnId, Command> commands()
+            {
+                return ctxCommands;
+            }
+
+            @Override
+            public ImmutableMap<RoutableKey, CommandsForKey> commandsForKey()
+            {
+                return ctxCommandsForKey;
+            }
+
+            @Override
+            public Iterable<TxnId> txnIds()
+            {
+                return preLoadContext.txnIds();
+            }
+
+            @Override
+            public Seekables<?, ?> keys()
+            {
+                return preLoadContext.keys();
+            }
+        };
+    }
+
+    private static <K, V extends ImmutableState> void 
releaseResources(AccordStateCache.Instance<K, V> cache,
+                                                                       
java.util.Map<K, ContextValue<V>> values)
+    {
+         values.forEach((key, value) -> {
+             cache.release(key, value.original(), value.current());
+         });
+    }
+
+    private void releaseResources()
+    {
+        releaseResources(commandStore.commandCache(), 
postExecuteContext.commands);
+        releaseResources(commandStore.commandsForKeyCache(), 
postExecuteContext.commandsForKey);
+    }
+
     protected void runInternal()
     {
-        SafeAccordCommandStore safeStore = commandStore.safeStore(context);
         switch (state)
         {
             case INITIALIZED:
                 state = State.LOADING;
             case LOADING:
-                if (!loader.load(context, this))
+                if (!loader.load(this::callback))
                     return;
 
                 state = State.RUNNING;
+                preExecuteContext = preExecuteContext(preLoadContext, 
commandStore.commandCache(), commandStore.commandsForKeyCache());
+                SafeCommandStore safeStore = 
commandStore.beginOperation(preExecuteContext);
                 result = apply(safeStore);
+                postExecuteContext = commandStore.completeOperation(safeStore);
 
                 state = State.SAVING;
             case SAVING:
             case AWAITING_SAVE:
-                boolean updatesPersisted = writer.save(context, this);
+                boolean updatesPersisted = writer.save(postExecuteContext, 
this::callback);
 
-                if (state != State.AWAITING_SAVE)
+                if (state == State.SAVING)
                 {
                     // with any updates on the way to disk, release resources 
so operations waiting
                     // to use these objects don't have issues with fields 
marked as unsaved
-                    context.releaseResources(commandStore);
+                    releaseResources();

Review Comment:
   I think you need to move this before `writer.save` as this does checks for 
correctness... for example if 2 different operations ran concurrently (not in 
parallel) if we ever had 1 operation not see the mutation of the other one 
(bug) we fail populating the cache, but we currently apply that mutation to 
disk; feel we should validate these assumptions before making them durable?
   
   Also this name is misleading, it's not "releasing" but "updating" the cache. 
 We do lower the ref count on the node, but we are also updating it... so maybe 
"updateAndDereferenceCache"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to