belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093757114
##########
accord-core/src/main/java/accord/coordinate/Recover.java:
##########
@@ -212,56 +213,63 @@ private void recover()
{
default: throw new IllegalStateException();
case Invalidated:
+ {
commitInvalidate();
return;
+ }
case Applied:
case PreApplied:
- // TODO (desired, efficiency): in some cases we can use
the deps we already have (e.g. if we have a quorum of Committed responses)
- node.withEpoch(executeAt.epoch(), () -> {
- CollectDeps.withDeps(node, txnId, route, txn,
acceptOrCommit.executeAt, (deps, fail) -> {
- if (fail != null)
- {
- accept(null, fail);
- }
- else
- {
- // TODO (required, consider): when
writes/result are partially replicated, need to confirm we have quorum of these
- Persist.persistAndCommit(node, txnId, route,
txn, executeAt, deps, acceptOrCommit.writes, acceptOrCommit.result);
- accept(acceptOrCommit.result, null);
- }
- });
- });
+ {
+ // must have gone through Accepted, so we must have
witnessed >= Accepted for each shard
+ Deps acceptedDeps = tryMergeAcceptedDeps();
+ Invariants.checkState(acceptedDeps != null);
+ // TODO (required, consider): when writes/result are
partially replicated, need to confirm we have quorum of these
+ Persist.persistAndCommit(node, txnId, route, txn,
executeAt, acceptedDeps, acceptOrCommit.writes, acceptOrCommit.result);
+ accept(acceptOrCommit.result, null);
return;
+ }
case ReadyToExecute:
case PreCommitted:
case Committed:
+ {
+ Deps acceptedDeps = tryMergeAcceptedDeps(); // must have
gone through Accepted, so we must have witnessed >= Accepted for each shard
+ Invariants.checkState(acceptedDeps != null);
// TODO (desired, efficiency): in some cases we can use
the deps we already have (e.g. if we have a quorum of Committed responses)
- node.withEpoch(executeAt.epoch(), () -> {
- CollectDeps.withDeps(node, txnId, route, txn,
executeAt, (deps, fail) -> {
- if (fail != null) accept(null, fail);
- else Execute.execute(node, txnId, txn, route,
acceptOrCommit.executeAt, deps, this);
- });
- });
+ Execute.execute(node, txnId, txn, route,
acceptOrCommit.executeAt, acceptedDeps, this);
return;
+ }
case Accepted:
- // no need to preaccept the later round, as future
operations always include every old epoch (until it is fully migrated)
- propose(acceptOrCommit.executeAt, mergeDeps());
- return;
+ {
+ Deps deps;
+ if (txnId.rw().proposesDeps()) deps =
tryMergeAcceptedDeps();
+ else deps = mergeDeps();
+
+ if (deps != null)
+ {
+ // TODO (desired, behaviour): if we didn't find
Accepted in *every* shard, consider invalidating for consistency of behaviour
+ propose(acceptOrCommit.executeAt, deps);
+ return;
+ }
+
+ // if we propose deps, and cannot assemble a complete set,
then we never reached consensus and can be invalidated
Review Comment:
We send `Accept` with a specific set of `Deps` that we want to be durably
recoverable before we use them. So for these transactions we're really agreeing
on `Deps` rather than on `executeAt` (since they always use `txnId` as their
`executeAt`, though this choice is arbitrary).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]