bdeggleston commented on code in PR #33:
URL: https://github.com/apache/cassandra-accord/pull/33#discussion_r1123711981
##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -18,1178 +18,1435 @@
package accord.local;
-import accord.api.*;
-import accord.local.Status.Durability;
-import accord.local.Status.Known;
+import accord.api.Data;
+import accord.api.Result;
+import accord.api.RoutingKey;
+import accord.impl.CommandsForKey;
+import accord.impl.CommandsForKeys;
import accord.primitives.*;
-import accord.primitives.Writes;
import accord.utils.Invariants;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import static accord.local.Status.*;
-import static accord.local.Status.Known.*;
-import static accord.local.Status.Known.Done;
-import static accord.local.Status.Known.ExecuteAtOnly;
-import static accord.primitives.Route.isFullRoute;
-import static accord.utils.Utils.listOf;
-
-import javax.annotation.Nonnull;
+import accord.utils.Utils;
+import accord.utils.async.AsyncChain;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableSortedSet;
+
import javax.annotation.Nullable;
+import java.util.*;
-import accord.api.ProgressLog.ProgressShard;
-import accord.primitives.Ranges;
-import accord.primitives.Ballot;
-import accord.primitives.PartialDeps;
-import accord.primitives.PartialTxn;
-import accord.primitives.Route;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import accord.api.Result;
-import accord.api.RoutingKey;
+import static accord.local.Status.Durability.Local;
+import static accord.local.Status.Durability.NotDurable;
+import static accord.local.Status.Known.DefinitionOnly;
+import static accord.utils.Utils.*;
+import static java.lang.String.format;
-import static accord.api.ProgressLog.ProgressShard.Home;
-import static accord.api.ProgressLog.ProgressShard.Local;
-import static accord.api.ProgressLog.ProgressShard.No;
-import static accord.api.ProgressLog.ProgressShard.Unsure;
-import static accord.local.Command.EnsureAction.Add;
-import static accord.local.Command.EnsureAction.Check;
-import static accord.local.Command.EnsureAction.Ignore;
-import static accord.local.Command.EnsureAction.Set;
-import static accord.local.Command.EnsureAction.TrySet;
-
-public abstract class Command implements CommandListener,
BiConsumer<SafeCommandStore, CommandListener>, PreLoadContext
+public abstract class Command extends ImmutableState
{
- private static final Logger logger =
LoggerFactory.getLogger(Command.class);
+ // sentinel value to indicate a command requested in a preexecute context
was not found
+ // should not escape the safe command store
+ public static final Command EMPTY = new Command()
+ {
+ @Override public Route<?> route() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public RoutingKey progressKey() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public RoutingKey homeKey() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public TxnId txnId() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public Ballot promised() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public Status.Durability durability() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public ImmutableSet<CommandListener> listeners() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public SaveStatus saveStatus() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public Timestamp executeAt() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public Ballot accepted() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public PartialTxn partialTxn() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Nullable
+ @Override public PartialDeps partialDeps() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
- public abstract TxnId txnId();
+ @Override
+ public String toString()
+ {
+ return "Command(EMPTY)";
+ }
+ };
- // TODO (desirable, API consistency): should any of these calls be
replaced by corresponding known() registers?
- public boolean hasBeen(Status status)
+ static
{
- return status().hasBeen(status);
+ EMPTY.markInvalidated();
}
- public boolean has(Known known)
+ static PreLoadContext contextForCommand(Command command)
{
- return known.isSatisfiedBy(saveStatus().known);
+ Invariants.checkState(command.hasBeen(Status.PreAccepted) &&
command.partialTxn() != null);
+ return command instanceof PreLoadContext ? (PreLoadContext) command :
PreLoadContext.contextFor(command.txnId(), command.partialTxn().keys());
}
- public boolean has(Definition definition)
+ private static Status.Durability durability(Status.Durability durability,
SaveStatus status)
{
- return known().definition.compareTo(definition) >= 0;
+ if (status.compareTo(SaveStatus.PreApplied) >= 0 && durability ==
NotDurable)
+ return Local; // not necessary anywhere, but helps for logical
consistency
+ return durability;
}
- public boolean has(Outcome outcome)
+ public interface CommonAttributes
{
- return known().outcome.compareTo(outcome) >= 0;
+ TxnId txnId();
+ Status.Durability durability();
+ RoutingKey homeKey();
+ RoutingKey progressKey();
+ Route<?> route();
+ PartialTxn partialTxn();
+ PartialDeps partialDeps();
+ ImmutableSet<CommandListener> listeners();
}
- public boolean is(Status status)
+ public static class SerializerSupport
{
- return status() == status;
- }
-
- /**
- * homeKey is a global value that defines the home shard - the one tasked
with ensuring the transaction is finished.
- * progressKey is a local value that defines the local shard responsible
for ensuring progress on the transaction.
- * This will be homeKey if it is owned by the node, and some other key
otherwise. If not the home shard, the progress
- * shard has much weaker responsibilities, only ensuring that the home
shard has durably witnessed the txnId.
- *
- * TODO (expected, efficiency): we probably do not want to save this on
its own, as we probably want to
- * minimize IO interactions and discrete registers, so will likely
reference commit log entries directly
- * At which point we may impose a requirement that only a Route can be
saved, not a homeKey on its own.
- * Once this restriction is imposed, we no longer need to pass around
Routable.Domain with TxnId.
- */
- public abstract RoutingKey homeKey();
- protected abstract void setHomeKey(RoutingKey key);
-
- public abstract RoutingKey progressKey();
- protected abstract void setProgressKey(RoutingKey key);
-
- /**
- * If this is the home shard, we require that this is a Route for all
states > NotWitnessed;
- * otherwise for the local progress shard this is ordinarily a
PartialRoute, and for other shards this is not set,
- * so that there is only one copy per node that can be consulted to
construct the full set of involved keys.
- *
- * If hasBeen(Committed) this must contain the keys for both txnId.epoch
and executeAt.epoch
- */
- public abstract @Nullable Route<?> route();
- protected abstract void setRoute(Route<?> route);
-
- public abstract PartialTxn partialTxn();
- protected abstract void setPartialTxn(PartialTxn txn);
-
- public abstract Ballot promised();
- protected abstract void setPromised(Ballot ballot);
-
- public abstract Ballot accepted();
- protected abstract void setAccepted(Ballot ballot);
-
- public abstract Timestamp executeAt();
- protected abstract void setExecuteAt(Timestamp timestamp);
-
- /**
- * While !hasBeen(Committed), used only as a register for Accept state,
used by Recovery
- * If hasBeen(Committed), represents the full deps owned by this range for
execution at both txnId.epoch
- * AND executeAt.epoch so that it may be used for Recovery (which contacts
only txnId.epoch topology),
- * but also for execution.
- */
- public abstract PartialDeps partialDeps();
- protected abstract void setPartialDeps(PartialDeps deps);
-
- public abstract Writes writes();
- protected abstract void setWrites(Writes writes);
-
- public abstract Result result();
- protected abstract void setResult(Result result);
-
- public abstract SaveStatus saveStatus();
- protected abstract void setSaveStatus(SaveStatus status);
-
- public Status status() { return saveStatus().status; }
- protected void setStatus(Status status) {
setSaveStatus(SaveStatus.get(status, known())); }
-
- public Known known() { return saveStatus().known; }
-
- public abstract Durability durability();
- public abstract void setDurability(Durability v);
+ public static Command.Listener listener(TxnId txnId)
+ {
+ return new Command.Listener(txnId);
+ }
- public abstract Command addListener(CommandListener listener);
- public abstract void removeListener(CommandListener listener);
- protected abstract void notifyListeners(SafeCommandStore safeStore);
+ public static NotWitnessed notWitnessed(CommonAttributes attributes,
Ballot promised)
+ {
+ return NotWitnessed.Factory.create(attributes, promised);
+ }
- protected abstract void addWaitingOnCommit(TxnId txnId);
- protected abstract void removeWaitingOnCommit(TxnId txnId);
- protected abstract TxnId firstWaitingOnCommit();
+ public static Preaccepted preaccepted(CommonAttributes common,
Timestamp executeAt, Ballot promised)
+ {
+ return Preaccepted.Factory.create(common, executeAt, promised);
+ }
- protected abstract void addWaitingOnApplyIfAbsent(TxnId txnId, Timestamp
executeAt);
- protected abstract TxnId firstWaitingOnApply(@Nullable TxnId
ifExecutesBefore);
+ public static Accepted accepted(CommonAttributes common, SaveStatus
status, Timestamp executeAt, Ballot promised, Ballot accepted)
+ {
+ return Accepted.Factory.create(common, status, executeAt,
promised, accepted);
+ }
- protected abstract void removeWaitingOn(TxnId txnId, Timestamp executeAt);
- protected abstract boolean isWaitingOnDependency();
+ public static Committed committed(CommonAttributes common, SaveStatus
status, Timestamp executeAt, Ballot promised, Ballot accepted,
ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId>
waitingOnApply)
+ {
+ return Committed.Factory.create(common, status, executeAt,
promised, accepted, waitingOnCommit, waitingOnApply);
+ }
- public boolean hasBeenWitnessed()
- {
- return partialTxn() != null;
+ public static Executed executed(CommonAttributes common, SaveStatus
status, Timestamp executeAt, Ballot promised, Ballot accepted,
ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId>
waitingOnApply, Writes writes, Result result)
+ {
+ return Executed.Factory.create(common, status, executeAt,
promised, accepted, waitingOnCommit, waitingOnApply, writes, result);
+ }
}
- @Override
- public Iterable<TxnId> txnIds()
+ private static SaveStatus validateCommandClass(SaveStatus status, Class<?>
expected, Class<?> actual)
{
- return Collections.singleton(txnId());
+ if (actual != expected)
+ {
+ throw new IllegalStateException(format("Cannot instantiate %s for
status %s. %s expected",
+ actual.getSimpleName(),
status, expected.getSimpleName()));
+ }
+ return status;
}
- @Override
- public Seekables<?, ?> keys()
+ private static SaveStatus validateCommandClass(SaveStatus status, Class<?>
klass)
{
- // TODO (expected, consider): when do we need this, and will it always
be sufficient?
- return partialTxn().keys();
+ switch (status)
+ {
+ case NotWitnessed:
+ return validateCommandClass(status, NotWitnessed.class, klass);
+ case PreAccepted:
+ return validateCommandClass(status, Preaccepted.class, klass);
+ case AcceptedInvalidate:
+ case AcceptedInvalidateWithDefinition:
+ case Accepted:
+ case AcceptedWithDefinition:
+ return validateCommandClass(status, Accepted.class, klass);
+ case Committed:
+ case ReadyToExecute:
+ return validateCommandClass(status, Committed.class, klass);
+ case PreApplied:
+ case Applied:
+ case Invalidated:
+ return validateCommandClass(status, Executed.class, klass);
+ default:
+ throw new IllegalStateException("Unhandled status " + status);
+ }
}
- public void setDurability(SafeCommandStore safeStore, Durability
durability, RoutingKey homeKey, @Nullable Timestamp executeAt)
+ public static Command addListener(SafeCommandStore safeStore, Command
command, CommandListener listener)
{
- updateHomeKey(safeStore, homeKey);
- if (executeAt != null && hasBeen(PreCommitted) &&
!this.executeAt().equals(executeAt))
- safeStore.agent().onInconsistentTimestamp(this, this.executeAt(),
executeAt);
- setDurability(durability);
+ return
safeStore.beginUpdate(command).addListener(listener).updateAttributes();
}
- public enum AcceptOutcome
+ public static Command removeListener(SafeCommandStore safeStore, Command
command, CommandListener listener)
{
- Success, Redundant, RejectedBallot
+ return
safeStore.beginUpdate(command).removeListener(listener).updateAttributes();
}
- public AcceptOutcome preaccept(SafeCommandStore safeStore, PartialTxn
partialTxn, Route<?> route, @Nullable RoutingKey progressKey)
+ public static Committed updateWaitingOn(SafeCommandStore safeStore,
Committed command, WaitingOn.Update waitingOn)
{
- return preacceptOrRecover(safeStore, partialTxn, route, progressKey,
Ballot.ZERO);
+ if (!waitingOn.hasChanges())
+ return command;
+
+ Update update = safeStore.beginUpdate(command);
+ Committed updated = command instanceof Executed ?
+ Executed.Factory.update(command.asExecuted(), update,
waitingOn.build()) :
+ Committed.Factory.update(command, update, waitingOn.build());
+ return update.complete(updated);
}
- public AcceptOutcome recover(SafeCommandStore safeStore, PartialTxn
partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
+ public static class Listener implements CommandListener
{
- return preacceptOrRecover(safeStore, partialTxn, route, progressKey,
ballot);
- }
+ protected final TxnId listenerId;
- private AcceptOutcome preacceptOrRecover(SafeCommandStore safeStore,
PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot
ballot)
- {
- int compareBallots = promised().compareTo(ballot);
- if (compareBallots > 0)
+ private Listener(TxnId listenerId)
{
- logger.trace("{}: skipping preaccept - higher ballot witnessed
({})", txnId(), promised());
- return AcceptOutcome.RejectedBallot;
+ this.listenerId = listenerId;
}
- else if (compareBallots < 0)
+
+ @Override
+ public boolean equals(Object o)
{
- // save the new ballot as a promise
- setPromised(ballot);
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Listener that = (Listener) o;
+ return listenerId.equals(that.listenerId);
}
- if (known().definition.isKnown())
+ @Override
+ public int hashCode()
{
- Invariants.checkState(status() == Invalidated || executeAt() !=
null);
- logger.trace("{}: skipping preaccept - already known ({})",
txnId(), status());
- // in case of Ballot.ZERO, we must either have a competing
recovery coordinator or have late delivery of the
- // preaccept; in the former case we should abandon coordination,
and in the latter we have already completed
- return ballot.equals(Ballot.ZERO) ? AcceptOutcome.Redundant :
AcceptOutcome.Success;
+ return Objects.hash(listenerId);
}
- Ranges coordinateRanges = coordinateRanges(safeStore);
- Invariants.checkState(!coordinateRanges.isEmpty());
- ProgressShard shard = progressShard(safeStore, route, progressKey,
coordinateRanges);
- if (!validate(Ranges.EMPTY, coordinateRanges, shard, route, Set,
partialTxn, Set, null, Ignore))
- throw new IllegalStateException();
-
- if (executeAt() == null)
+ @Override
+ public String toString()
{
- TxnId txnId = txnId();
- // unlike in the Accord paper, we partition shards within a node,
so that to ensure a total order we must either:
- // - use a global logical clock to issue new timestamps; or
- // - assign each shard _and_ process a unique id, and use both as
components of the timestamp
- // if we are performing recovery (i.e. non-zero ballot), do not
permit a fast path decision as we want to
- // invalidate any transactions that were not completed by their
initial coordinator
- if (ballot.equals(Ballot.ZERO))
setExecuteAt(safeStore.preaccept(txnId, partialTxn.keys()));
- else setExecuteAt(safeStore.time().uniqueNow(txnId));
+ return "ListenerProxy{" + listenerId + '}';
+ }
- if (status() == NotWitnessed)
- setStatus(PreAccepted);
- safeStore.progressLog().preaccepted(this, shard);
+ public TxnId txnId()
+ {
+ return listenerId;
}
- else
+
+ @Override
+ public void onChange(SafeCommandStore safeStore, TxnId txnId)
{
- // TODO (expected, ?): in the case that we are pre-committed but
had not been preaccepted/accepted, should we inform progressLog?
- setSaveStatus(SaveStatus.enrich(saveStatus(), DefinitionOnly));
+ Commands.listenerUpdate(safeStore, safeStore.command(listenerId),
safeStore.command(txnId));
}
- set(safeStore, Ranges.EMPTY, coordinateRanges, shard, route,
partialTxn, Set, null, Ignore);
- notifyListeners(safeStore);
- return AcceptOutcome.Success;
+ @Override
+ public PreLoadContext listenerPreLoadContext(TxnId caller)
+ {
+ return PreLoadContext.contextFor(Utils.listOf(listenerId, caller),
Keys.EMPTY);
+ }
}
- public boolean preacceptInvalidate(Ballot ballot)
+ public static CommandListener listener(TxnId txnId)
{
- if (promised().compareTo(ballot) > 0)
- {
- logger.trace("{}: skipping preacceptInvalidate - witnessed higher
ballot ({})", txnId(), promised());
- return false;
- }
- setPromised(ballot);
- return true;
+ return new Listener(txnId);
}
- public AcceptOutcome accept(SafeCommandStore safeStore, Ballot ballot,
PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey progressKey,
Timestamp executeAt, PartialDeps partialDeps)
+ private abstract static class AbstractCommand extends Command
{
- if (this.promised().compareTo(ballot) > 0)
+ private final TxnId txnId;
+ private final SaveStatus status;
+ private final Status.Durability durability;
+ private final RoutingKey homeKey;
Review Comment:
So in that case, I’d say we should leave things as they are.
AcceptedInvalidate and Invalidate use Accepted and Committed classes
respectively, so they also need to support homeKey only and Route. It may make
sense to introduce an additional Invalidated command class, though I’m not sure
if it would cause problems in scenarios where an invalidation was accepted,
then the command becomes committed after having forgotten it’s definition.
Either way though, I think exploring that should be follow on work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]