bdeggleston commented on code in PR #33:
URL: https://github.com/apache/cassandra-accord/pull/33#discussion_r1101820590
##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -18,1178 +18,1435 @@
package accord.local;
-import accord.api.*;
-import accord.local.Status.Durability;
-import accord.local.Status.Known;
+import accord.api.Data;
+import accord.api.Result;
+import accord.api.RoutingKey;
+import accord.impl.CommandsForKey;
+import accord.impl.CommandsForKeys;
import accord.primitives.*;
-import accord.primitives.Writes;
import accord.utils.Invariants;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import static accord.local.Status.*;
-import static accord.local.Status.Known.*;
-import static accord.local.Status.Known.Done;
-import static accord.local.Status.Known.ExecuteAtOnly;
-import static accord.primitives.Route.isFullRoute;
-import static accord.utils.Utils.listOf;
-
-import javax.annotation.Nonnull;
+import accord.utils.Utils;
+import accord.utils.async.AsyncChain;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableSortedSet;
+
import javax.annotation.Nullable;
+import java.util.*;
-import accord.api.ProgressLog.ProgressShard;
-import accord.primitives.Ranges;
-import accord.primitives.Ballot;
-import accord.primitives.PartialDeps;
-import accord.primitives.PartialTxn;
-import accord.primitives.Route;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import accord.api.Result;
-import accord.api.RoutingKey;
+import static accord.local.Status.Durability.Local;
+import static accord.local.Status.Durability.NotDurable;
+import static accord.local.Status.Known.DefinitionOnly;
+import static accord.utils.Utils.*;
+import static java.lang.String.format;
-import static accord.api.ProgressLog.ProgressShard.Home;
-import static accord.api.ProgressLog.ProgressShard.Local;
-import static accord.api.ProgressLog.ProgressShard.No;
-import static accord.api.ProgressLog.ProgressShard.Unsure;
-import static accord.local.Command.EnsureAction.Add;
-import static accord.local.Command.EnsureAction.Check;
-import static accord.local.Command.EnsureAction.Ignore;
-import static accord.local.Command.EnsureAction.Set;
-import static accord.local.Command.EnsureAction.TrySet;
-
-public abstract class Command implements CommandListener,
BiConsumer<SafeCommandStore, CommandListener>, PreLoadContext
+public abstract class Command extends ImmutableState
{
- private static final Logger logger =
LoggerFactory.getLogger(Command.class);
+ // sentinel value to indicate a command requested in a preexecute context was not found
+ // should not escape the safe command store
+ public static final Command EMPTY = new Command()
+ {
+ @Override public Route<?> route() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public RoutingKey progressKey() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public RoutingKey homeKey() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public TxnId txnId() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public Ballot promised() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public Status.Durability durability() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public ImmutableSet<CommandListener> listeners() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public SaveStatus saveStatus() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public Timestamp executeAt() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public Ballot accepted() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Override public PartialTxn partialTxn() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+ @Nullable
+ @Override public PartialDeps partialDeps() { throw new
IllegalStateException("Attempting to access EMPTY sentinel values"); }
- public abstract TxnId txnId();
+ @Override
+ public String toString()
+ {
+ return "Command(EMPTY)";
+ }
+ };
- // TODO (desirable, API consistency): should any of these calls be
replaced by corresponding known() registers?
- public boolean hasBeen(Status status)
+ static
{
- return status().hasBeen(status);
+ EMPTY.markInvalidated();
}
- public boolean has(Known known)
+ static PreLoadContext contextForCommand(Command command)
{
- return known.isSatisfiedBy(saveStatus().known);
+ Invariants.checkState(command.hasBeen(Status.PreAccepted) &&
command.partialTxn() != null);
+ return command instanceof PreLoadContext ? (PreLoadContext) command :
PreLoadContext.contextFor(command.txnId(), command.partialTxn().keys());
}
- public boolean has(Definition definition)
+ private static Status.Durability durability(Status.Durability durability,
SaveStatus status)
{
- return known().definition.compareTo(definition) >= 0;
+ if (status.compareTo(SaveStatus.PreApplied) >= 0 && durability ==
NotDurable)
+ return Local; // not necessary anywhere, but helps for logical
consistency
+ return durability;
}
- public boolean has(Outcome outcome)
+ public interface CommonAttributes
{
- return known().outcome.compareTo(outcome) >= 0;
+ TxnId txnId();
+ Status.Durability durability();
+ RoutingKey homeKey();
+ RoutingKey progressKey();
+ Route<?> route();
+ PartialTxn partialTxn();
+ PartialDeps partialDeps();
+ ImmutableSet<CommandListener> listeners();
}
- public boolean is(Status status)
+ public static class SerializerSupport
{
- return status() == status;
- }
-
- /**
- * homeKey is a global value that defines the home shard - the one tasked with ensuring the transaction is finished.
- * progressKey is a local value that defines the local shard responsible for ensuring progress on the transaction.
- * This will be homeKey if it is owned by the node, and some other key otherwise. If not the home shard, the progress
- * shard has much weaker responsibilities, only ensuring that the home shard has durably witnessed the txnId.
- *
- * TODO (expected, efficiency): we probably do not want to save this on its own, as we probably want to
- * minimize IO interactions and discrete registers, so will likely reference commit log entries directly
- * At which point we may impose a requirement that only a Route can be saved, not a homeKey on its own.
- * Once this restriction is imposed, we no longer need to pass around Routable.Domain with TxnId.
- */
- public abstract RoutingKey homeKey();
- protected abstract void setHomeKey(RoutingKey key);
-
- public abstract RoutingKey progressKey();
- protected abstract void setProgressKey(RoutingKey key);
-
- /**
- * If this is the home shard, we require that this is a Route for all states > NotWitnessed;
- * otherwise for the local progress shard this is ordinarily a PartialRoute, and for other shards this is not set,
- * so that there is only one copy per node that can be consulted to construct the full set of involved keys.
- *
- * If hasBeen(Committed) this must contain the keys for both txnId.epoch and executeAt.epoch
- */
- public abstract @Nullable Route<?> route();
- protected abstract void setRoute(Route<?> route);
-
- public abstract PartialTxn partialTxn();
- protected abstract void setPartialTxn(PartialTxn txn);
-
- public abstract Ballot promised();
- protected abstract void setPromised(Ballot ballot);
-
- public abstract Ballot accepted();
- protected abstract void setAccepted(Ballot ballot);
-
- public abstract Timestamp executeAt();
- protected abstract void setExecuteAt(Timestamp timestamp);
-
- /**
- * While !hasBeen(Committed), used only as a register for Accept state, used by Recovery
- * If hasBeen(Committed), represents the full deps owned by this range for execution at both txnId.epoch
- * AND executeAt.epoch so that it may be used for Recovery (which contacts only txnId.epoch topology),
- * but also for execution.
- */
- public abstract PartialDeps partialDeps();
- protected abstract void setPartialDeps(PartialDeps deps);
-
- public abstract Writes writes();
- protected abstract void setWrites(Writes writes);
-
- public abstract Result result();
- protected abstract void setResult(Result result);
-
- public abstract SaveStatus saveStatus();
- protected abstract void setSaveStatus(SaveStatus status);
-
- public Status status() { return saveStatus().status; }
- protected void setStatus(Status status) {
setSaveStatus(SaveStatus.get(status, known())); }
-
- public Known known() { return saveStatus().known; }
-
- public abstract Durability durability();
- public abstract void setDurability(Durability v);
+ public static Command.Listener listener(TxnId txnId)
+ {
+ return new Command.Listener(txnId);
+ }
- public abstract Command addListener(CommandListener listener);
- public abstract void removeListener(CommandListener listener);
- protected abstract void notifyListeners(SafeCommandStore safeStore);
+ public static NotWitnessed notWitnessed(CommonAttributes attributes,
Ballot promised)
+ {
+ return NotWitnessed.Factory.create(attributes, promised);
+ }
- protected abstract void addWaitingOnCommit(TxnId txnId);
- protected abstract void removeWaitingOnCommit(TxnId txnId);
- protected abstract TxnId firstWaitingOnCommit();
+ public static Preaccepted preaccepted(CommonAttributes common,
Timestamp executeAt, Ballot promised)
+ {
+ return Preaccepted.Factory.create(common, executeAt, promised);
+ }
- protected abstract void addWaitingOnApplyIfAbsent(TxnId txnId, Timestamp
executeAt);
- protected abstract TxnId firstWaitingOnApply(@Nullable TxnId
ifExecutesBefore);
+ public static Accepted accepted(CommonAttributes common, SaveStatus
status, Timestamp executeAt, Ballot promised, Ballot accepted)
+ {
+ return Accepted.Factory.create(common, status, executeAt,
promised, accepted);
+ }
- protected abstract void removeWaitingOn(TxnId txnId, Timestamp executeAt);
- protected abstract boolean isWaitingOnDependency();
+ public static Committed committed(CommonAttributes common, SaveStatus
status, Timestamp executeAt, Ballot promised, Ballot accepted,
ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId>
waitingOnApply)
+ {
+ return Committed.Factory.create(common, status, executeAt,
promised, accepted, waitingOnCommit, waitingOnApply);
+ }
- public boolean hasBeenWitnessed()
- {
- return partialTxn() != null;
+ public static Executed executed(CommonAttributes common, SaveStatus
status, Timestamp executeAt, Ballot promised, Ballot accepted,
ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId>
waitingOnApply, Writes writes, Result result)
+ {
+ return Executed.Factory.create(common, status, executeAt,
promised, accepted, waitingOnCommit, waitingOnApply, writes, result);
+ }
}
- @Override
- public Iterable<TxnId> txnIds()
+ private static SaveStatus validateCommandClass(SaveStatus status, Class<?>
expected, Class<?> actual)
{
- return Collections.singleton(txnId());
+ if (actual != expected)
+ {
+ throw new IllegalStateException(format("Cannot instantiate %s for
status %s. %s expected",
+ actual.getSimpleName(),
status, expected.getSimpleName()));
+ }
+ return status;
}
- @Override
- public Seekables<?, ?> keys()
+ private static SaveStatus validateCommandClass(SaveStatus status, Class<?>
klass)
{
- // TODO (expected, consider): when do we need this, and will it always
be sufficient?
- return partialTxn().keys();
+ switch (status)
+ {
+ case NotWitnessed:
+ return validateCommandClass(status, NotWitnessed.class, klass);
+ case PreAccepted:
+ return validateCommandClass(status, Preaccepted.class, klass);
+ case AcceptedInvalidate:
+ case AcceptedInvalidateWithDefinition:
+ case Accepted:
+ case AcceptedWithDefinition:
+ return validateCommandClass(status, Accepted.class, klass);
+ case Committed:
+ case ReadyToExecute:
+ return validateCommandClass(status, Committed.class, klass);
+ case PreApplied:
+ case Applied:
+ case Invalidated:
+ return validateCommandClass(status, Executed.class, klass);
+ default:
+ throw new IllegalStateException("Unhandled status " + status);
+ }
}
- public void setDurability(SafeCommandStore safeStore, Durability
durability, RoutingKey homeKey, @Nullable Timestamp executeAt)
+ public static Command addListener(SafeCommandStore safeStore, Command
command, CommandListener listener)
{
- updateHomeKey(safeStore, homeKey);
- if (executeAt != null && hasBeen(PreCommitted) &&
!this.executeAt().equals(executeAt))
- safeStore.agent().onInconsistentTimestamp(this, this.executeAt(),
executeAt);
- setDurability(durability);
+ return
safeStore.beginUpdate(command).addListener(listener).updateAttributes();
}
- public enum AcceptOutcome
+ public static Command removeListener(SafeCommandStore safeStore, Command
command, CommandListener listener)
{
- Success, Redundant, RejectedBallot
+ return
safeStore.beginUpdate(command).removeListener(listener).updateAttributes();
}
- public AcceptOutcome preaccept(SafeCommandStore safeStore, PartialTxn
partialTxn, Route<?> route, @Nullable RoutingKey progressKey)
+ public static Committed updateWaitingOn(SafeCommandStore safeStore,
Committed command, WaitingOn.Update waitingOn)
{
- return preacceptOrRecover(safeStore, partialTxn, route, progressKey,
Ballot.ZERO);
+ if (!waitingOn.hasChanges())
+ return command;
+
+ Update update = safeStore.beginUpdate(command);
+ Committed updated = command instanceof Executed ?
+ Executed.Factory.update(command.asExecuted(), update,
waitingOn.build()) :
+ Committed.Factory.update(command, update, waitingOn.build());
+ return update.complete(updated);
}
- public AcceptOutcome recover(SafeCommandStore safeStore, PartialTxn
partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
+ public static class Listener implements CommandListener
{
- return preacceptOrRecover(safeStore, partialTxn, route, progressKey,
ballot);
- }
+ protected final TxnId listenerId;
- private AcceptOutcome preacceptOrRecover(SafeCommandStore safeStore,
PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot
ballot)
- {
- int compareBallots = promised().compareTo(ballot);
- if (compareBallots > 0)
+ private Listener(TxnId listenerId)
{
- logger.trace("{}: skipping preaccept - higher ballot witnessed
({})", txnId(), promised());
- return AcceptOutcome.RejectedBallot;
+ this.listenerId = listenerId;
}
- else if (compareBallots < 0)
+
+ @Override
+ public boolean equals(Object o)
{
- // save the new ballot as a promise
- setPromised(ballot);
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Listener that = (Listener) o;
+ return listenerId.equals(that.listenerId);
}
- if (known().definition.isKnown())
+ @Override
+ public int hashCode()
{
- Invariants.checkState(status() == Invalidated || executeAt() !=
null);
- logger.trace("{}: skipping preaccept - already known ({})",
txnId(), status());
- // in case of Ballot.ZERO, we must either have a competing
recovery coordinator or have late delivery of the
- // preaccept; in the former case we should abandon coordination,
and in the latter we have already completed
- return ballot.equals(Ballot.ZERO) ? AcceptOutcome.Redundant :
AcceptOutcome.Success;
+ return Objects.hash(listenerId);
}
- Ranges coordinateRanges = coordinateRanges(safeStore);
- Invariants.checkState(!coordinateRanges.isEmpty());
- ProgressShard shard = progressShard(safeStore, route, progressKey,
coordinateRanges);
- if (!validate(Ranges.EMPTY, coordinateRanges, shard, route, Set,
partialTxn, Set, null, Ignore))
- throw new IllegalStateException();
-
- if (executeAt() == null)
+ @Override
+ public String toString()
{
- TxnId txnId = txnId();
- // unlike in the Accord paper, we partition shards within a node,
so that to ensure a total order we must either:
- // - use a global logical clock to issue new timestamps; or
- // - assign each shard _and_ process a unique id, and use both as
components of the timestamp
- // if we are performing recovery (i.e. non-zero ballot), do not
permit a fast path decision as we want to
- // invalidate any transactions that were not completed by their
initial coordinator
- if (ballot.equals(Ballot.ZERO))
setExecuteAt(safeStore.preaccept(txnId, partialTxn.keys()));
- else setExecuteAt(safeStore.time().uniqueNow(txnId));
+ return "ListenerProxy{" + listenerId + '}';
+ }
- if (status() == NotWitnessed)
- setStatus(PreAccepted);
- safeStore.progressLog().preaccepted(this, shard);
+ public TxnId txnId()
+ {
+ return listenerId;
}
- else
+
+ @Override
+ public void onChange(SafeCommandStore safeStore, TxnId txnId)
{
- // TODO (expected, ?): in the case that we are pre-committed but
had not been preaccepted/accepted, should we inform progressLog?
- setSaveStatus(SaveStatus.enrich(saveStatus(), DefinitionOnly));
+ Commands.listenerUpdate(safeStore, safeStore.command(listenerId),
safeStore.command(txnId));
}
- set(safeStore, Ranges.EMPTY, coordinateRanges, shard, route,
partialTxn, Set, null, Ignore);
- notifyListeners(safeStore);
- return AcceptOutcome.Success;
+ @Override
+ public PreLoadContext listenerPreLoadContext(TxnId caller)
+ {
+ return PreLoadContext.contextFor(Utils.listOf(listenerId, caller),
Keys.EMPTY);
+ }
}
- public boolean preacceptInvalidate(Ballot ballot)
+ public static CommandListener listener(TxnId txnId)
{
- if (promised().compareTo(ballot) > 0)
- {
- logger.trace("{}: skipping preacceptInvalidate - witnessed higher
ballot ({})", txnId(), promised());
- return false;
- }
- setPromised(ballot);
- return true;
+ return new Listener(txnId);
}
- public AcceptOutcome accept(SafeCommandStore safeStore, Ballot ballot,
PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey progressKey,
Timestamp executeAt, PartialDeps partialDeps)
+ private abstract static class AbstractCommand extends Command
{
- if (this.promised().compareTo(ballot) > 0)
+ private final TxnId txnId;
+ private final SaveStatus status;
+ private final Status.Durability durability;
+ private final RoutingKey homeKey;
+ private final RoutingKey progressKey;
+ private final Route<?> route;
+ private final Ballot promised;
+ private final ImmutableSet<CommandListener> listeners;
+
+ private AbstractCommand(TxnId txnId, SaveStatus status,
Status.Durability durability, RoutingKey homeKey, RoutingKey progressKey,
Route<?> route, Ballot promised, ImmutableSet<CommandListener> listeners)
{
- logger.trace("{}: skipping accept - witnessed higher ballot ({} >
{})", txnId(), promised(), ballot);
- return AcceptOutcome.RejectedBallot;
+ this.txnId = txnId;
+ this.status = validateCommandClass(status, getClass());
+ this.durability = durability;
+ this.homeKey = homeKey;
+ this.progressKey = progressKey;
+ this.route = route;
+ this.promised = promised;
+ this.listeners = listeners;
}
- if (hasBeen(PreCommitted))
+ private AbstractCommand(CommonAttributes common, SaveStatus status,
Ballot promised)
{
- logger.trace("{}: skipping accept - already committed ({})",
txnId(), status());
- return AcceptOutcome.Redundant;
+ this.txnId = common.txnId();
+ this.status = validateCommandClass(status, getClass());
+ this.durability = common.durability();
+ this.homeKey = common.homeKey();
+ this.progressKey = common.progressKey();
+ this.route = common.route();
+ this.promised = promised;
+ this.listeners = common.listeners();
}
- TxnId txnId = txnId();
- Ranges coordinateRanges = coordinateRanges(safeStore);
- Ranges acceptRanges = txnId.epoch() == executeAt.epoch() ?
coordinateRanges : safeStore.ranges().between(txnId.epoch(), executeAt.epoch());
- Invariants.checkState(!acceptRanges.isEmpty());
- ProgressShard shard = progressShard(safeStore, route, progressKey,
coordinateRanges);
-
- if (!validate(coordinateRanges, Ranges.EMPTY, shard, route, Ignore,
null, Ignore, partialDeps, Set))
- throw new AssertionError("Invalid response from validate
function");
-
- setExecuteAt(executeAt);
- setPromised(ballot);
- setAccepted(ballot);
-
- // TODO (desired, clarity/efficiency): we don't need to set the route
here, and perhaps we don't even need to
- // distributed partialDeps at all, since all we gain is not waiting
for these transactions to commit during
- // recovery. We probably don't want to directly persist a Route in
any other circumstances, either, to ease persistence.
- set(safeStore, coordinateRanges, acceptRanges, shard, route, null,
Ignore, partialDeps, Set);
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Command command = (Command) o;
+ return txnId.equals(command.txnId())
+ && status == command.saveStatus()
+ && durability == command.durability()
+ && Objects.equals(homeKey, command.homeKey())
+ && Objects.equals(progressKey, command.progressKey())
+ && Objects.equals(route, command.route())
+ && Objects.equals(promised, command.promised())
+ && listeners.equals(command.listeners());
+ }
- // set only registers by transaction keys, which we mightn't already
have received
- if (!known().isDefinitionKnown())
- safeStore.register(keys, acceptRanges, this);
+ @Override
+ public String toString()
+ {
+ return "Command@" + System.identityHashCode(this) + '{' + txnId +
':' + status + '}';
+ }
- setStatus(Accepted);
- safeStore.progressLog().accepted(this, shard);
- notifyListeners(safeStore);
+ @Override
+ public int hashCode()
+ {
+ int hash = 1;
+ hash = 31 * hash + txnId.hashCode();
+ hash = 31 * hash + status.hashCode();
+ hash = 31 * hash + Objects.hashCode(durability);
+ hash = 31 * hash + Objects.hashCode(homeKey);
+ hash = 31 * hash + Objects.hashCode(progressKey);
+ hash = 31 * hash + Objects.hashCode(route);
+ hash = 31 * hash + Objects.hashCode(promised);
+ hash = 31 * hash + Objects.hashCode(listeners);
+ return hash;
+ }
- return AcceptOutcome.Success;
- }
+ @Override
+ public TxnId txnId()
+ {
+ return txnId;
+ }
- public AcceptOutcome acceptInvalidate(SafeCommandStore safeStore, Ballot
ballot)
- {
- if (this.promised().compareTo(ballot) > 0)
+ @Override
+ public final RoutingKey homeKey()
{
- logger.trace("{}: skipping accept invalidated - witnessed higher
ballot ({} > {})", txnId(), promised(), ballot);
- return AcceptOutcome.RejectedBallot;
+ checkCanReadFrom();
+ return homeKey;
}
- if (hasBeen(PreCommitted))
+ @Override
+ public final RoutingKey progressKey()
{
- logger.trace("{}: skipping accept invalidated - already committed
({})", txnId(), status());
- return AcceptOutcome.Redundant;
+ checkCanReadFrom();
+ return progressKey;
}
- setPromised(ballot);
- setAccepted(ballot);
- setStatus(AcceptedInvalidate);
- setPartialDeps(null);
- logger.trace("{}: accepted invalidated", txnId());
+ @Override
+ public final Route<?> route()
+ {
+ checkCanReadFrom();
+ return route;
+ }
- notifyListeners(safeStore);
- return AcceptOutcome.Success;
- }
+ @Override
+ public Ballot promised()
+ {
+ checkCanReadFrom();
+ return promised;
+ }
- public enum CommitOutcome { Success, Redundant, Insufficient }
+ @Override
+ public Status.Durability durability()
+ {
+ checkCanReadFrom();
+ return Command.durability(durability, saveStatus());
+ }
- // relies on mutual exclusion for each key
- public CommitOutcome commit(SafeCommandStore safeStore, Route<?> route,
@Nullable RoutingKey progressKey, @Nullable PartialTxn partialTxn, Timestamp
executeAt, PartialDeps partialDeps)
- {
- if (hasBeen(PreCommitted))
+ @Override
+ public ImmutableSet<CommandListener> listeners()
{
- logger.trace("{}: skipping commit - already committed ({})",
txnId(), status());
- if (!executeAt.equals(executeAt()) || status() == Invalidated)
- safeStore.agent().onInconsistentTimestamp(this, (status() ==
Invalidated ? Timestamp.NONE : this.executeAt()), executeAt);
+ checkCanReadFrom();
+ if (listeners == null)
+ return ImmutableSet.of();
+ return listeners;
+ }
- if (hasBeen(Committed))
- return CommitOutcome.Redundant;
+ @Override
+ public final SaveStatus saveStatus()
+ {
+ checkCanReadFrom();
+ return status;
}
+ }
+
+ /**
+ * If this is the home shard, we require that this is a Route for all states > NotWitnessed;
+ * otherwise for the local progress shard this is ordinarily a PartialRoute, and for other shards this is not set,
+ * so that there is only one copy per node that can be consulted to construct the full set of involved keys.
+ *
+ * If hasBeen(Committed) this must contain the keys for both txnId.epoch and executeAt.epoch
+ */
+ public abstract Route<?> route();
+ public abstract RoutingKey progressKey();
- Ranges coordinateRanges = coordinateRanges(safeStore);
- // TODO (expected, consider): consider ranges between coordinateRanges
and executeRanges? Perhaps don't need them
- Ranges executeRanges = executeRanges(safeStore, executeAt);
- ProgressShard shard = progressShard(safeStore, route, progressKey,
coordinateRanges);
+ /**
+ * homeKey is a global value that defines the home shard - the one tasked with ensuring the transaction is finished.
+ * progressKey is a local value that defines the local shard responsible for ensuring progress on the transaction.
+ * This will be homeKey if it is owned by the node, and some other key otherwise. If not the home shard, the progress
+ * shard has much weaker responsibilities, only ensuring that the home shard has durably witnessed the txnId.
+ *
+ * TODO (expected, efficiency): we probably do not want to save this on its own, as we probably want to
+ * minimize IO interactions and discrete registers, so will likely reference commit log entries directly
+ * At which point we may impose a requirement that only a Route can be saved, not a homeKey on its own.
+ * Once this restriction is imposed, we no longer need to pass around Routable.Domain with TxnId.
+ */
+ public abstract RoutingKey homeKey();
+ public abstract TxnId txnId();
+ public abstract Ballot promised();
+ public abstract Status.Durability durability();
+ public abstract ImmutableSet<CommandListener> listeners();
+ public abstract SaveStatus saveStatus();
- if (!validate(coordinateRanges, executeRanges, shard, route, Check,
partialTxn, Add, partialDeps, Set))
- return CommitOutcome.Insufficient;
+ private static boolean isSameClass(Command command, Class<? extends
Command> klass)
+ {
+ return command.getClass() == klass;
+ }
- setExecuteAt(executeAt);
- set(safeStore, coordinateRanges, executeRanges, shard, route,
partialTxn, Add, partialDeps, Set);
+ private static void checkNewBallot(Ballot current, Ballot next, String
name)
+ {
+ if (next.compareTo(current) < 0)
+ throw new IllegalArgumentException(String.format("Cannot update %s
ballot from %s to %s. New ballot is less than current", name, current, next));
+ }
- setStatus(Committed);
- logger.trace("{}: committed with executeAt: {}, deps: {}", txnId(),
executeAt, partialDeps);
- populateWaitingOn(safeStore);
+ private static void checkPromised(Command command, Ballot ballot)
+ {
+ checkNewBallot(command.promised(), ballot, "promised");
+ }
- safeStore.progressLog().committed(this, shard);
+ private static void checkAccepted(Command command, Ballot ballot)
+ {
+ checkNewBallot(command.accepted(), ballot, "accepted");
+ }
- // TODO (expected, safety): introduce intermediate status to avoid
reentry when notifying listeners (which might notify us)
- maybeExecute(safeStore, shard, true, true);
- return CommitOutcome.Success;
+ private static void checkSameClass(Command command, Class<? extends
Command> klass, String errorMsg)
+ {
+ if (!isSameClass(command, klass))
+ throw new IllegalArgumentException(errorMsg + format(" expected %s
got %s", klass.getSimpleName(), command.getClass().getSimpleName()));
}
- // relies on mutual exclusion for each key
- public void precommit(SafeCommandStore safeStore, Timestamp executeAt)
+ // TODO (low priority, progress): callers should try to consult the local progress shard (if any) to obtain the full set of keys owned locally
+ public final Route<?> someRoute()
{
- if (hasBeen(PreCommitted))
- {
- logger.trace("{}: skipping precommit - already committed ({})",
txnId(), status());
- if (executeAt.equals(executeAt()) && status() != Invalidated)
- return;
+ checkCanReadFrom();
+ if (route() != null)
+ return route();
- safeStore.agent().onInconsistentTimestamp(this, (status() ==
Invalidated ? Timestamp.NONE : this.executeAt()), executeAt);
- }
+ if (homeKey() != null)
+ return PartialRoute.empty(txnId().domain(), homeKey());
- setExecuteAt(executeAt);
- setStatus(PreCommitted);
- notifyListeners(safeStore);
- logger.trace("{}: precommitted with executeAt: {}", txnId(),
executeAt);
+ return null;
}
- protected void populateWaitingOn(SafeCommandStore safeStore)
+ public Unseekables<?, ?> maxUnseekables()
{
- Ranges ranges = safeStore.ranges().since(executeAt().epoch());
- if (ranges != null)
- {
- partialDeps().forEach(ranges, txnId -> {
- Command command = safeStore.ifLoaded(txnId);
- if (command == null)
- {
- addWaitingOnCommit(txnId);
- safeStore.addAndInvokeListener(txnId, this);
- }
- else
- {
- switch (command.status())
- {
- default:
- throw new IllegalStateException();
- case NotWitnessed:
- case PreAccepted:
- case Accepted:
- case AcceptedInvalidate:
- // we don't know when these dependencies will
execute, and cannot execute until we do
- command.addListener(this);
- addWaitingOnCommit(command.txnId());
- break;
- case PreCommitted:
- case Committed:
- // TODO (desired, efficiency): split into
ReadyToRead and ReadyToWrite;
- // the distributed
read can be performed as soon as those keys are ready,
- // and in parallel
with any other reads. the client can even ACK immediately after;
- // only the write
needs to be postponed until other in-progress reads complete
- case ReadyToExecute:
- case PreApplied:
- case Applied:
- command.addListener(this);
- insertPredecessor(command);
- case Invalidated:
- break;
- }
- }
- });
- }
+ Route<?> route = someRoute();
+ if (route == null)
+ return null;
+
+ return route.toMaximalUnseekables();
}
- // TODO (expected, ?): commitInvalidate may need to update cfks _if_
possible
- public void commitInvalidate(SafeCommandStore safeStore)
+ public PreLoadContext contextForSelf()
{
- if (hasBeen(PreCommitted))
- {
- logger.trace("{}: skipping commit invalidated - already committed
({})", txnId(), status());
- if (!hasBeen(Invalidated))
- safeStore.agent().onInconsistentTimestamp(this,
Timestamp.NONE, executeAt());
-
- return;
- }
+ checkCanReadFrom();
+ return contextForCommand(this);
+ }
- ProgressShard shard = progressShard(safeStore);
- safeStore.progressLog().invalidated(this, shard);
- setExecuteAt(txnId());
- if (partialDeps() == null)
- setPartialDeps(PartialDeps.NONE);
- setStatus(Invalidated);
- logger.trace("{}: committed invalidated", txnId());
+ public abstract Timestamp executeAt();
+ public abstract Ballot accepted();
+ public abstract PartialTxn partialTxn();
+ public abstract @Nullable PartialDeps partialDeps();
- notifyListeners(safeStore);
+ public final Status status()
+ {
+ checkCanReadFrom();
+ return saveStatus().status;
}
- public enum ApplyOutcome { Success, Redundant, Insufficient }
+ public final Status.Known known()
+ {
+ checkCanReadFrom();
+ return saveStatus().known;
+ }
- public ApplyOutcome apply(SafeCommandStore safeStore, long untilEpoch,
Route<?> route, Timestamp executeAt, @Nullable PartialDeps partialDeps, Writes
writes, Result result)
+ public boolean hasBeenWitnessed()
{
- if (hasBeen(PreApplied) && executeAt.equals(this.executeAt()))
- {
- logger.trace("{}: skipping apply - already executed ({})",
txnId(), status());
- return ApplyOutcome.Redundant;
- }
- else if (hasBeen(PreCommitted) && !executeAt.equals(this.executeAt()))
- {
- safeStore.agent().onInconsistentTimestamp(this, this.executeAt(),
executeAt);
- }
+ checkCanReadFrom();
+ return partialTxn() != null;
+ }
- Ranges coordinateRanges = coordinateRanges(safeStore);
- Ranges executeRanges = executeRanges(safeStore, executeAt);
- if (untilEpoch < safeStore.latestEpoch())
- {
- Ranges expectedRanges =
safeStore.ranges().between(executeAt.epoch(), untilEpoch);
- Invariants.checkState(expectedRanges.containsAll(executeRanges));
- }
- ProgressShard shard = progressShard(safeStore, route,
coordinateRanges);
+ public final boolean hasBeen(Status status)
+ {
+ return status().compareTo(status) >= 0;
+ }
- if (!validate(coordinateRanges, executeRanges, shard, route, Check,
null, Check, partialDeps, hasBeen(Committed) ? Add : TrySet))
- return ApplyOutcome.Insufficient; // TODO (expected, consider):
this should probably be an assertion failure if !TrySet
+ public boolean has(Status.Known known)
+ {
+ return known.isSatisfiedBy(saveStatus().known);
+ }
- setWrites(writes);
- setResult(result);
- setExecuteAt(executeAt);
- set(safeStore, coordinateRanges, executeRanges, shard, route, null,
Check, partialDeps, hasBeen(Committed) ? Add : TrySet);
+ public boolean has(Status.Definition definition)
+ {
+ return known().definition.compareTo(definition) >= 0;
+ }
- if (!hasBeen(Committed))
- populateWaitingOn(safeStore);
- setStatus(PreApplied);
- logger.trace("{}: apply, status set to Executed with executeAt: {},
deps: {}", txnId(), executeAt, partialDeps);
+ public boolean has(Status.Outcome outcome)
+ {
+ return known().outcome.compareTo(outcome) >= 0;
+ }
- safeStore.progressLog().executed(this, shard);
+ public boolean is(Status status)
+ {
+ return status() == status;
+ }
- maybeExecute(safeStore, shard, true, true);
- return ApplyOutcome.Success;
+ public final CommandListener asListener()
+ {
+ return listener(txnId());
}
- @Override
- public PreLoadContext listenerPreLoadContext(TxnId caller)
+ public final boolean isWitnessed()
{
- return PreLoadContext.contextFor(listOf(txnId(), caller));
+ checkCanReadFrom();
+ boolean result = status().hasBeen(Status.PreAccepted);
+ Invariants.checkState(result == (this instanceof Preaccepted));
+ return result;
}
- @Override
- public void onChange(SafeCommandStore safeStore, Command command)
+ public final Preaccepted asWitnessed()
{
- logger.trace("{}: updating as listener in response to change on {}
with status {} ({})",
- txnId(), command.txnId(), command.status(), command);
- switch (command.status())
- {
- default:
- throw new IllegalStateException();
- case NotWitnessed:
- case PreAccepted:
- case Accepted:
- case AcceptedInvalidate:
- break;
+ checkCanReadFrom();
+ return (Preaccepted) this;
+ }
- case PreCommitted:
- case Committed:
- case ReadyToExecute:
- case PreApplied:
- case Applied:
- case Invalidated:
- updatePredecessor(command);
- maybeExecute(safeStore, progressShard(safeStore), false, true);
- break;
- }
+ public final boolean isAccepted()
+ {
+ checkCanReadFrom();
+ boolean result = status().hasBeen(Status.AcceptedInvalidate);
+ Invariants.checkState(result == (this instanceof Accepted));
+ return result;
}
- protected void postApply(SafeCommandStore safeStore)
+ public final Accepted asAccepted()
{
- logger.trace("{} applied, setting status to Applied and notifying
listeners", txnId());
- setStatus(Applied);
- notifyListeners(safeStore);
+ checkCanReadFrom();
+ return (Accepted) this;
}
- private static Function<SafeCommandStore, Void> callPostApply(TxnId txnId)
+ public final boolean isCommitted()
{
- return safeStore -> {
- safeStore.command(txnId).postApply(safeStore);
- return null;
- };
+ checkCanReadFrom();
+ boolean result = status().hasBeen(Status.Committed);
+ Invariants.checkState(result == (this instanceof Committed));
+ return result;
}
- protected Future<Void> apply(SafeCommandStore safeStore)
+ public final Committed asCommitted()
{
- // important: we can't include a reference to *this* in the lambda,
since the C* implementation may evict
- // the command instance from memory between now and the write
completing (and post apply being called)
- CommandStore unsafeStore = safeStore.commandStore();
- return writes().apply(safeStore).flatMap(unused ->
- unsafeStore.submit(this, callPostApply(txnId()))
- );
+ checkCanReadFrom();
+ return (Committed) this;
}
- public Future<Data> read(SafeCommandStore safeStore)
+ public final boolean isExecuted()
{
- return partialTxn().read(safeStore, this);
+ checkCanReadFrom();
+ boolean result = status().hasBeen(Status.PreApplied);
+ Invariants.checkState(result == (this instanceof Executed));
+ return result;
}
- // TODO (expected, API consistency): maybe split into maybeExecute and
maybeApply?
- private boolean maybeExecute(SafeCommandStore safeStore, ProgressShard
shard, boolean alwaysNotifyListeners, boolean notifyWaitingOn)
+ public final Executed asExecuted()
{
- if (logger.isTraceEnabled())
- logger.trace("{}: Maybe executing with status {}. Will notify
listeners on noop: {}", txnId(), status(), alwaysNotifyListeners);
+ checkCanReadFrom();
+ return (Executed) this;
+ }
- if (status() != Committed && status() != PreApplied)
+ public static final class NotWitnessed extends AbstractCommand
+ {
+ NotWitnessed(TxnId txnId, SaveStatus status, Status.Durability
durability, RoutingKey homeKey, RoutingKey progressKey, Route<?> route, Ballot
promised, ImmutableSet<CommandListener> listeners)
{
- if (alwaysNotifyListeners)
- notifyListeners(safeStore);
- return false;
+ super(txnId, status, durability, homeKey, progressKey, route,
promised, listeners);
}
- if (isWaitingOnDependency())
+ NotWitnessed(CommonAttributes common, SaveStatus status, Ballot
promised)
{
- if (alwaysNotifyListeners)
- notifyListeners(safeStore);
-
- if (notifyWaitingOn)
- new NotifyWaitingOn(this).accept(safeStore);
- return false;
+ super(common, status, promised);
}
- switch (status())
+ public static NotWitnessed create(TxnId txnId)
{
- case Committed:
- // TODO (desirable, efficiency): maintain distinct ReadyToRead
and ReadyToWrite states
- setStatus(ReadyToExecute);
- logger.trace("{}: set to ReadyToExecute", txnId());
- safeStore.progressLog().readyToExecute(this, shard);
- notifyListeners(safeStore);
- break;
+ return new NotWitnessed(txnId, SaveStatus.NotWitnessed,
NotDurable, null, null, null, Ballot.ZERO, null);
+ }
- case PreApplied:
- Ranges executeRanges = executeRanges(safeStore, executeAt());
- boolean intersects = writes().keys.intersects(executeRanges);
+ private static class Factory
+ {
+ public static NotWitnessed create(CommonAttributes common, Ballot
promised)
+ {
+ return new NotWitnessed(common, SaveStatus.NotWitnessed,
promised);
+ }
- if (intersects)
- {
- logger.trace("{}: applying", txnId());
- apply(safeStore);
- }
- else
- {
- // TODO (desirable, performance): This could be performed
immediately upon Committed
- // but: if we later support transitive dependency
elision this could be dangerous
- logger.trace("{}: applying no-op", txnId());
- setStatus(Applied);
- notifyListeners(safeStore);
- }
+ public static NotWitnessed update(NotWitnessed command,
CommonAttributes common, Ballot promised)
+ {
+ checkSameClass(command, NotWitnessed.class, "Cannot update");
+ command.checkCanReadFrom();
+
Invariants.checkArgument(command.txnId().equals(common.txnId()));
+ return new NotWitnessed(common, command.saveStatus(),
promised);
+ }
}
- return true;
- }
- /**
- * @param dependency is either committed or invalidated
- * @return true iff {@code maybeExecute} might now have a different outcome
- */
- private boolean updatePredecessor(Command dependency)
- {
- Invariants.checkState(dependency.hasBeen(PreCommitted));
- if (dependency.hasBeen(Invalidated))
+ @Override
+ public Timestamp executeAt()
{
- logger.trace("{}: {} is invalidated. Stop listening and removing
from waiting on commit set.", txnId(), dependency.txnId());
- dependency.removeListener(this);
- removeWaitingOnCommit(dependency.txnId());
- return true;
+ checkCanReadFrom();
+ return null;
}
- else if (dependency.executeAt().compareTo(executeAt()) > 0)
+
+ @Override
+ public Ballot promised()
{
- // dependency cannot be a predecessor if it executes later
- logger.trace("{}: {} executes after us. Stop listening and
removing from waiting on apply set.", txnId(), dependency.txnId());
- removeWaitingOn(dependency.txnId(), dependency.executeAt());
- dependency.removeListener(this);
- return true;
+ checkCanReadFrom();
+ return Ballot.ZERO;
}
- else if (dependency.hasBeen(Applied))
+
+ @Override
+ public Ballot accepted()
{
- logger.trace("{}: {} has been applied. Stop listening and removing
from waiting on apply set.", txnId(), dependency.txnId());
- removeWaitingOn(dependency.txnId(), dependency.executeAt());
- dependency.removeListener(this);
- return true;
+ checkCanReadFrom();
+ return Ballot.ZERO;
}
- else if (isWaitingOnDependency())
+
+ @Override
+ public PartialTxn partialTxn()
{
- logger.trace("{}: adding {} to waiting on apply set.", txnId(),
dependency.txnId());
- addWaitingOnApplyIfAbsent(dependency.txnId(),
dependency.executeAt());
- removeWaitingOnCommit(dependency.txnId());
- return false;
+ checkCanReadFrom();
+ return null;
}
- else
+
+ @Override
+ public @Nullable PartialDeps partialDeps()
{
- throw new IllegalStateException();
+ checkCanReadFrom();
+ return null;
}
}
- private void insertPredecessor(Command dependency)
+ public static class Preaccepted extends AbstractCommand
{
- Invariants.checkState(dependency.hasBeen(PreCommitted));
- if (dependency.hasBeen(Invalidated))
+ private final Timestamp executeAt;
+ private final PartialTxn partialTxn;
+ private final @Nullable PartialDeps partialDeps;
+
+ private Preaccepted(CommonAttributes common, SaveStatus status,
Timestamp executeAt, Ballot promised)
+ {
+ super(common, status, promised);
+ this.executeAt = executeAt;
+ this.partialTxn = common.partialTxn();
+ this.partialDeps = common.partialDeps();
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+ Preaccepted that = (Preaccepted) o;
+ return executeAt.equals(that.executeAt)
+ && Objects.equals(partialTxn, that.partialTxn)
+ && Objects.equals(partialDeps, that.partialDeps);
+ }
+
+ @Override
+ public int hashCode()
{
- logger.trace("{}: {} is invalidated. Do not insert.", txnId(),
dependency.txnId());
+ int hash = super.hashCode();
+ hash = 31 * hash + Objects.hashCode(executeAt);
+ hash = 31 * hash + Objects.hashCode(partialTxn);
+ hash = 31 * hash + Objects.hashCode(partialDeps);
+ return hash;
}
- else if (dependency.executeAt().compareTo(executeAt()) > 0)
+
+ private static class Factory
{
- // dependency cannot be a predecessor if it executes later
- logger.trace("{}: {} executes after us. Do not insert.", txnId(),
dependency.txnId());
+ public static Preaccepted create(CommonAttributes common,
Timestamp executeAt, Ballot promised)
+ {
+ return new Preaccepted(common, SaveStatus.PreAccepted,
executeAt, promised);
+ }
+
+ public static Preaccepted update(Preaccepted command,
CommonAttributes common, Ballot promised)
+ {
+ checkPromised(command, promised);
+ checkSameClass(command, Preaccepted.class, "Cannot update");
+ Invariants.checkArgument(command.getClass() ==
Preaccepted.class);
+ command.checkCanReadFrom();
+ return create(common, command.executeAt(), promised);
+ }
}
- else if (dependency.hasBeen(Applied))
+
+ @Override
+ public Timestamp executeAt()
{
- logger.trace("{}: {} has been applied. Do not insert.", txnId(),
dependency.txnId());
+ checkCanReadFrom();
+ return executeAt;
}
- else
+
+ @Override
+ public Ballot accepted()
{
- logger.trace("{}: adding {} to waiting on apply set.", txnId(),
dependency.txnId());
- addWaitingOnApplyIfAbsent(dependency.txnId(),
dependency.executeAt());
+ checkCanReadFrom();
+ return Ballot.ZERO;
}
- }
- void updatePredecessorAndMaybeExecute(SafeCommandStore safeStore, Command
predecessor, boolean notifyWaitingOn)
- {
- if (hasBeen(Applied))
- return;
+ @Override
+ public PartialTxn partialTxn()
+ {
+ checkCanReadFrom();
+ return partialTxn;
+ }
- if (updatePredecessor(predecessor))
- maybeExecute(safeStore, progressShard(safeStore), false,
notifyWaitingOn);
+ @Override
+ public @Nullable PartialDeps partialDeps()
+ {
+ checkCanReadFrom();
+ return partialDeps;
+ }
}
- static class NotifyWaitingOn implements PreLoadContext,
Consumer<SafeCommandStore>
+ public static class Accepted extends Preaccepted
{
- Known[] blockedUntil = new Known[4];
- TxnId[] txnIds = new TxnId[4];
- int depth;
+ private final Ballot accepted;
- public NotifyWaitingOn(Command command)
+ private Accepted(CommonAttributes common, SaveStatus status, Timestamp
executeAt, Ballot promised, Ballot accepted)
+ {
+ super(common, status, executeAt, promised);
+ this.accepted = accepted;
+ }
+
+ @Override
+ public boolean equals(Object o)
{
- txnIds[0] = command.txnId();
- blockedUntil[0] = Done;
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+ Accepted that = (Accepted) o;
+ return Objects.equals(accepted, that.accepted);
}
@Override
- public void accept(SafeCommandStore safeStore)
+ public int hashCode()
{
- Command prev = get(safeStore, depth - 1);
- while (depth >= 0)
+ int hash = super.hashCode();
+ hash = 31 * hash + Objects.hashCode(accepted);
+ return hash;
+ }
+
+ private static class Factory
+ {
+ static Accepted create(CommonAttributes common, SaveStatus status,
Timestamp executeAt, Ballot promised, Ballot accepted)
{
- Command cur = safeStore.ifLoaded(txnIds[depth]);
- Known until = blockedUntil[depth];
- if (cur == null)
- {
- // need to load; schedule execution for later
- safeStore.execute(this, this);
- return;
- }
+ return new Accepted(common, status, executeAt, promised,
accepted);
+ }
- if (prev != null)
- {
- if (cur.has(until) || (cur.hasBeen(PreCommitted) &&
cur.executeAt().compareTo(prev.executeAt()) > 0))
- {
- prev.updatePredecessorAndMaybeExecute(safeStore, cur,
false);
- --depth;
- prev = get(safeStore, depth - 1);
- continue;
- }
- }
- else if (cur.has(until))
- {
- // we're done; have already applied
- Invariants.checkState(depth == 0);
- break;
- }
+ static Accepted update(Accepted command, CommonAttributes common,
SaveStatus status, Ballot promised)
+ {
+ checkPromised(command, promised);
+ checkSameClass(command, Accepted.class, "Cannot update");
+ command.checkCanUpdate();
+ return new Accepted(common, status, command.executeAt(),
promised, command.accepted());
+ }
- TxnId directlyBlockedOnCommit = cur.firstWaitingOnCommit();
- TxnId directlyBlockedOnApply =
cur.firstWaitingOnApply(directlyBlockedOnCommit);
- if (directlyBlockedOnApply != null)
- {
- push(directlyBlockedOnApply, Done);
- }
- else if (directlyBlockedOnCommit != null)
- {
- push(directlyBlockedOnCommit, ExecuteAtOnly);
- }
- else
- {
- if (cur.hasBeen(Committed) && !cur.hasBeen(ReadyToExecute)
&& !cur.isWaitingOnDependency())
- {
- if (!cur.maybeExecute(safeStore,
cur.progressShard(safeStore), false, false))
- throw new AssertionError("Is able to Apply, but
has not done so");
- // loop and re-test the command's status; we may still
want to notify blocking, esp. if not homeShard
- continue;
- }
-
- Unseekables<?, ?> someKeys = cur.maxUnseekables();
- if (someKeys == null && prev != null) someKeys =
prev.partialDeps().someUnseekables(cur.txnId());
- Invariants.checkState(someKeys != null);
- logger.trace("{} blocked on {} until {}", txnIds[0],
cur.txnId(), until);
- safeStore.progressLog().waiting(cur.txnId(), until,
someKeys);
- return;
- }
- prev = cur;
+ static Accepted update(Accepted command, CommonAttributes common,
Ballot promised)
+ {
+ return update(command, common, command.saveStatus(), promised);
}
}
- private Command get(SafeCommandStore safeStore, int i)
+ @Override
+ public Ballot accepted()
{
- return i >= 0 ? safeStore.command(txnIds[i]) : null;
+ checkCanReadFrom();
+ return accepted;
}
+ }
- void push(TxnId by, Known until)
+ public static class Committed extends Accepted
+ {
+ private final ImmutableSortedSet<TxnId> waitingOnCommit;
+ private final ImmutableSortedMap<Timestamp, TxnId> waitingOnApply;
+
+ private Committed(CommonAttributes common, SaveStatus status,
Timestamp executeAt, Ballot promised, Ballot accepted,
ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId>
waitingOnApply)
{
- if (++depth == txnIds.length)
- {
- txnIds = Arrays.copyOf(txnIds, txnIds.length * 2);
- blockedUntil = Arrays.copyOf(blockedUntil, txnIds.length);
- }
- txnIds[depth] = by;
- blockedUntil[depth] = until;
+ super(common, status, executeAt, promised, accepted);
+ this.waitingOnCommit = waitingOnCommit;
+ this.waitingOnApply = waitingOnApply;
+ }
+
+ private Committed(CommonAttributes common, SaveStatus status,
Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOn waitingOn)
+ {
+ this(common, status, executeAt, promised, accepted,
waitingOn.waitingOnCommit, waitingOn.waitingOnApply);
}
@Override
- public Iterable<TxnId> txnIds()
+ public boolean equals(Object o)
{
- return Arrays.asList(txnIds).subList(0, depth + 1);
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+ Committed committed = (Committed) o;
+ return Objects.equals(waitingOnCommit, committed.waitingOnCommit)
+ && Objects.equals(waitingOnApply,
committed.waitingOnApply);
}
@Override
- public Seekables<?, ?> keys()
+ public int hashCode()
{
- return Keys.EMPTY;
+ int hash = super.hashCode();
+ hash = 31 * hash + Objects.hashCode(waitingOnCommit);
+ hash = 31 * hash + Objects.hashCode(waitingOnApply);
+ return hash;
}
- }
- /**
- * A key nominated to represent the "home" shard - only members of the
home shard may be nominated to recover
- * a transaction, to reduce the cluster-wide overhead of ensuring
progress. A transaction that has only been
- * witnessed at PreAccept may however trigger a process of ensuring the
home shard is durably informed of
- * the transaction.
- *
- * Note that for ProgressLog purposes the "home shard" is the shard as of
txnId.epoch.
- * For recovery purposes the "home shard" is as of txnId.epoch until
Committed, and executeAt.epoch once Executed
- */
- public final void homeKey(RoutingKey homeKey)
- {
- RoutingKey current = homeKey();
- if (current == null) setHomeKey(homeKey);
- else if (!current.equals(homeKey)) throw new AssertionError();
- }
+ private static class Factory
Review Comment:
I'd added these because I was having problems with ambiguous create methods,
i.e. calling Accepted.create and having it call Preaccepted.create instead.
Renaming to createPreAccepted or preAccepted would be better, though.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]