belliottsmith commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1011788405
##########
accord-core/src/main/java/accord/impl/SimpleProgressLog.java:
##########
@@ -438,77 +438,94 @@ void update(Node node, TxnId txnId, Command command)
return;
}
+ if (command.hasBeen(blockedUntil))
+ {
+ if (command.hasBeen(ReplicationPhase.Apply)) recordApply();
+ else if (command.hasBeen(ReplicationPhase.Commit))
recordCommit();
+ return;
+ }
+
progress = Investigating;
- // check status with the only keys we know, if any, then:
- // 1. if we cannot find any primary record of the transaction,
then it cannot be a dependency so record this fact
- // 2. otherwise record the homeKey for future reference and set
the status based on whether progress has been made
- long onEpoch = (command.hasBeen(Status.Committed) ?
command.executeAt() : txnId).epoch;
- node.withEpoch(onEpoch, () -> {
- Key someKey; Keys someKeys; {
- Keys tmpKeys = Keys.union(this.someKeys,
command.someKeys());
- someKey = command.homeKey() == null ? tmpKeys.get(0) :
command.homeKey();
- someKeys = tmpKeys.with(someKey);
+ // first make sure we have enough information to obtain the
command locally
+ boolean canExecute = command.hasBeen(Status.Committed);
+ long srcEpoch = (command.hasBeen(Status.Committed) ?
command.executeAt() : txnId).epoch;
+ // TODO: compute fromEpoch, the epoch we already have this txn
replicated until
+ long toEpoch = Math.max(srcEpoch, node.topology().epoch());
+ node.withEpoch(srcEpoch, () -> {
+
+ // first check we have enough routing information for the
ranges we own; if not, fetch it
+ AbstractRoute route = route(command);
+ KeyRanges ranges =
node.topology().localRangesForEpochs(txnId.epoch, toEpoch);
+ if (route == null || !route.covers(ranges))
+ {
+ ReplicationPhase blockedOn = this.blockedUntil;
+ BiConsumer<FindRoute.Result, Throwable> foundRoute =
(findRoute, fail) -> {
+ if (progress == Investigating && blockedOn ==
this.blockedUntil)
+ {
+ progress = Expected;
+ if (findRoute != null && findRoute.route != null
&& !(blockedOnKeys instanceof Route))
+ blockedOnKeys = findRoute.route;
+ if (findRoute == null && fail == null)
+ invalidate(node, command);
+ }
+ };
+
+ if (command.homeKey() != null || route != null)
+ {
+ RoutingKey homeKey = route != null ? route.homeKey :
command.homeKey();
+ debugInvestigating = FindRoute.findRoute(node, txnId,
homeKey, foundRoute);
+ }
+ else
+ {
+ RoutingKeys someKeys = this.blockedOnKeys;
+ if (someKeys == null || someKeys.isEmpty()) someKeys =
route;
+ debugInvestigating = FindHomeKey.findHomeKey(node,
txnId, someKeys, (homeKey, fail) -> {
+ if (progress == Investigating && blockedOn ==
this.blockedUntil)
+ {
+ if (fail != null) progress = Expected;
+ else if (homeKey != null)
FindRoute.findRoute(node, txnId, homeKey, foundRoute);
+ else invalidate(node, command);
+ }
+ });
+ }
+ return;
}
- Shard someShard = node.topology().forEpoch(someKey, onEpoch);
- CheckOnCommitted check = blockedOn == Executed ?
checkOnCommitted(node, txnId, someKey, someShard, onEpoch)
- :
checkOnUncommitted(node, txnId, someKeys, someKey, someShard, onEpoch);
- debugInvestigating = check;
- check.addCallback((success, fail) -> {
+ // check status with the only keys we know, if any, then:
+ // 1. if we cannot find any primary record of the transaction,
then it cannot be a dependency so record this fact
+ // 2. otherwise record the homeKey for future reference and
set the status based on whether progress has been made
+ BiConsumer<ReplicationPhase, Throwable> callback = (success,
fail) -> {
if (progress != Investigating)
return;
progress = Expected;
- if (fail != null)
- return;
+ if (fail == null && success.compareTo(blockedUntil) >= 0)
+ progress = success.compareTo(Apply) >= 0 ? Done :
NoneExpected;
+ };
- switch (success.status)
- {
- default: throw new IllegalStateException();
- case AcceptedInvalidate:
- // we may or may not know the homeShard at this
point; if the response doesn't know
- // then assume we potentially need to pick up the
invalidation
- if (success.homeKey != null)
- break;
- // TODO: probably don't want to immediately go to
Invalidate,
- // instead first give in-flight one a chance
to complete
- case NotWitnessed:
- progress = Investigating;
- // TODO: this should instead invalidate the
transaction on this shard, which invalidates it for all shards,
- // but we need to first support invalidation
- debugInvestigating = Invalidate.invalidate(node,
txnId, someKeys, someKey)
- .addCallback((success2, fail2) -> {
- if (progress != Investigating)
return;
- if (fail2 != null) progress =
Expected;
- else switch (success2)
- {
- default: throw new
IllegalStateException();
- case PREEMPTED:
- progress = Expected;
- break;
- case EXECUTED:
- case INVALIDATED:
- progress = Done;
- }
- });
- break;
- case PreAccepted:
- case Accepted:
- // either it's the home shard and it's managing
progress,
- // or we now know the home shard and will contact
it next time
- break;
- case Committed:
- case ReadyToExecute:
-
Preconditions.checkState(command.hasBeen(Status.Committed) ||
!command.commandStore().ranges().intersects(txnId.epoch, someKeys));
- if (blockedOn == Status.Committed)
- progress = NoneExpected;
- break;
- case Executed:
- case Applied:
- case Invalidated:
- progress = Done;
- }
- });
+ debugInvestigating = FetchData.fetch(blockedUntil, node,
txnId, route, toEpoch, callback);
Review Comment:
It's useful for debugging purposes, and since this implementation is not
used outside of testing, it seems useful to maintain indefinitely.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]