aweisberg commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1011958467
##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -19,362 +19,487 @@
package accord.local;
import accord.api.*;
-import accord.local.Node.Id;
+import accord.local.Status.Durability;
+import accord.local.Status.ExecutionPhase;
+import accord.local.Status.ReplicationPhase;
import accord.primitives.*;
-import accord.txn.Txn;
-import accord.txn.Writes;
+import accord.primitives.Writes;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
import org.apache.cassandra.utils.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
-import static accord.local.Status.*;
+import static accord.local.Status.ExecutionPhase.Decided;
+import static accord.local.Status.ExecutionPhase.Done;
import static accord.utils.Utils.listOf;
-public abstract class Command implements Listener, Consumer<Listener>,
PreLoadContext, PartialCommand.WithDeps
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import accord.api.ProgressLog.ProgressShard;
+import accord.primitives.AbstractRoute;
+import accord.primitives.KeyRanges;
+import accord.primitives.Ballot;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Route;
+import accord.primitives.RoutingKeys;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
+import static accord.api.ProgressLog.ProgressShard.Home;
+import static accord.api.ProgressLog.ProgressShard.Local;
+import static accord.api.ProgressLog.ProgressShard.No;
+import static accord.api.ProgressLog.ProgressShard.Unsure;
+import static accord.local.Command.EnsureAction.Add;
+import static accord.local.Command.EnsureAction.Check;
+import static accord.local.Command.EnsureAction.Ignore;
+import static accord.local.Command.EnsureAction.Set;
+import static accord.local.Command.EnsureAction.TrySet;
+import static accord.local.Status.Accepted;
+import static accord.local.Status.AcceptedInvalidate;
+import static accord.local.Status.Applied;
+import static accord.local.Status.Committed;
+import static accord.local.Status.PreApplied;
+import static accord.local.Status.Invalidated;
+import static accord.local.Status.NotWitnessed;
+import static accord.local.Status.PreAccepted;
+import static accord.local.Status.ReadyToExecute;
+
+public abstract class Command implements Listener,
BiConsumer<SafeCommandStore, Listener>, PreLoadContext
{
private static final Logger logger =
LoggerFactory.getLogger(Command.class);
- public abstract CommandStore commandStore();
+ public abstract TxnId txnId();
- public abstract Key homeKey();
+ // TODO (now): pack this into TxnId
+ public abstract Txn.Kind kind();
- protected abstract void setHomeKey(Key key);
-
- public abstract Key progressKey();
+ public boolean hasBeen(Status status)
+ {
+ return status().hasBeen(status);
+ }
- protected abstract void setProgressKey(Key key);
+ public boolean hasBeen(ReplicationPhase phase)
+ {
+ return status().replicates.compareTo(phase) >= 0;
+ }
- protected abstract void setTxn(Txn txn);
+ public boolean hasBeen(ExecutionPhase phase)
+ {
+ return status().execution.compareTo(phase) >= 0;
+ }
- public abstract Ballot promised();
+ public boolean is(Status status)
+ {
+ return status() == status;
+ }
- public abstract void promised(Ballot ballot);
+ /**
+ * homeKey is a global value that defines the home shard - the one tasked
with ensuring the transaction is finished.
+ * progressKey is a local value that defines the local shard responsible
for ensuring progress on the transaction.
+ * This will be homeKey if it is owned by the node, and some other key
otherwise. If not the home shard, the progress
+ * shard has much weaker responsibilities, only ensuring that the home
shard has durably witnessed the txnId.
+ */
+ public abstract RoutingKey homeKey();
+ protected abstract void setHomeKey(RoutingKey key);
- public abstract Ballot accepted();
+ public abstract RoutingKey progressKey();
+ protected abstract void setProgressKey(RoutingKey key);
- public abstract void accepted(Ballot ballot);
+ /**
+ * If this is the home shard, we require that this is a Route for all
states > NotWitnessed;
+ * otherwise for the local progress shard this is ordinarily a
PartialRoute, and for other shards this is not set,
+ * so that there is only one copy per node that can be consulted to
construct the full set of involved keys.
+ *
+ * If hasBeen(Committed) this must contain the keys for both txnId.epoch
and executeAt.epoch
+ *
+ * TODO: maybe set this for all local shards, but slice to only those
participating keys
+ * (would probably need to remove hashIntersects)
+ */
+ public abstract AbstractRoute route();
+ protected abstract void setRoute(AbstractRoute route);
- public abstract void executeAt(Timestamp timestamp);
+ public abstract PartialTxn partialTxn();
+ protected abstract void setPartialTxn(PartialTxn txn);
- public abstract Deps savedDeps();
+ public abstract Ballot promised();
+ protected abstract void setPromised(Ballot ballot);
- public abstract void savedDeps(Deps deps);
+ public abstract Ballot accepted();
+ protected abstract void setAccepted(Ballot ballot);
- public abstract Writes writes();
+ public void saveRoute(SafeCommandStore safeStore, Route route)
+ {
+ setRoute(route);
+ updateHomeKey(safeStore, route.homeKey);
+ }
- public abstract void writes(Writes writes);
+ public abstract Timestamp executeAt();
+ protected abstract void setExecuteAt(Timestamp timestamp);
- public abstract Result result();
+ /**
+ * While !hasBeen(Committed), used only as a register for Accept state,
used by Recovery
+ * If hasBeen(Committed), represents the full deps owned by this range for
execution at both txnId.epoch
+ * AND executeAt.epoch so that it may be used for Recovery (which contacts
only txnId.epoch topology),
+ * but also for execution.
+ */
+ public abstract PartialDeps partialDeps();
+ protected abstract void setPartialDeps(PartialDeps deps);
- public abstract void result(Result result);
+ public abstract Writes writes();
+ protected abstract void setWrites(Writes writes);
- public abstract void status(Status status);
+ public abstract Result result();
+ protected abstract void setResult(Result result);
- public abstract boolean isGloballyPersistent();
+ public abstract Status status();
+ protected abstract void setStatus(Status status);
- public abstract void isGloballyPersistent(boolean v);
+ public abstract Durability durability();
+ public abstract void setDurability(Durability v);
public abstract Command addListener(Listener listener);
+ public abstract void removeListener(Listener listener);
+ protected abstract void notifyListeners(SafeCommandStore safeStore);
- public abstract void notifyListeners();
-
- public abstract void addWaitingOnCommit(Command command);
-
- public abstract boolean isWaitingOnCommit();
-
- public abstract void removeWaitingOnCommit(PartialCommand command);
-
- public abstract PartialCommand firstWaitingOnCommit();
-
- public abstract void addWaitingOnApplyIfAbsent(PartialCommand command);
-
- public abstract boolean isWaitingOnApply();
+ protected abstract void addWaitingOnCommit(TxnId txnId);
+ protected abstract boolean isWaitingOnCommit();
+ protected abstract void removeWaitingOnCommit(TxnId txnId);
+ protected abstract TxnId firstWaitingOnCommit();
- public abstract void removeWaitingOnApply(PartialCommand command);
+ protected abstract void addWaitingOnApplyIfAbsent(TxnId txnId, Timestamp
executeAt);
+ protected abstract boolean isWaitingOnApply();
+ protected abstract void removeWaitingOn(TxnId txnId, Timestamp executeAt);
+ protected abstract TxnId firstWaitingOnApply();
- public abstract PartialCommand firstWaitingOnApply();
+ public boolean hasBeenWitnessed()
+ {
+ return partialTxn() != null;
+ }
- public boolean isUnableToApply()
+ public boolean isUnableToExecute()
{
return isWaitingOnCommit() || isWaitingOnApply();
}
@Override
public Iterable<TxnId> txnIds()
{
- return Iterables.concat(Collections.singleton(txnId()),
savedDeps().txnIds());
+ return Collections.singleton(txnId());
}
@Override
public Iterable<Key> keys()
{
- return txn().keys();
+ // TODO (now): when do we need this, and will it always be sufficient?
+ return partialTxn().keys();
}
- public void setGloballyPersistent(Key homeKey, Timestamp executeAt)
+ public void setDurability(SafeCommandStore safeStore, Durability
durability, RoutingKey homeKey, @Nullable Timestamp executeAt)
{
- homeKey(homeKey);
- if (!hasBeen(Committed))
- this.executeAt(executeAt);
- else if (!this.executeAt().equals(executeAt))
- commandStore().agent().onInconsistentTimestamp(this,
this.executeAt(), executeAt);
- isGloballyPersistent(true);
+ updateHomeKey(safeStore, homeKey);
+ if (executeAt != null && hasBeen(Committed) &&
!this.executeAt().equals(executeAt))
+ safeStore.agent().onInconsistentTimestamp(this, this.executeAt(),
executeAt);
+ setDurability(durability);
}
- // requires that command != null
- // relies on mutual exclusion for each key
- // note: we do not set status = newStatus, we only use it to decide how we
register with the retryLog
- private void witness(Txn txn, Key homeKey, Key progressKey)
+ public enum AcceptOutcome
{
- txn(txn);
- homeKey(homeKey);
- progressKey(progressKey);
-
- if (status() == NotWitnessed)
- status(PreAccepted);
+ Success, Redundant, RejectedBallot
+ }
- if (executeAt() == null)
- {
- Timestamp max = commandStore().maxConflict(txn.keys());
- // unlike in the Accord paper, we partition shards within a node,
so that to ensure a total order we must either:
- // - use a global logical clock to issue new timestamps; or
- // - assign each shard _and_ process a unique id, and use both as
components of the timestamp
- Timestamp witnessed = txnId().compareTo(max) > 0 && txnId().epoch
>= commandStore().latestEpoch()
- ? txnId() : commandStore().uniqueNow(max);
- executeAt(witnessed);
+ public AcceptOutcome preaccept(SafeCommandStore safeStore, PartialTxn
partialTxn, AbstractRoute route, @Nullable RoutingKey progressKey)
+ {
+ if (promised().compareTo(Ballot.ZERO) > 0)
+ return AcceptOutcome.RejectedBallot;
- txn.keys().foldl(commandStore().ranges().since(txnId().epoch), (i,
key, param) -> {
- if (commandStore().hashIntersects(key))
- commandStore().commandsForKey(key).register(this);
- return null;
- }, null);
- }
+ return preacceptInternal(safeStore, partialTxn, route, progressKey);
}
- public boolean preaccept(Txn txn, Key homeKey, Key progressKey)
+ private AcceptOutcome preacceptInternal(SafeCommandStore safeStore,
PartialTxn partialTxn, AbstractRoute route, @Nullable RoutingKey progressKey)
{
- if (promised().compareTo(Ballot.ZERO) > 0)
+ if (executeAt() != null)
{
- logger.trace("{}: skipping preaccept - witnessed higher ballot
({})", txnId(), promised());
- return false;
+ logger.trace("{}: skipping preaccept - already preaccepted ({})",
txnId(), status());
+ return AcceptOutcome.Redundant;
}
- if (hasBeen(PreAccepted))
+ Status status = status();
+ switch (status)
{
- logger.trace("{}: skipping preaccept - already preaccepted ({})",
txnId(), status());
- return true;
+ default: throw new IllegalStateException();
+ case NotWitnessed:
+ case AcceptedInvalidate:
+ case Invalidated:
}
- witness(txn, homeKey, progressKey);
- boolean isProgressShard = progressKey != null &&
handles(txnId().epoch, progressKey);
- commandStore().progressLog().preaccept(this, isProgressShard,
isProgressShard && progressKey.equals(homeKey));
+ KeyRanges coordinateRanges = coordinateRanges(safeStore);
+ ProgressShard shard = progressShard(safeStore, route, progressKey,
coordinateRanges);
+ if (!validate(KeyRanges.EMPTY, coordinateRanges, shard, route, Set,
partialTxn, Set, null, Ignore))
+ throw new IllegalStateException();
- logger.trace("{}: preaccepted with executeAt: {}, deps: {}", txnId(),
executeAt(), savedDeps());
- notifyListeners();
- return true;
+ Timestamp max = safeStore.maxConflict(partialTxn.keys());
Review Comment:
Totally fine not to change it. I don't think I was confused by it; you had
just volunteered to change it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]