aweisberg commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1014414391
##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -19,362 +19,487 @@
package accord.local;
import accord.api.*;
-import accord.local.Node.Id;
+import accord.local.Status.Durability;
+import accord.local.Status.ExecutionPhase;
+import accord.local.Status.ReplicationPhase;
import accord.primitives.*;
-import accord.txn.Txn;
-import accord.txn.Writes;
+import accord.primitives.Writes;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
import org.apache.cassandra.utils.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
-import static accord.local.Status.*;
+import static accord.local.Status.ExecutionPhase.Decided;
+import static accord.local.Status.ExecutionPhase.Done;
import static accord.utils.Utils.listOf;
-public abstract class Command implements Listener, Consumer<Listener>,
PreLoadContext, PartialCommand.WithDeps
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import accord.api.ProgressLog.ProgressShard;
+import accord.primitives.AbstractRoute;
+import accord.primitives.KeyRanges;
+import accord.primitives.Ballot;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Route;
+import accord.primitives.RoutingKeys;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
+import static accord.api.ProgressLog.ProgressShard.Home;
+import static accord.api.ProgressLog.ProgressShard.Local;
+import static accord.api.ProgressLog.ProgressShard.No;
+import static accord.api.ProgressLog.ProgressShard.Unsure;
+import static accord.local.Command.EnsureAction.Add;
+import static accord.local.Command.EnsureAction.Check;
+import static accord.local.Command.EnsureAction.Ignore;
+import static accord.local.Command.EnsureAction.Set;
+import static accord.local.Command.EnsureAction.TrySet;
+import static accord.local.Status.Accepted;
+import static accord.local.Status.AcceptedInvalidate;
+import static accord.local.Status.Applied;
+import static accord.local.Status.Committed;
+import static accord.local.Status.PreApplied;
+import static accord.local.Status.Invalidated;
+import static accord.local.Status.NotWitnessed;
+import static accord.local.Status.PreAccepted;
+import static accord.local.Status.ReadyToExecute;
+
+public abstract class Command implements Listener,
BiConsumer<SafeCommandStore, Listener>, PreLoadContext
{
private static final Logger logger =
LoggerFactory.getLogger(Command.class);
- public abstract CommandStore commandStore();
+ public abstract TxnId txnId();
- public abstract Key homeKey();
+ // TODO (now): pack this into TxnId
+ public abstract Txn.Kind kind();
- protected abstract void setHomeKey(Key key);
-
- public abstract Key progressKey();
+ public boolean hasBeen(Status status)
+ {
+ return status().hasBeen(status);
+ }
- protected abstract void setProgressKey(Key key);
+ public boolean hasBeen(ReplicationPhase phase)
+ {
+ return status().replicates.compareTo(phase) >= 0;
+ }
- protected abstract void setTxn(Txn txn);
+ public boolean hasBeen(ExecutionPhase phase)
+ {
+ return status().execution.compareTo(phase) >= 0;
+ }
- public abstract Ballot promised();
+ public boolean is(Status status)
+ {
+ return status() == status;
+ }
- public abstract void promised(Ballot ballot);
+ /**
+ * homeKey is a global value that defines the home shard - the one tasked
with ensuring the transaction is finished.
+ * progressKey is a local value that defines the local shard responsible
for ensuring progress on the transaction.
+ * This will be homeKey if it is owned by the node, and some other key
otherwise. If not the home shard, the progress
+ * shard has much weaker responsibilities, only ensuring that the home
shard has durably witnessed the txnId.
+ */
+ public abstract RoutingKey homeKey();
+ protected abstract void setHomeKey(RoutingKey key);
- public abstract Ballot accepted();
+ public abstract RoutingKey progressKey();
+ protected abstract void setProgressKey(RoutingKey key);
- public abstract void accepted(Ballot ballot);
+ /**
+ * If this is the home shard, we require that this is a Route for all
states > NotWitnessed;
+ * otherwise for the local progress shard this is ordinarily a
PartialRoute, and for other shards this is not set,
+ * so that there is only one copy per node that can be consulted to
construct the full set of involved keys.
+ *
+ * If hasBeen(Committed) this must contain the keys for both txnId.epoch
and executeAt.epoch
+ *
+ * TODO: maybe set this for all local shards, but slice to only those
participating keys
+ * (would probably need to remove hashIntersects)
+ */
+ public abstract AbstractRoute route();
+ protected abstract void setRoute(AbstractRoute route);
- public abstract void executeAt(Timestamp timestamp);
+ public abstract PartialTxn partialTxn();
+ protected abstract void setPartialTxn(PartialTxn txn);
- public abstract Deps savedDeps();
+ public abstract Ballot promised();
+ protected abstract void setPromised(Ballot ballot);
- public abstract void savedDeps(Deps deps);
+ public abstract Ballot accepted();
+ protected abstract void setAccepted(Ballot ballot);
- public abstract Writes writes();
+ public void saveRoute(SafeCommandStore safeStore, Route route)
+ {
+ setRoute(route);
+ updateHomeKey(safeStore, route.homeKey);
+ }
- public abstract void writes(Writes writes);
+ public abstract Timestamp executeAt();
+ protected abstract void setExecuteAt(Timestamp timestamp);
- public abstract Result result();
+ /**
+ * While !hasBeen(Committed), used only as a register for Accept state,
used by Recovery
+ * If hasBeen(Committed), represents the full deps owned by this range for
execution at both txnId.epoch
+ * AND executeAt.epoch so that it may be used for Recovery (which contacts
only txnId.epoch topology),
+ * but also for execution.
+ */
+ public abstract PartialDeps partialDeps();
+ protected abstract void setPartialDeps(PartialDeps deps);
- public abstract void result(Result result);
+ public abstract Writes writes();
+ protected abstract void setWrites(Writes writes);
- public abstract void status(Status status);
+ public abstract Result result();
+ protected abstract void setResult(Result result);
- public abstract boolean isGloballyPersistent();
+ public abstract Status status();
+ protected abstract void setStatus(Status status);
- public abstract void isGloballyPersistent(boolean v);
+ public abstract Durability durability();
+ public abstract void setDurability(Durability v);
public abstract Command addListener(Listener listener);
+ public abstract void removeListener(Listener listener);
+ protected abstract void notifyListeners(SafeCommandStore safeStore);
- public abstract void notifyListeners();
-
- public abstract void addWaitingOnCommit(Command command);
-
- public abstract boolean isWaitingOnCommit();
-
- public abstract void removeWaitingOnCommit(PartialCommand command);
-
- public abstract PartialCommand firstWaitingOnCommit();
-
- public abstract void addWaitingOnApplyIfAbsent(PartialCommand command);
-
- public abstract boolean isWaitingOnApply();
+ protected abstract void addWaitingOnCommit(TxnId txnId);
+ protected abstract boolean isWaitingOnCommit();
+ protected abstract void removeWaitingOnCommit(TxnId txnId);
+ protected abstract TxnId firstWaitingOnCommit();
- public abstract void removeWaitingOnApply(PartialCommand command);
+ protected abstract void addWaitingOnApplyIfAbsent(TxnId txnId, Timestamp
executeAt);
+ protected abstract boolean isWaitingOnApply();
+ protected abstract void removeWaitingOn(TxnId txnId, Timestamp executeAt);
+ protected abstract TxnId firstWaitingOnApply();
- public abstract PartialCommand firstWaitingOnApply();
+ public boolean hasBeenWitnessed()
+ {
+ return partialTxn() != null;
+ }
- public boolean isUnableToApply()
+ public boolean isUnableToExecute()
{
return isWaitingOnCommit() || isWaitingOnApply();
}
@Override
public Iterable<TxnId> txnIds()
{
- return Iterables.concat(Collections.singleton(txnId()),
savedDeps().txnIds());
+ return Collections.singleton(txnId());
}
@Override
public Iterable<Key> keys()
{
- return txn().keys();
+ // TODO (now): when do we need this, and will it always be sufficient?
+ return partialTxn().keys();
}
- public void setGloballyPersistent(Key homeKey, Timestamp executeAt)
+ public void setDurability(SafeCommandStore safeStore, Durability
durability, RoutingKey homeKey, @Nullable Timestamp executeAt)
{
- homeKey(homeKey);
- if (!hasBeen(Committed))
- this.executeAt(executeAt);
- else if (!this.executeAt().equals(executeAt))
- commandStore().agent().onInconsistentTimestamp(this,
this.executeAt(), executeAt);
- isGloballyPersistent(true);
+ updateHomeKey(safeStore, homeKey);
+ if (executeAt != null && hasBeen(Committed) &&
!this.executeAt().equals(executeAt))
+ safeStore.agent().onInconsistentTimestamp(this, this.executeAt(),
executeAt);
+ setDurability(durability);
}
- // requires that command != null
- // relies on mutual exclusion for each key
- // note: we do not set status = newStatus, we only use it to decide how we
register with the retryLog
- private void witness(Txn txn, Key homeKey, Key progressKey)
+ public enum AcceptOutcome
{
- txn(txn);
- homeKey(homeKey);
- progressKey(progressKey);
-
- if (status() == NotWitnessed)
- status(PreAccepted);
+ Success, Redundant, RejectedBallot
+ }
- if (executeAt() == null)
- {
- Timestamp max = commandStore().maxConflict(txn.keys());
- // unlike in the Accord paper, we partition shards within a node,
so that to ensure a total order we must either:
- // - use a global logical clock to issue new timestamps; or
- // - assign each shard _and_ process a unique id, and use both as
components of the timestamp
- Timestamp witnessed = txnId().compareTo(max) > 0 && txnId().epoch
>= commandStore().latestEpoch()
- ? txnId() : commandStore().uniqueNow(max);
- executeAt(witnessed);
+ public AcceptOutcome preaccept(SafeCommandStore safeStore, PartialTxn
partialTxn, AbstractRoute route, @Nullable RoutingKey progressKey)
+ {
+ if (promised().compareTo(Ballot.ZERO) > 0)
+ return AcceptOutcome.RejectedBallot;
- txn.keys().foldl(commandStore().ranges().since(txnId().epoch), (i,
key, param) -> {
- if (commandStore().hashIntersects(key))
- commandStore().commandsForKey(key).register(this);
- return null;
- }, null);
- }
+ return preacceptInternal(safeStore, partialTxn, route, progressKey);
}
- public boolean preaccept(Txn txn, Key homeKey, Key progressKey)
+ private AcceptOutcome preacceptInternal(SafeCommandStore safeStore,
PartialTxn partialTxn, AbstractRoute route, @Nullable RoutingKey progressKey)
{
- if (promised().compareTo(Ballot.ZERO) > 0)
+ if (executeAt() != null)
{
- logger.trace("{}: skipping preaccept - witnessed higher ballot
({})", txnId(), promised());
- return false;
+ logger.trace("{}: skipping preaccept - already preaccepted ({})",
txnId(), status());
+ return AcceptOutcome.Redundant;
}
- if (hasBeen(PreAccepted))
+ Status status = status();
+ switch (status)
{
- logger.trace("{}: skipping preaccept - already preaccepted ({})",
txnId(), status());
- return true;
+ default: throw new IllegalStateException();
+ case NotWitnessed:
+ case AcceptedInvalidate:
+ case Invalidated:
}
- witness(txn, homeKey, progressKey);
- boolean isProgressShard = progressKey != null &&
handles(txnId().epoch, progressKey);
- commandStore().progressLog().preaccept(this, isProgressShard,
isProgressShard && progressKey.equals(homeKey));
+ KeyRanges coordinateRanges = coordinateRanges(safeStore);
+ ProgressShard shard = progressShard(safeStore, route, progressKey,
coordinateRanges);
+ if (!validate(KeyRanges.EMPTY, coordinateRanges, shard, route, Set,
partialTxn, Set, null, Ignore))
+ throw new IllegalStateException();
- logger.trace("{}: preaccepted with executeAt: {}, deps: {}", txnId(),
executeAt(), savedDeps());
- notifyListeners();
- return true;
+ Timestamp max = safeStore.maxConflict(partialTxn.keys());
+ // unlike in the Accord paper, we partition shards within a node, so
that to ensure a total order we must either:
+ // - use a global logical clock to issue new timestamps; or
+ // - assign each shard _and_ process a unique id, and use both as
components of the timestamp
+ TxnId txnId = txnId();
+ setExecuteAt(txnId.compareTo(max) > 0 && txnId.epoch >=
safeStore.latestEpoch()
+ ? txnId : safeStore.uniqueNow(max));
+
+ set(safeStore, KeyRanges.EMPTY, coordinateRanges, shard, route,
partialTxn, Set, null, Ignore);
+ if (status == NotWitnessed)
+ setStatus(PreAccepted);
+
+ safeStore.progressLog().preaccept(this, shard);
+
+ notifyListeners(safeStore);
+ return AcceptOutcome.Success;
}
- public boolean accept(Ballot ballot, Txn txn, Key homeKey, Key
progressKey, Timestamp executeAt, Deps deps)
+ public boolean preacceptInvalidate(Ballot ballot)
{
if (promised().compareTo(ballot) > 0)
{
- logger.trace("{}: skipping accept - witnessed higher ballot ({} >
{})", txnId(), promised(), ballot);
+ logger.trace("{}: skipping preacceptInvalidate - witnessed higher
ballot ({})", txnId(), promised());
return false;
}
+ setPromised(ballot);
+ return true;
+ }
+
+ public AcceptOutcome accept(SafeCommandStore safeStore, Ballot ballot,
PartialRoute route, @Nullable RoutingKey progressKey, Timestamp executeAt,
PartialDeps partialDeps)
+ {
+ if (this.promised().compareTo(ballot) > 0)
+ {
+ logger.trace("{}: skipping accept - witnessed higher ballot ({} >
{})", txnId(), promised(), ballot);
+ return AcceptOutcome.RejectedBallot;
+ }
if (hasBeen(Committed))
{
logger.trace("{}: skipping accept - already committed ({})",
txnId(), status());
- return false;
+ return AcceptOutcome.Redundant;
}
- witness(txn, homeKey, progressKey);
- savedDeps(deps);
- executeAt(executeAt);
- promised(ballot);
- accepted(ballot);
- status(Accepted);
+ TxnId txnId = txnId();
+ KeyRanges coordinateRanges = coordinateRanges(safeStore);
+ KeyRanges executeRanges = txnId.epoch == executeAt.epoch ?
coordinateRanges : safeStore.ranges().at(executeAt.epoch);
+ ProgressShard shard = progressShard(safeStore, route, progressKey,
coordinateRanges);
- boolean isProgressShard = progressKey != null &&
handles(txnId().epoch, progressKey);
- commandStore().progressLog().accept(this, isProgressShard,
isProgressShard && progressKey.equals(homeKey));
- logger.trace("{}: preaccepted with executeAt: {}, deps: {}", txnId(),
executeAt, deps);
+ if (!validate(coordinateRanges, executeRanges, shard, route, Ignore,
null, Ignore, partialDeps, Set))
+ throw new AssertionError("Invalid response from validate
function");
- notifyListeners();
- return true;
+ setExecuteAt(executeAt);
+ setPromised(ballot);
+ setAccepted(ballot);
+ set(safeStore, coordinateRanges, executeRanges, shard, route, null,
Ignore, partialDeps, Set);
+ setStatus(Accepted);
+
+ safeStore.progressLog().accept(this, shard);
+ notifyListeners(safeStore);
+
+ return AcceptOutcome.Success;
}
- public boolean acceptInvalidate(Ballot ballot)
+ public AcceptOutcome acceptInvalidate(SafeCommandStore safeStore, Ballot
ballot)
{
if (this.promised().compareTo(ballot) > 0)
{
logger.trace("{}: skipping accept invalidated - witnessed higher
ballot ({} > {})", txnId(), promised(), ballot);
- return false;
+ return AcceptOutcome.RejectedBallot;
}
if (hasBeen(Committed))
{
logger.trace("{}: skipping accept invalidated - already committed
({})", txnId(), status());
- return false;
+ return AcceptOutcome.Redundant;
}
- promised(ballot);
- accepted(ballot);
- status(AcceptedInvalidate);
+ setPromised(ballot);
+ setAccepted(ballot);
+ setStatus(AcceptedInvalidate);
logger.trace("{}: accepted invalidated", txnId());
- notifyListeners();
- return true;
+ notifyListeners(safeStore);
+ return AcceptOutcome.Success;
}
+ public enum CommitOutcome { Success, Redundant, Insufficient }
+
// relies on mutual exclusion for each key
- public boolean commit(Txn txn, Key homeKey, Key progressKey, Timestamp
executeAt, Deps deps)
+ public CommitOutcome commit(SafeCommandStore safeStore, AbstractRoute
route, @Nullable RoutingKey progressKey, @Nullable PartialTxn partialTxn,
Timestamp executeAt, PartialDeps partialDeps)
{
if (hasBeen(Committed))
{
logger.trace("{}: skipping commit - already committed ({})",
txnId(), status());
if (executeAt.equals(executeAt()) && status() != Invalidated)
- return false;
+ return CommitOutcome.Redundant;
- commandStore().agent().onInconsistentTimestamp(this, (status() ==
Invalidated ? Timestamp.NONE : this.executeAt()), executeAt);
+ safeStore.agent().onInconsistentTimestamp(this, (status() ==
Invalidated ? Timestamp.NONE : this.executeAt()), executeAt);
}
- witness(txn, homeKey, progressKey);
- savedDeps(deps);
- executeAt(executeAt);
- status(Committed);
- Preconditions.checkState(!isWaitingOnCommit());
- Preconditions.checkState(!isWaitingOnApply());
- logger.trace("{}: committed with executeAt: {}, deps: {}", txnId(),
executeAt, deps);
+ KeyRanges coordinateRanges = coordinateRanges(safeStore);
+ // TODO (now): consider ranges between coordinateRanges and
executeRanges? Perhaps don't need them
+ KeyRanges executeRanges = executeRanges(safeStore, executeAt);
+ ProgressShard shard = progressShard(safeStore, route, progressKey,
coordinateRanges);
- KeyRanges ranges = commandStore().ranges().since(executeAt.epoch);
- if (ranges != null)
- {
- savedDeps().forEachOn(ranges, commandStore()::hashIntersects,
txnId -> {
- Command command = commandStore().command(txnId);
- logger.trace("{}: dep {} is {}", txnId(), command.txnId(),
command.status());
- switch (command.status())
- {
- default:
- throw new IllegalStateException();
- case NotWitnessed:
- case PreAccepted:
- case Accepted:
- case AcceptedInvalidate:
- // we don't know when these dependencies will execute,
and cannot execute until we do
- command.addListener(this);
- addWaitingOnCommit(command);
- break;
- case Committed:
- // TODO: split into ReadyToRead and ReadyToWrite;
- // the distributed read can be performed as soon
as those keys are ready, and in parallel with any other reads
- // the client can even ACK immediately after;
only the write needs to be postponed until other in-progress reads complete
- case ReadyToExecute:
- case Executed:
- case Applied:
- command.addListener(this);
- updatePredecessor(command);
- case Invalidated:
- break;
- }
- });
- }
+ if (!validate(coordinateRanges, executeRanges, shard, route, Check,
partialTxn, Add, partialDeps, Set))
+ return CommitOutcome.Insufficient;
+ setExecuteAt(executeAt);
+ set(safeStore, coordinateRanges, executeRanges, shard, route,
partialTxn, Add, partialDeps, Set);
- boolean isProgressShard = progressKey != null &&
handles(txnId().epoch, progressKey);
- commandStore().progressLog().commit(this, isProgressShard,
isProgressShard && progressKey.equals(homeKey));
+ setStatus(Committed);
+ logger.trace("{}: committed with executeAt: {}, deps: {}", txnId(),
executeAt, partialDeps);
+ populateWaitingOn(safeStore);
- return true;
+ safeStore.progressLog().commit(this, shard);
+
+ // TODO (now): introduce intermediate status to avoid reentry when
notifying listeners (which might notify us)
+ maybeExecute(safeStore, shard, true, true);
+ return CommitOutcome.Success;
}
- public Future<Void> commitAndBeginExecution(Txn txn, Key homeKey, Key
progressKey, Timestamp executeAt, Deps deps)
+ protected void populateWaitingOn(SafeCommandStore safeStore)
{
- if (!commit(txn, homeKey, progressKey, executeAt, deps))
- return Writes.SUCCESS;
-
- return maybeExecute(true);
+ KeyRanges ranges = safeStore.ranges().since(executeAt().epoch);
+ if (ranges != null) {
+ partialDeps().forEachOn(ranges,
safeStore.commandStore()::hashIntersects, txnId -> {
+ Command command = safeStore.ifLoaded(txnId);
Review Comment:
OK, I think the reason this was confusing is that the in-memory
implementation doesn't load commands asynchronously. It's not clear that, in the
Cassandra integration, `addAndInvokeListener` loads the command asynchronously and
then invokes the listener. A comment describing what `addAndInvokeListener` does
would make this clearer.
Ideally this logic wouldn't live outside the command store; you would just say
"I want this to happen if the command is loaded, or after you load it."
I think this is trying to avoid calling `addWaitingOnCommit` if we already know the
command is committed, which is why it doesn't fit cleanly into something that runs
once the command is loaded.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]