belliottsmith commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1006244047
##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -402,155 +518,250 @@ private void onChangeInternal(PartialCommand command)
case Executed:
case Applied:
case Invalidated:
- if (isUnableToApply())
- {
- updatePredecessor(command);
- if (isWaitingOnCommit())
- {
- removeWaitingOnCommit(command);
- }
- }
- else
- {
- command.removeListener(this);
- }
- maybeExecute(false);
+ updatePredecessor(command, false);
+ maybeExecute(safeStore, progressShard(safeStore), false, true);
break;
}
}
- @Override
- public void onChange(Command command)
- {
- onChangeInternal(command);
- }
-
- protected void postApply()
+ protected void postApply(SafeCommandStore safeStore)
{
logger.trace("{} applied, setting status to Applied and notifying
listeners", txnId());
- status(Applied);
- notifyListeners();
+ setStatus(Applied);
+ notifyListeners(safeStore);
}
- private static Function<CommandStore, Void> callPostApply(TxnId txnId)
+ private static Function<SafeCommandStore, Void> callPostApply(TxnId txnId)
{
- return commandStore -> {
- commandStore.command(txnId).postApply();
+ return safeStore -> {
+ safeStore.command(txnId).postApply(safeStore);
return null;
};
}
- protected Future<Void> apply()
+ protected Future<Void> apply(SafeCommandStore safeStore)
{
// important: we can't include a reference to *this* in the lambda,
since the C* implementation may evict
// the command instance from memory between now and the write
completing (and post apply being called)
- return writes().apply(commandStore()).flatMap(unused ->
- commandStore().process(this, callPostApply(txnId()))
+ CommandStore unsafeStore = safeStore.commandStore();
+ return writes().apply(safeStore).flatMap(unused ->
+ unsafeStore.submit(this, callPostApply(txnId()))
);
}
- public Future<Data> read(Keys scope)
+ public Future<Data> read(SafeCommandStore safeStore)
{
- return txn().read(this);
+ return partialTxn().read(safeStore, this);
}
- private Future<Void> maybeExecute(boolean notifyListenersOnNoop)
+ // TODO: maybe split into maybeExecute and maybeApply?
+ private boolean maybeExecute(SafeCommandStore safeStore, ProgressShard
shard, boolean alwaysNotifyListeners, boolean notifyWaitingOn)
{
if (logger.isTraceEnabled())
- logger.trace("{}: Maybe executing with status {}. Will notify
listeners on noop: {}", txnId(), status(), notifyListenersOnNoop);
+ logger.trace("{}: Maybe executing with status {}. Will notify
listeners on noop: {}", txnId(), status(), alwaysNotifyListeners);
if (status() != Committed && status() != Executed)
{
- if (notifyListenersOnNoop) notifyListeners();
- return Writes.SUCCESS;
+ if (alwaysNotifyListeners)
+ notifyListeners(safeStore);
+ return false;
}
- if (isUnableToApply())
+ if (isUnableToExecute())
{
- BlockedBy blockedBy = blockedBy();
- if (blockedBy != null)
- {
- logger.trace("{}: not executing, blocked on {}", txnId(),
blockedBy.command.txnId());
- commandStore().progressLog().waiting(blockedBy.command,
blockedBy.someKeys);
- if (notifyListenersOnNoop) notifyListeners();
- return Writes.SUCCESS;
- }
- assert !isWaitingOnApply();
+ if (alwaysNotifyListeners)
+ notifyListeners(safeStore);
+
+ if (notifyWaitingOn)
+ new NotifyWaitingOn(this).accept(safeStore);
+ return false;
}
switch (status())
{
case Committed:
// TODO: maintain distinct ReadyToRead and ReadyToWrite states
- status(ReadyToExecute);
+ setStatus(ReadyToExecute);
logger.trace("{}: set to ReadyToExecute", txnId());
- boolean isProgressShard = progressKey() != null &&
handles(txnId().epoch, progressKey());
- commandStore().progressLog().readyToExecute(this,
isProgressShard, isProgressShard && progressKey().equals(homeKey()));
- notifyListeners();
+ safeStore.progressLog().readyToExecute(this, shard);
+ notifyListeners(safeStore);
break;
+
case Executed:
- logger.trace("{}: applying", txnId());
- if (notifyListenersOnNoop) notifyListeners();
- return apply();
+ if (executeRanges(safeStore,
executeAt()).intersects(writes().keys,
safeStore.commandStore()::hashIntersects))
+ {
+ logger.trace("{}: applying", txnId());
+ apply(safeStore);
Review Comment:
The coordinator can be acknowledged as soon as the outcome is durable, i.e.
once we enter the Executed state. The work of applying that outcome to the
underlying data store is a local concern, and should only affect when later
transactions can execute locally. If we were to wait until we actually apply,
we might be waiting a long time, because a transaction can reach the Executed
state even before earlier transactions have actually executed: if our outcome
is unconditional or does not depend on this shard’s state, we may have queued
transactions that are decided and Executed ahead of the transactions that will
apply first.
I have discussed with Ariel that the Executed state is somewhat poorly named,
and that we should perhaps rename it to something like PreApplied, to convey
that we have recorded the information necessary to Apply and will do so when
our turn comes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]