belliottsmith commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1011779449
##########
accord-core/src/main/java/accord/impl/SimpleProgressLog.java:
##########
@@ -101,332 +90,343 @@ static Progress advance(Progress current)
}
}
- static class GlobalPendingDurable
+ // exists only on home shard
+ static class CoordinateState
{
- final Set<Id> persistedOn;
-
- GlobalPendingDurable(Set<Id> persistedOn)
+ enum CoordinateStatus
{
- this.persistedOn = persistedOn;
- }
- }
-
- static class HomeState
- {
- enum LocalStatus
- {
- NotWitnessed, Uncommitted, Committed, ReadyToExecute, Done;
- boolean isAtMost(LocalStatus equalOrLessThan)
+ NotWitnessed, Uncommitted, Committed, ReadyToExecute,
LocallyDurable, Done;
+ boolean isAtMost(CoordinateStatus equalOrLessThan)
{
return compareTo(equalOrLessThan) <= 0;
}
- boolean isAtLeast(LocalStatus equalOrGreaterThan)
+ boolean isAtLeast(CoordinateStatus equalOrGreaterThan)
{
return compareTo(equalOrGreaterThan) >= 0;
}
}
- enum GlobalStatus
- {
- NotExecuted, Disseminating, PendingDurable, Durable, Done; //
TODO: manage propagating from Durable to everyone
- boolean isAtLeast(GlobalStatus equalOrGreaterThan)
- {
- return compareTo(equalOrGreaterThan) >= 0;
- }
- }
-
- LocalStatus local = LocalStatus.NotWitnessed;
- Progress localProgress = NoneExpected;
- Status maxStatus;
- Ballot maxPromised;
- boolean maxPromiseHasBeenAccepted;
-
- GlobalStatus global = NotExecuted;
- Progress globalProgress = NoneExpected;
- Set<Id> globalNotPersisted;
- GlobalPendingDurable globalPendingDurable;
+ CoordinateStatus status = NotWitnessed;
+ Progress progress = NoneExpected;
+ ProgressToken token = ProgressToken.NONE;
Object debugInvestigating;
- void ensureAtLeast(Command command, LocalStatus newStatus, Progress
newProgress, Node node)
+ void ensureAtLeast(Command command, CoordinateStatus newStatus,
Progress newProgress)
{
- if (newStatus == Committed && global.isAtLeast(Durable) &&
!command.executes())
- {
- local = LocalStatus.Done;
- localProgress = Done;
- }
- else if (newStatus.compareTo(local) > 0)
- {
- local = newStatus;
- localProgress = newProgress;
- }
- refreshGlobal(node, command, null, null);
+ ensureAtLeast(command.txnId(), command.homeKey(), newStatus,
newProgress);
updateMax(command);
}
- void updateMax(Command command)
- {
- if (maxStatus == null || maxStatus.compareTo(command.status()) < 0)
- maxStatus = command.status();
- if (maxPromised == null ||
maxPromised.compareTo(command.promised()) < 0)
- maxPromised = command.promised();
- maxPromiseHasBeenAccepted |=
command.accepted().equals(maxPromised);
- }
-
- void updateMax(CheckStatusOk ok)
+ void ensureAtLeast(TxnId txnId, RoutingKey homeKey, CoordinateStatus
newStatus, Progress newProgress)
{
- // TODO: perhaps set localProgress back to Waiting if
Investigating and we update anything?
- if (ok.status.compareTo(maxStatus) > 0) maxStatus = ok.status;
- if (ok.promised.compareTo(maxPromised) > 0)
- {
- maxPromised = ok.promised;
- maxPromiseHasBeenAccepted = ok.accepted.equals(ok.promised);
- }
- else if (ok.promised.equals(maxPromised))
+ if (newStatus.compareTo(status) > 0)
{
- maxPromiseHasBeenAccepted |= ok.accepted.equals(ok.promised);
+ status = newStatus;
+ progress = newProgress;
}
}
- private boolean refreshGlobal(@Nullable Node node, @Nullable Command
command, @Nullable Id persistedOn, @Nullable Set<Id> persistedOns)
+ void updateMax(Command command)
{
- if (global == NotExecuted)
- return false;
-
- if (globalPendingDurable != null)
- {
- if (node == null || command == null ||
command.is(Status.NotWitnessed))
- return false;
-
- if (persistedOns == null) persistedOns =
globalPendingDurable.persistedOn;
- else persistedOns.addAll(globalPendingDurable.persistedOn);
-
- global = Durable;
- globalProgress = Expected;
- }
-
- if (globalNotPersisted == null)
- {
- assert node != null && command != null;
- if (!node.topology().hasEpoch(command.executeAt().epoch))
- return false;
-
- globalNotPersisted = new
HashSet<>(node.topology().preciseEpochs(command.txn(),
command.executeAt().epoch).nodes());
- if (local == LocalStatus.Done)
- globalNotPersisted.remove(node.id());
- }
- if (globalNotPersisted != null)
- {
- if (persistedOn != null)
- globalNotPersisted.remove(persistedOn);
- if (persistedOns != null)
- globalNotPersisted.removeAll(persistedOns);
-
- if (globalNotPersisted.isEmpty())
- {
- global = GlobalStatus.Done;
- globalProgress = Done;
- }
- }
-
- return true;
+ token = token.merge(new ProgressToken(command.durability(),
command.status(), command.promised(), command.accepted()));
}
- void executedOnAllShards(Node node, Command command, Set<Id>
persistedOn)
+ void updateMax(ProgressToken ok)
{
- if (local == LocalStatus.NotWitnessed)
- {
- global = PendingDurable;
- globalProgress = NoneExpected;
- globalPendingDurable = new GlobalPendingDurable(persistedOn);
- }
- else if (global != GlobalStatus.Done)
- {
- global = Durable;
- globalProgress = Expected;
- refreshGlobal(node, command, null, persistedOn);
- if (local.isAtLeast(Committed) && !command.executes())
- {
- local = LocalStatus.Done;
- localProgress = Done;
- }
- }
+ // TODO: perhaps set localProgress back to Waiting if
Investigating and we update anything?
+ token = token.merge(ok);
}
- void executed(Node node, Command command)
+ void durableGlobal()
{
- switch (local)
+ switch (status)
{
default: throw new IllegalStateException();
case NotWitnessed:
case Uncommitted:
case Committed:
case ReadyToExecute:
- local = LocalStatus.Done;
- localProgress = NoneExpected;
- if (global == NotExecuted)
- {
- global = Disseminating;
- globalProgress = Expected;
- }
- refreshGlobal(node, command, node.id(), null);
+ status = CoordinateStatus.Done;
+ progress = NoneExpected;
case Done:
}
}
- void updateLocal(Node node, TxnId txnId, Command command)
+ void update(Node node, CommandStore commandStore, TxnId txnId, Command
command)
{
- if (localProgress != NoProgress)
+ if (progress != NoProgress)
{
- localProgress = advance(localProgress);
+ progress = advance(progress);
return;
}
- localProgress = Investigating;
- switch (local)
+ progress = Investigating;
+ switch (status)
{
default: throw new IllegalStateException();
case NotWitnessed:
case Committed:
+ case Done:
throw new IllegalStateException(); // NoProgressExpected
case Uncommitted:
case ReadyToExecute:
{
- if (local.isAtLeast(Committed) &&
global.isAtLeast(PendingDurable))
+ if (status.isAtLeast(CoordinateStatus.Committed) &&
command.durability().isDurable())
{
// must also be committed, as at the time of writing
we do not guarantee dissemination of Commit
// records to the home shard, so we only know the
executeAt shards will have witnessed this
// if the home shard is at an earlier phase, it must
run recovery
- Key homeKey = command.homeKey();
- long homeEpoch = command.executeAt().epoch;
-
- node.withEpoch(homeEpoch, () -> {
- Shard homeShard =
node.topology().forEpoch(homeKey, homeEpoch);
- debugInvestigating = checkOnCommitted(node, txnId,
homeKey, homeShard, command.executeAt().epoch)
- .addCallback((success, fail)
-> {
- // should have found
enough information to apply the result, but in case we did not reset progress
- if (localProgress ==
Investigating)
- localProgress =
Expected;
- });
- });
+ long epoch = command.executeAt().epoch;
+ node.withEpoch(epoch, () -> FetchData.fetch(Apply,
node, txnId, command.route(), epoch, (success, fail) -> {
+ // should have found enough information to apply
the result, but in case we did not reset progress
+ if (progress == Investigating)
+ progress = Expected;
+ }));
}
else
{
- Key homeKey = command.homeKey();
- long homeEpoch = (local.isAtMost(Uncommitted) ? txnId
: command.executeAt()).epoch;
-
- node.withEpoch(homeEpoch, () -> {
- Shard homeShard =
node.topology().forEpoch(homeKey, homeEpoch);
+ RoutingKey homeKey = command.homeKey();
+ node.withEpoch(txnId.epoch, () -> {
- Future<CheckStatusOk> recover =
node.maybeRecover(txnId, command.txn(),
-
homeKey, homeShard, homeEpoch,
-
maxStatus, maxPromised, maxPromiseHasBeenAccepted);
+ Future<? extends Outcome> recover =
node.maybeRecover(txnId, homeKey, command.route(), token);
recover.addCallback((success, fail) -> {
- if (local.isAtMost(ReadyToExecute) &&
localProgress == Investigating)
+ if (status.isAtMost(ReadyToExecute) &&
progress == Investigating)
{
- localProgress = Expected;
+ progress = Expected;
if (fail != null)
return;
- if (success == null ||
success.hasExecutedOnAllShards)
- executedOnAllShards(node, command,
null);
- else
- updateMax(success);
+ ProgressToken token =
success.asProgressToken();
+ // TODO: avoid returning null (need to
change semantics here in this case, though, as Recover doesn't return
CheckStatusOk)
+ if (token.durability.isDurable())
+ {
+
commandStore.execute(contextFor(txnId), safeStore -> {
+ Command cmd =
safeStore.command(txnId);
+ cmd.setDurability(safeStore,
token.durability, homeKey, null);
+
safeStore.progressLog().durable(txnId, cmd.someRoutingKeys(), null);
+ }).addCallback(commandStore.agent());
+ }
+
+ updateMax(token);
}
});
debugInvestigating = recover;
});
}
}
- case Done:
}
}
- void updateGlobal(Node node, TxnId txnId, Command command)
+ @Override
+ public String toString()
{
- if (!refreshGlobal(node, command, null, null))
- return;
+ return "{" + status + ',' + progress + '}';
+ }
+ }
- if (global != Disseminating)
+ // exists only on home shard
+ static class DisseminateState
+ {
+ enum DisseminateStatus { NotExecuted, Durable, Done }
+
+ // TODO: thread safety (schedule on progress log executor)
+ class CoordinateAwareness implements Callback<SimpleReply>
+ {
+ @Override
+ public void onSuccess(Id from, SimpleReply reply)
+ {
+ notAwareOfDurability.remove(from);
+ maybeDone();
+ }
+
+ @Override
+ public void onFailure(Id from, Throwable failure)
+ {
+ }
+
+ @Override
+ public void onCallbackFailure(Id from, Throwable failure)
+ {
+ }
+ }
+
+ DisseminateStatus status = NotExecuted;
+ Progress progress = NoneExpected;
+ Set<Id> notAwareOfDurability;
+ Set<Id> notPersisted;
+
+ List<Runnable> whenReady;
+
+ CoordinateAwareness investigating;
+
+ private void whenReady(Node node, Command command, Runnable runnable)
+ {
+ if (notAwareOfDurability != null || maybeReady(node, command))
+ {
+ runnable.run();
+ }
+ else
+ {
+ if (whenReady == null)
+ whenReady = new ArrayList<>();
+ whenReady.add(runnable);
+ }
+ }
+
+ private void whenReady(Runnable runnable)
+ {
+ if (notAwareOfDurability != null)
+ {
+ runnable.run();
+ }
+ else
+ {
+ if (whenReady == null)
+ whenReady = new ArrayList<>();
+ whenReady.add(runnable);
+ }
+ }
+
+ // must know the epoch information, and have a valid Route
+ private boolean maybeReady(Node node, Command command)
Review Comment:
It does have a side effect: it invokes `whenReady`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]