dcapwell commented on code in PR #2144:
URL: https://github.com/apache/cassandra/pull/2144#discussion_r1103076257
##########
src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java:
##########
@@ -123,55 +131,114 @@ protected void setState(State state)
this.state = state;
}
- /**
- * callback for loader and writer
- */
- @Override
- public void accept(Object o, Throwable throwable)
+ private void callback(Object o, Throwable throwable)
{
if (throwable != null)
{
logger.error(String.format("Operation %s failed", this),
throwable);
state = State.FAILED;
- tryFailure(throwable);
+ fail(throwable);
}
else
run();
}
+ private void finish(R result)
+ {
+ Preconditions.checkArgument(state == State.COMPLETING);
+ callback.accept(result, null);
+ state = State.FINISHED;
+ }
+
+ private void fail(Throwable throwable)
+ {
+ Preconditions.checkArgument(state != State.FINISHED && state !=
State.FAILED);
+ callback.accept(null, throwable);
+ state = State.FAILED;
+ }
+
+ private static PreExecuteContext preExecuteContext(PreLoadContext
preLoadContext,
+
AccordStateCache.Instance<TxnId, Command> commandCache,
+
AccordStateCache.Instance<RoutableKey, CommandsForKey> cfkCache)
+ {
+ ImmutableMap<TxnId, Command> ctxCommands =
commandCache.getActive(preLoadContext.txnIds());
+ ImmutableMap<RoutableKey, CommandsForKey> ctxCommandsForKey =
cfkCache.getActive(toRoutableKeys(preLoadContext.keys()));
+ return new PreExecuteContext()
+ {
+ @Override
+ public ImmutableMap<TxnId, Command> commands()
+ {
+ return ctxCommands;
+ }
+
+ @Override
+ public ImmutableMap<RoutableKey, CommandsForKey> commandsForKey()
+ {
+ return ctxCommandsForKey;
+ }
+
+ @Override
+ public Iterable<TxnId> txnIds()
+ {
+ return preLoadContext.txnIds();
+ }
+
+ @Override
+ public Seekables<?, ?> keys()
+ {
+ return preLoadContext.keys();
+ }
+ };
+ }
+
+ private static <K, V extends ImmutableState> void
releaseResources(AccordStateCache.Instance<K, V> cache,
+
java.util.Map<K, ContextValue<V>> values)
+ {
+ values.forEach((key, value) -> {
+ cache.release(key, value.original(), value.current());
+ });
+ }
+
+ private void releaseResources()
+ {
+ releaseResources(commandStore.commandCache(),
postExecuteContext.commands);
+ releaseResources(commandStore.commandsForKeyCache(),
postExecuteContext.commandsForKey);
+ }
+
protected void runInternal()
{
- SafeAccordCommandStore safeStore = commandStore.safeStore(context);
switch (state)
{
case INITIALIZED:
state = State.LOADING;
case LOADING:
- if (!loader.load(context, this))
+ if (!loader.load(this::callback))
Review Comment:
If reading this correctly don't we have the following timing issue?
```
O1: load(tx1)
O2: load(tx1)
O1: load completes
O1: runs
O1: save the mutation of tx1 back
O2: load completes
O2: runs
O2 save the mutation of tx1 back
```
where do we make sure it see's O1's mutation?
Looking at
`org.apache.cassandra.service.accord.AccordStateCache.Instance#release` I see
we try to guard against it with the following line
```
Invariants.checkState(node.value() == original ||
(accessor.isEmpty(node.value()) && original == null));
```
but this check *happens-before* we schedule the write to disk!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]