belliottsmith commented on code in PR #10:
URL: https://github.com/apache/cassandra-accord/pull/10#discussion_r994530753
##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -380,119 +402,155 @@ public void onChange(Command command)
case Executed:
case Applied:
case Invalidated:
- if (waitingOnApply != null)
+ if (isUnableToApply())
{
updatePredecessor(command);
- if (waitingOnCommit != null)
+ if (isWaitingOnCommit())
{
- if (waitingOnCommit.remove(command.txnId) != null && waitingOnCommit.isEmpty())
- waitingOnCommit = null;
+ removeWaitingOnCommit(command);
}
- if (waitingOnCommit == null && waitingOnApply.isEmpty())
- waitingOnApply = null;
}
else
{
command.removeListener(this);
}
- maybeExecute(true);
+ maybeExecute(false);
break;
}
}
- private void maybeExecute(boolean notifyListeners)
+ @Override
+ public void onChange(Command command)
{
- if (status != Committed && status != Executed)
- return;
+ onChangeInternal(command);
+ }
- if (waitingOnApply != null)
+ protected void postApply()
+ {
+ logger.trace("{} applied, setting status to Applied and notifying listeners", txnId());
+ status(Applied);
+ notifyListeners();
+ }
+
+ private static Function<CommandStore, Void> callPostApply(TxnId txnId)
+ {
+ return commandStore -> {
+ commandStore.command(txnId).postApply();
+ return null;
+ };
+ }
+
+ protected Future<Void> apply()
+ {
+ // important: we can't include a reference to *this* in the lambda, since the C* implementation may evict
+ // the command instance from memory between now and the write completing (and post apply being called)
+ return writes().apply(commandStore()).flatMap(unused ->
+ commandStore().process(this, callPostApply(txnId()))
+ );
+ }
+
+ public Future<Data> read(Keys scope)
+ {
+ return txn().read(this);
+ }
+
+ private Future<Void> maybeExecute(boolean notifyListenersOnNoop)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("{}: Maybe executing with status {}. Will notify listeners on noop: {}", txnId(), status(), notifyListenersOnNoop);
+
+ if (status() != Committed && status() != Executed)
+ {
+ if (notifyListenersOnNoop) notifyListeners();
+ return Writes.SUCCESS;
+ }
+
+ if (isUnableToApply())
{
BlockedBy blockedBy = blockedBy();
if (blockedBy != null)
{
- commandStore.progressLog().waiting(blockedBy.txnId, blockedBy.someKeys);
- return;
+ logger.trace("{}: not executing, blocked on {}", txnId(), blockedBy.command.txnId());
+ commandStore().progressLog().waiting(blockedBy.command, blockedBy.someKeys);
+ if (notifyListenersOnNoop) notifyListeners();
+ return Writes.SUCCESS;
}
- assert waitingOnApply == null;
+ assert !isWaitingOnApply();
}
- switch (status)
+ switch (status())
{
case Committed:
// TODO: maintain distinct ReadyToRead and ReadyToWrite states
- status = ReadyToExecute;
- boolean isProgressShard = progressKey != null && handles(txnId.epoch, progressKey);
- commandStore.progressLog().readyToExecute(txnId, isProgressShard, isProgressShard && progressKey.equals(homeKey));
- if (notifyListeners)
- listeners.forEach(this);
+ status(ReadyToExecute);
+ logger.trace("{}: set to ReadyToExecute", txnId());
+ boolean isProgressShard = progressKey() != null && handles(txnId().epoch, progressKey());
+ commandStore().progressLog().readyToExecute(this, isProgressShard, isProgressShard && progressKey().equals(homeKey()));
+ notifyListeners();
break;
case Executed:
- writes.apply(commandStore);
- status = Applied;
- if (notifyListeners)
- listeners.forEach(this);
+ logger.trace("{}: applying", txnId());
+ if (notifyListenersOnNoop) notifyListeners();
+ return apply();
}
+ return Writes.SUCCESS;
}
/**
* @param dependency is either committed or invalidated
*/
- private void updatePredecessor(Command dependency)
+ private void updatePredecessor(PartialCommand dependency)
{
Preconditions.checkState(dependency.hasBeen(Committed));
if (dependency.hasBeen(Invalidated))
{
+ logger.trace("{}: {} is invalidated. Stop listening and removing from waiting on commit set.", txnId(), dependency.txnId());
dependency.removeListener(this);
- if (waitingOnCommit.remove(dependency.txnId) != null && waitingOnCommit.isEmpty())
- waitingOnCommit = null;
+ removeWaitingOnCommit(dependency);
}
- else if (dependency.executeAt.compareTo(executeAt) > 0)
+ else if (dependency.executeAt().compareTo(executeAt()) > 0)
{
// cannot be a predecessor if we execute later
+ logger.trace("{}: {} executes after us. Stop listening.", txnId(), dependency.txnId());
dependency.removeListener(this);
}
else if (dependency.hasBeen(Applied))
{
- waitingOnApply.remove(dependency.executeAt);
+ logger.trace("{}: {} has been applied. Stop listening and removing from waiting on apply set.", txnId(), dependency.txnId());
+ removeWaitingOnApply(dependency);
dependency.removeListener(this);
}
else
{
- waitingOnApply.putIfAbsent(dependency.executeAt, dependency);
+ logger.trace("{}: adding {} to waiting on apply set.", txnId(), dependency.txnId());
+ addWaitingOnApplyIfAbsent(dependency);
}
}
// TEMPORARY: once we can invalidate commands that have not been witnessed on any shard, we do not need to know the home shard
static class BlockedBy
{
- final TxnId txnId;
+ final PartialCommand command;
final Keys someKeys;
- BlockedBy(TxnId txnId, Keys someKeys)
+ BlockedBy(PartialCommand command, Keys someKeys)
{
- this.txnId = txnId;
+ this.command = command;
this.someKeys = someKeys;
}
}
public BlockedBy blockedBy()
{
Command prev = this;
- Command cur = directlyBlockedBy();
+ PartialCommand cur = directlyBlockedBy();
if (cur == null)
return null;
- Command next;
Review Comment:
Removing this loop breaks the progress log. I can see why it has been done,
but we'll have to do a bit more work here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]