belliottsmith commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1011827508
##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -592,75 +839,320 @@ public Key someKey()
* Preferentially, this is homeKey on nodes that replicate it, and
otherwise any key that is replicated, as of txnId.epoch
*/
- public final void progressKey(Key progressKey)
+ public final void progressKey(RoutingKey progressKey)
{
- Key current = progressKey();
+ RoutingKey current = progressKey();
if (current == null) setProgressKey(progressKey);
else if (!current.equals(progressKey)) throw new AssertionError();
}
- // does this specific Command instance execute (i.e. does it lose
ownership post Commit)
- public boolean executes()
+ private ProgressShard progressShard(SafeCommandStore safeStore,
AbstractRoute route, KeyRanges coordinateRanges)
{
- KeyRanges ranges = commandStore().ranges().at(executeAt().epoch);
- return ranges != null && txn().keys().any(ranges,
commandStore()::hashIntersects);
+ if (progressKey() == null)
+ return Unsure;
+
+ return progressShard(safeStore, route, progressKey(),
coordinateRanges);
}
- public final void txn(Txn txn)
+ private ProgressShard progressShard(SafeCommandStore safeStore)
{
- Txn current = txn();
- if (current == null) setTxn(txn);
- else if (!current.equals(txn)) throw new AssertionError();
+ RoutingKey progressKey = progressKey();
+ if (progressKey == null)
+ return Unsure;
+
+ if (progressKey == NO_PROGRESS_KEY)
+ return No;
+
+ KeyRanges coordinateRanges = safeStore.ranges().at(txnId().epoch);
+ if (!coordinateRanges.contains(progressKey))
+ return No;
+
+ if (!safeStore.commandStore().hashIntersects(progressKey))
+ return No;
+
+ return progressKey.equals(homeKey()) ? Home : Local;
}
- public boolean handles(long epoch, Key someKey)
+ private KeyRanges coordinateRanges(SafeCommandStore safeStore)
{
- if (!commandStore().hashIntersects(someKey))
- return false;
+ return safeStore.ranges().at(txnId().epoch);
+ }
- KeyRanges ranges = commandStore().ranges().at(epoch);
- if (ranges == null)
- return false;
- return ranges.contains(someKey);
+ private KeyRanges executeRanges(SafeCommandStore safeStore, Timestamp
executeAt)
+ {
+ return safeStore.ranges().since(executeAt.epoch);
}
- private Id coordinator()
+ enum EnsureAction { Ignore, Check, Add, TrySet, Set }
+
+ /**
+ * Validate we have sufficient information for the route, partialTxn and
partialDeps fields, and if so update them;
+ * otherwise return false (or throw an exception if an illegal state is
encountered)
+ */
+ private boolean validate(KeyRanges existingRanges, KeyRanges
additionalRanges, ProgressShard shard,
+ AbstractRoute route, EnsureAction ensureRoute,
+ @Nullable PartialTxn partialTxn, EnsureAction
ensurePartialTxn,
+ @Nullable PartialDeps partialDeps, EnsureAction
ensurePartialDeps)
{
- if (promised().equals(Ballot.ZERO))
- return txnId().node;
- return promised().node;
+ if (shard == Unsure)
+ return false;
+
+ // first validate route
+ if (shard.isProgress())
+ {
+ // validate route
+ if (shard.isHome())
+ {
+ switch (ensureRoute)
+ {
+ default: throw new AssertionError();
+ case Check:
+ if (!(route() instanceof Route) && !(route instanceof
Route))
+ return false;
+ case Ignore:
+ break;
+ case Add:
+ case Set:
+ if (!(route instanceof Route))
+ throw new IllegalArgumentException("Incomplete
route (" + route + ") sent to home shard");
+ break;
+ case TrySet:
+ if (!(route instanceof Route))
+ return false;
+ }
+ }
+ else if (route() == null)
+ {
+ // failing any of these tests is always an illegal state
+ if (!route.covers(existingRanges))
+ return false;
+
+ if (existingRanges != additionalRanges &&
!route.covers(additionalRanges))
+ throw new IllegalArgumentException("Incomplete route (" +
route + ") provided; does not cover " + additionalRanges);
+ }
+ else if (existingRanges != additionalRanges &&
!route().covers(additionalRanges))
+ {
+ if (!route.covers(additionalRanges))
+ throw new IllegalArgumentException("Incomplete route (" +
route + ") provided; does not cover " + additionalRanges);
+ }
+ else
+ {
+ if (!route().covers(existingRanges))
+ throw new IllegalStateException();
+ }
+ }
+
+ // invalid to Add deps to Accepted or AcceptedInvalidate statuses, as
Committed deps are not equivalent
+ // and we may erroneously believe we have covered a wider range than
we have in fact covered
+ if (ensurePartialDeps == Add)
+ Preconditions.checkState(status() != Accepted && status() !=
AcceptedInvalidate);
+
+ // validate new partial txn
+ if (!validate(ensurePartialTxn, existingRanges, additionalRanges,
covers(partialTxn()), covers(partialTxn), "txn", partialTxn))
+ return false;
+
+ if (shard.isHome() && ensurePartialTxn != Ignore)
+ {
+ if (!hasQuery(partialTxn()) && !hasQuery(partialTxn))
+ throw new IllegalStateException();
+ }
+
+ return validate(ensurePartialDeps, existingRanges, additionalRanges,
covers(partialDeps()), covers(partialDeps), "deps", partialDeps);
}
- private PartialCommand directlyBlockedBy()
+ private void set(SafeCommandStore safeStore,
+ KeyRanges existingRanges, KeyRanges additionalRanges,
ProgressShard shard, AbstractRoute route,
+ @Nullable PartialTxn partialTxn, EnsureAction
ensurePartialTxn,
+ @Nullable PartialDeps partialDeps, EnsureAction
ensurePartialDeps)
{
- // firstly we're waiting on every dep to commit
- while (isWaitingOnCommit())
+ Preconditions.checkState(progressKey() != null);
+ KeyRanges allRanges = existingRanges.union(additionalRanges);
+
+ if (shard.isProgress()) setRoute(AbstractRoute.merge(route(), route));
+ else setRoute(AbstractRoute.merge(route(), route.slice(allRanges)));
+
+ // TODO (soon): stop round-robin hashing; partition only on ranges
+ switch (ensurePartialTxn)
{
- // TODO: when we change our liveness mechanism this may not be a
problem
- // cannot guarantee that listener updating this set is invoked
before this method by another listener
- // so we must check the entry is still valid, and potentially
remove it if not
- PartialCommand waitingOn = firstWaitingOnCommit();
- if (!waitingOn.hasBeen(Committed)) return waitingOn;
- onChangeInternal(waitingOn);
+ case Add:
+ if (partialTxn == null)
+ break;
+
+ if (partialTxn() != null)
+ {
+ partialTxn = partialTxn.slice(allRanges, shard.isHome());
+ partialTxn.keys().foldlDifference(partialTxn().keys(), (i,
key, p, v) -> {
+ if (safeStore.commandStore().hashIntersects(key))
+ safeStore.commandsForKey(key).register(this);
+ return v;
+ }, 0, 0, 1);
+ this.setPartialTxn(partialTxn().with(partialTxn));
Review Comment:
I think this will all likely be deprecated by the work Blake is doing, so I
wouldn't agonise over it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]