belliottsmith commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1011791935
##########
accord-core/src/main/java/accord/impl/SimpleProgressLog.java:
##########
@@ -668,242 +710,123 @@ State recordExecute(TxnId txnId)
{
State state = stateMap.get(txnId);
if (state != null && state.blockingState != null)
- state.blockingState.recordExecute();
+ state.blockingState.recordApply();
return state;
}
- private void ensureSafeOrAtLeast(Command command, boolean
isProgressShard, boolean isHomeShard, LocalStatus newStatus, Progress
newProgress)
+ private void ensureSafeOrAtLeast(Command command, ProgressShard shard,
CoordinateStatus newStatus, Progress newProgress)
{
+ Preconditions.checkState(shard != Unsure);
+
State state = null;
assert newStatus.isAtMost(ReadyToExecute);
- if (newStatus.isAtLeast(LocalStatus.Committed))
+ if (newStatus.isAtLeast(CoordinateStatus.Committed))
state = recordCommit(command.txnId());
- if (isProgressShard)
+ if (shard.isProgress())
{
state = ensure(command.txnId(), state);
- if (isHomeShard) state.ensureAtLeast(command, newStatus,
newProgress, node);
+ if (shard.isHome()) state.ensureAtLeast(command, newStatus,
newProgress);
else ensure(command.txnId()).ensureAtLeast(Safe);
}
}
@Override
- public void accept(Command command, boolean isProgressShard, boolean
isHomeShard)
+ public void accept(Command command, ProgressShard shard)
{
- ensureSafeOrAtLeast(command, isProgressShard, isHomeShard,
Uncommitted, Expected);
+ ensureSafeOrAtLeast(command, shard, Uncommitted, Expected);
}
@Override
- public void commit(Command command, boolean isProgressShard, boolean
isHomeShard)
+ public void commit(Command command, ProgressShard shard)
{
- ensureSafeOrAtLeast(command, isProgressShard, isHomeShard,
LocalStatus.Committed, NoneExpected);
+ ensureSafeOrAtLeast(command, shard, CoordinateStatus.Committed,
NoneExpected);
}
@Override
- public void readyToExecute(Command command, boolean isProgressShard,
boolean isHomeShard)
+ public void readyToExecute(Command command, ProgressShard shard)
{
- ensureSafeOrAtLeast(command, isProgressShard, isHomeShard,
LocalStatus.ReadyToExecute, Expected);
+ ensureSafeOrAtLeast(command, shard,
CoordinateStatus.ReadyToExecute, Expected);
}
@Override
- public void execute(Command command, boolean isProgressShard, boolean
isHomeShard)
+ public void execute(Command command, ProgressShard shard)
{
- State state = recordExecute(command.txnId());
-
- if (isProgressShard)
- {
- state = ensure(command.txnId(), state);
-
- if (isHomeShard) state.home().executed(node, command);
- else ensure(command.txnId()).ensureAtLeast(Safe);
- }
+ recordExecute(command.txnId());
+ // this is the home shard's state ONLY, so we don't know it is
fully durable locally
+ ensureSafeOrAtLeast(command, shard,
CoordinateStatus.ReadyToExecute, Expected);
}
@Override
- public void invalidate(Command command, boolean isProgressShard,
boolean isHomeShard)
+ public void invalidate(Command command, ProgressShard shard)
{
State state = recordExecute(command.txnId());
- if (isProgressShard)
+ Preconditions.checkState(shard == Home || state == null ||
state.coordinateState == null);
+
+ // note: we permit Unsure here, so we check if we have any local
home state
+ if (shard.isProgress())
{
state = ensure(command.txnId(), state);
- if (isHomeShard) state.ensureAtLeast(command,
LocalStatus.Done, Done, node);
+ if (shard.isHome()) state.ensureAtLeast(command,
CoordinateStatus.Done, Done);
else ensure(command.txnId()).ensureAtLeast(Safe);
}
}
@Override
- public void executedOnAllShards(Command command, Set<Id> persistedOn)
- {
- State state = ensure(command.txnId());
- state.home().executedOnAllShards(node, command, persistedOn);
- }
-
- @Override
- public void waiting(PartialCommand blockedByCommand, Keys someKeys)
- {
- TxnId blockedBy = blockedByCommand.txnId();
- if (!blockedByCommand.hasBeen(Executed))
- ensure(blockedBy).recordBlocking(blockedByCommand, someKeys);
- }
- }
-
- @Override
- public void run()
- {
- for (Instance instance : instances)
- {
- // TODO: we want to be able to poll others about pending
dependencies to check forward progress,
- // as we don't know all dependencies locally (or perhaps
any, at execution time) so we may
- // begin expecting forward progress too early
- // state map may be updated during iteration, so need to clone the
values set
- new ArrayList<>(instance.stateMap.values()).forEach(state ->
state.update(node));
- }
- }
-
- static class CoordinateApplyAndCheck extends AsyncFuture<Void> implements
Callback<ApplyAndCheckOk>
- {
- final TxnId txnId;
- final HomeState state;
- final Set<Id> waitingOnResponses;
-
- static Future<Void> applyAndCheck(Node node, TxnId txnId, Command
command, HomeState state)
- {
- CoordinateApplyAndCheck coordinate = new
CoordinateApplyAndCheck(txnId, state);
- Topologies topologies =
node.topology().preciseEpochs(command.txn(), command.executeAt().epoch);
- state.globalNotPersisted.retainAll(topologies.nodes()); // we
might have had some nodes from older shards that are now redundant
- node.send(state.globalNotPersisted, id -> new ApplyAndCheck(id,
topologies,
-
command.txnId(), command.txn(), command.homeKey(),
-
command.savedDeps(), command.executeAt(),
-
command.writes(), command.result(),
-
state.globalNotPersisted),
- coordinate);
- return coordinate;
- }
-
- CoordinateApplyAndCheck(TxnId txnId, HomeState state)
+ public void durableLocal(TxnId txnId)
{
- this.txnId = txnId;
- this.state = state;
- this.waitingOnResponses = new HashSet<>(state.globalNotPersisted);
+ State state = ensure(txnId);
+ state.global().durableLocal(node);
}
@Override
- public void onSuccess(Id from, ApplyAndCheckOk response)
+ public void durable(Command command, @Nullable Set<Id> persistedOn)
{
- state.globalNotPersisted.retainAll(response.notPersisted);
- state.refreshGlobal(null, null, null, null);
+ State state = ensure(command.txnId());
+ if (!command.status().hasBeen(PreApplied))
+ state.recordBlocking(command.txnId(), Apply,
command.someRoutingKeys());
+ state.local().durableGlobal();
+ state.global().durableGlobal(node, command, persistedOn);
}
@Override
- public void onFailure(Id from, Throwable failure)
+ public void durable(TxnId txnId, RoutingKeys someKeys, ProgressShard
shard)
Review Comment:
We've been informed that the transaction is durable, i.e. a majority of
replicas at all shards have recorded its outcome. So we can (and should) fetch
the outcome locally - hence marking blocking, which is all that really does.
Logically, our local state being up-to-date is now "blocked" on this
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]