belliottsmith commented on code in PR #33:
URL: https://github.com/apache/cassandra-accord/pull/33#discussion_r1100964472


##########
accord-core/src/main/java/accord/local/AsyncCommandStores.java:
##########
@@ -1,92 +0,0 @@
-/*

Review Comment:
   Yeah, the sync stores avoid any allocations or additional work in the normal 
case. Maintaining both the sync and async stores ensures our API decisions stay 
compatible with both approaches. Removing one is likely to leave us in a 
situation where unpicking some dependency is much harder in future. Since the 
sync approach is potentially more efficient long term, I put quite a bit of 
work into ensuring both paradigms could be supported equally efficiently, so 
that we can more easily migrate in future.
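   
   To illustrate what I mean by keeping the API compatible with both 
paradigms, here is a rough sketch. All names (`SketchStore`, `SyncSketchStore`, 
`AsyncSketchStore`, `SketchStoreDemo`) are hypothetical and are not accord-core 
types; the state is a plain `StringBuilder` purely to keep the example 
self-contained. The point is only that a callback-style contract can be 
satisfied inline by a sync store and via an executor by an async one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical names throughout; none of these types exist in accord-core.
interface SketchStore
{
    // Callback-style contract: callable inline or from an executor thread
    <T> void execute(Function<StringBuilder, T> task, Consumer<T> onDone);
}

final class SyncSketchStore implements SketchStore
{
    private final StringBuilder state = new StringBuilder();

    @Override
    public <T> void execute(Function<StringBuilder, T> task, Consumer<T> onDone)
    {
        // Runs on the caller's thread: no queueing and no executor hand-off
        onDone.accept(task.apply(state));
    }
}

final class AsyncSketchStore implements SketchStore
{
    private final StringBuilder state = new StringBuilder();
    // Assumption: one thread per store, so access to state is serialised
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @Override
    public <T> void execute(Function<StringBuilder, T> task, Consumer<T> onDone)
    {
        executor.execute(() -> onDone.accept(task.apply(state)));
    }

    public void close()
    {
        executor.shutdown();
    }
}

final class SketchStoreDemo
{
    // The caller is written once and works against either implementation
    static String appendVia(SketchStore store, String value)
    {
        CompletableFuture<String> result = new CompletableFuture<>();
        store.execute(state -> state.append(value).toString(), result::complete);
        try
        {
            return result.get(5, TimeUnit.SECONDS);
        }
        catch (Exception e)
        {
            throw new IllegalStateException(e);
        }
    }
}
```

   If `appendVia` only compiled against one of the two stores, that would be 
the kind of dependency that is hard to unpick later.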
   
   This is the second time this work has been erased, so it would be nice to 
try and avoid that in future if possible. Perhaps we can add some commentary to 
this effect.
   
   > Since accord is meant to be async
   
   Not sure what you mean by that? Most of the `CommandStore` processing is 
synchronous: we currently have a thread per `CommandStore`, but the work it 
does serving these requests is synchronous. I am far from certain that this 
will remain true indefinitely, so I don't think we should assume it. For data 
that is in cache, using the thread that happens to be running will likely be 
more efficient. Also, the separation of loading/writing concerns and threads in 
general in Cassandra is very likely to evolve. Virtual threads are a mid-term 
horizon that may fundamentally alter what is optimal too.
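   
   As a rough illustration of the in-cache case, the sketch below runs the 
task on the calling thread when the data is already cached, and only hands off 
to an executor when a load is needed. `CachedSketchStore`, its string-keyed 
cache, and the `loader` function are all assumptions made up for this example, 
not anything in accord-core or Cassandra.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch; the cache, loader and names are assumptions.
final class CachedSketchStore
{
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> loader;
    private final Executor loadExecutor;

    CachedSketchStore(Function<String, String> loader, Executor loadExecutor)
    {
        this.loader = loader;
        this.loadExecutor = loadExecutor;
    }

    // Returns true if the task ran inline on the caller's thread
    <T> boolean process(String key, Function<String, T> task, Consumer<T> onDone)
    {
        String cached = cache.get(key);
        if (cached != null)
        {
            // Cache hit: use the thread that happens to be running
            onDone.accept(task.apply(cached));
            return true;
        }
        // Cache miss: load (and populate the cache) on the load executor
        loadExecutor.execute(() -> {
            String loaded = cache.computeIfAbsent(key, loader);
            onDone.accept(task.apply(loaded));
        });
        return false;
    }
}
```

   Which executor backs the miss path (a dedicated thread, a shared pool, or 
eventually a virtual thread) is deliberately left open here.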
   



##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -18,1178 +18,1435 @@
 
 package accord.local;
 
-import accord.api.*;
-import accord.local.Status.Durability;
-import accord.local.Status.Known;
+import accord.api.Data;
+import accord.api.Result;
+import accord.api.RoutingKey;
+import accord.impl.CommandsForKey;
+import accord.impl.CommandsForKeys;
 import accord.primitives.*;
-import accord.primitives.Writes;
 import accord.utils.Invariants;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import static accord.local.Status.*;
-import static accord.local.Status.Known.*;
-import static accord.local.Status.Known.Done;
-import static accord.local.Status.Known.ExecuteAtOnly;
-import static accord.primitives.Route.isFullRoute;
-import static accord.utils.Utils.listOf;
-
-import javax.annotation.Nonnull;
+import accord.utils.Utils;
+import accord.utils.async.AsyncChain;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableSortedSet;
+
 import javax.annotation.Nullable;
+import java.util.*;
 
-import accord.api.ProgressLog.ProgressShard;
-import accord.primitives.Ranges;
-import accord.primitives.Ballot;
-import accord.primitives.PartialDeps;
-import accord.primitives.PartialTxn;
-import accord.primitives.Route;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import accord.api.Result;
-import accord.api.RoutingKey;
+import static accord.local.Status.Durability.Local;
+import static accord.local.Status.Durability.NotDurable;
+import static accord.local.Status.Known.DefinitionOnly;
+import static accord.utils.Utils.*;
+import static java.lang.String.format;
 
-import static accord.api.ProgressLog.ProgressShard.Home;
-import static accord.api.ProgressLog.ProgressShard.Local;
-import static accord.api.ProgressLog.ProgressShard.No;
-import static accord.api.ProgressLog.ProgressShard.Unsure;
-import static accord.local.Command.EnsureAction.Add;
-import static accord.local.Command.EnsureAction.Check;
-import static accord.local.Command.EnsureAction.Ignore;
-import static accord.local.Command.EnsureAction.Set;
-import static accord.local.Command.EnsureAction.TrySet;
-
-public abstract class Command implements CommandListener, 
BiConsumer<SafeCommandStore, CommandListener>, PreLoadContext
+public abstract class Command extends ImmutableState
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(Command.class);
+    // sentinel value to indicate a command requested in a preexecute context 
was not found
+    // should not escape the safe command store
+    public static final Command EMPTY = new Command()
+    {
+        @Override public Route<?> route() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public RoutingKey progressKey() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public RoutingKey homeKey() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public TxnId txnId() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public Ballot promised() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public Status.Durability durability() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public ImmutableSet<CommandListener> listeners() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public SaveStatus saveStatus() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public Timestamp executeAt() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public Ballot accepted() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public PartialTxn partialTxn() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Nullable
+        @Override public PartialDeps partialDeps() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
 
-    public abstract TxnId txnId();
+        @Override
+        public String toString()
+        {
+            return "Command(EMPTY)";
+        }
+    };
 
-    // TODO (desirable, API consistency): should any of these calls be 
replaced by corresponding known() registers?
-    public boolean hasBeen(Status status)
+    static
     {
-        return status().hasBeen(status);
+        EMPTY.markInvalidated();
     }
 
-    public boolean has(Known known)
+    static PreLoadContext contextForCommand(Command command)
     {
-        return known.isSatisfiedBy(saveStatus().known);
+        Invariants.checkState(command.hasBeen(Status.PreAccepted) && 
command.partialTxn() != null);
+        return command instanceof PreLoadContext ? (PreLoadContext) command : 
PreLoadContext.contextFor(command.txnId(), command.partialTxn().keys());
     }
 
-    public boolean has(Definition definition)
+    private static Status.Durability durability(Status.Durability durability, 
SaveStatus status)
     {
-        return known().definition.compareTo(definition) >= 0;
+        if (status.compareTo(SaveStatus.PreApplied) >= 0 && durability == 
NotDurable)
+            return Local; // not necessary anywhere, but helps for logical 
consistency
+        return durability;
     }
 
-    public boolean has(Outcome outcome)
+    public interface CommonAttributes
     {
-        return known().outcome.compareTo(outcome) >= 0;
+        TxnId txnId();
+        Status.Durability durability();
+        RoutingKey homeKey();
+        RoutingKey progressKey();
+        Route<?> route();
+        PartialTxn partialTxn();
+        PartialDeps partialDeps();
+        ImmutableSet<CommandListener> listeners();
     }
 
-    public boolean is(Status status)
+    public static class SerializerSupport
     {
-        return status() == status;
-    }
-
-    /**
-     * homeKey is a global value that defines the home shard - the one tasked 
with ensuring the transaction is finished.
-     * progressKey is a local value that defines the local shard responsible 
for ensuring progress on the transaction.
-     * This will be homeKey if it is owned by the node, and some other key 
otherwise. If not the home shard, the progress
-     * shard has much weaker responsibilities, only ensuring that the home 
shard has durably witnessed the txnId.
-     *
-     * TODO (expected, efficiency): we probably do not want to save this on 
its own, as we probably want to
-     *  minimize IO interactions and discrete registers, so will likely 
reference commit log entries directly
-     *  At which point we may impose a requirement that only a Route can be 
saved, not a homeKey on its own.
-     *  Once this restriction is imposed, we no longer need to pass around 
Routable.Domain with TxnId.
-     */
-    public abstract RoutingKey homeKey();
-    protected abstract void setHomeKey(RoutingKey key);
-
-    public abstract RoutingKey progressKey();
-    protected abstract void setProgressKey(RoutingKey key);
-
-    /**
-     * If this is the home shard, we require that this is a Route for all 
states &gt; NotWitnessed;
-     * otherwise for the local progress shard this is ordinarily a 
PartialRoute, and for other shards this is not set,
-     * so that there is only one copy per node that can be consulted to 
construct the full set of involved keys.
-     *
-     * If hasBeen(Committed) this must contain the keys for both txnId.epoch 
and executeAt.epoch
-     */
-    public abstract @Nullable Route<?> route();
-    protected abstract void setRoute(Route<?> route);
-
-    public abstract PartialTxn partialTxn();
-    protected abstract void setPartialTxn(PartialTxn txn);
-
-    public abstract Ballot promised();
-    protected abstract void setPromised(Ballot ballot);
-
-    public abstract Ballot accepted();
-    protected abstract void setAccepted(Ballot ballot);
-
-    public abstract Timestamp executeAt();
-    protected abstract void setExecuteAt(Timestamp timestamp);
-
-    /**
-     * While !hasBeen(Committed), used only as a register for Accept state, 
used by Recovery
-     * If hasBeen(Committed), represents the full deps owned by this range for 
execution at both txnId.epoch
-     * AND executeAt.epoch so that it may be used for Recovery (which contacts 
only txnId.epoch topology),
-     * but also for execution.
-     */
-    public abstract PartialDeps partialDeps();
-    protected abstract void setPartialDeps(PartialDeps deps);
-
-    public abstract Writes writes();
-    protected abstract void setWrites(Writes writes);
-
-    public abstract Result result();
-    protected abstract void setResult(Result result);
-
-    public abstract SaveStatus saveStatus();
-    protected abstract void setSaveStatus(SaveStatus status);
-
-    public Status status() { return saveStatus().status; }
-    protected void setStatus(Status status) { 
setSaveStatus(SaveStatus.get(status, known())); }
-
-    public Known known() { return saveStatus().known; }
-
-    public abstract Durability durability();
-    public abstract void setDurability(Durability v);
+        public static Command.Listener listener(TxnId txnId)
+        {
+            return new Command.Listener(txnId);
+        }
 
-    public abstract Command addListener(CommandListener listener);
-    public abstract void removeListener(CommandListener listener);
-    protected abstract void notifyListeners(SafeCommandStore safeStore);
+        public static NotWitnessed notWitnessed(CommonAttributes attributes, 
Ballot promised)
+        {
+            return NotWitnessed.Factory.create(attributes, promised);
+        }
 
-    protected abstract void addWaitingOnCommit(TxnId txnId);
-    protected abstract void removeWaitingOnCommit(TxnId txnId);
-    protected abstract TxnId firstWaitingOnCommit();
+        public static Preaccepted preaccepted(CommonAttributes common, 
Timestamp executeAt, Ballot promised)
+        {
+            return Preaccepted.Factory.create(common, executeAt, promised);
+        }
 
-    protected abstract void addWaitingOnApplyIfAbsent(TxnId txnId, Timestamp 
executeAt);
-    protected abstract TxnId firstWaitingOnApply(@Nullable TxnId 
ifExecutesBefore);
+        public static Accepted accepted(CommonAttributes common, SaveStatus 
status, Timestamp executeAt, Ballot promised, Ballot accepted)
+        {
+            return Accepted.Factory.create(common, status, executeAt, 
promised, accepted);
+        }
 
-    protected abstract void removeWaitingOn(TxnId txnId, Timestamp executeAt);
-    protected abstract boolean isWaitingOnDependency();
+        public static Committed committed(CommonAttributes common, SaveStatus 
status, Timestamp executeAt, Ballot promised, Ballot accepted, 
ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId> 
waitingOnApply)
+        {
+            return Committed.Factory.create(common, status, executeAt, 
promised, accepted, waitingOnCommit, waitingOnApply);
+        }
 
-    public boolean hasBeenWitnessed()
-    {
-        return partialTxn() != null;
+        public static Executed executed(CommonAttributes common, SaveStatus 
status, Timestamp executeAt, Ballot promised, Ballot accepted, 
ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId> 
waitingOnApply, Writes writes, Result result)
+        {
+            return Executed.Factory.create(common, status, executeAt, 
promised, accepted, waitingOnCommit, waitingOnApply, writes, result);
+        }
     }
 
-    @Override
-    public Iterable<TxnId> txnIds()
+    private static SaveStatus validateCommandClass(SaveStatus status, Class<?> 
expected, Class<?> actual)
     {
-        return Collections.singleton(txnId());
+        if (actual != expected)
+        {
+            throw new IllegalStateException(format("Cannot instantiate %s for 
status %s. %s expected",
+                                                   actual.getSimpleName(), 
status, expected.getSimpleName()));
+        }
+        return status;
     }
 
-    @Override
-    public Seekables<?, ?> keys()
+    private static SaveStatus validateCommandClass(SaveStatus status, Class<?> 
klass)
     {
-        // TODO (expected, consider): when do we need this, and will it always 
be sufficient?
-        return partialTxn().keys();
+        switch (status)
+        {
+            case NotWitnessed:
+                return validateCommandClass(status, NotWitnessed.class, klass);
+            case PreAccepted:
+                return validateCommandClass(status, Preaccepted.class, klass);
+            case AcceptedInvalidate:
+            case AcceptedInvalidateWithDefinition:
+            case Accepted:
+            case AcceptedWithDefinition:
+                return validateCommandClass(status, Accepted.class, klass);
+            case Committed:
+            case ReadyToExecute:
+                return validateCommandClass(status, Committed.class, klass);
+            case PreApplied:
+            case Applied:
+            case Invalidated:
+                return validateCommandClass(status, Executed.class, klass);
+            default:
+                throw new IllegalStateException("Unhandled status " + status);
+        }
     }
 
-    public void setDurability(SafeCommandStore safeStore, Durability 
durability, RoutingKey homeKey, @Nullable Timestamp executeAt)
+    public static Command addListener(SafeCommandStore safeStore, Command 
command, CommandListener listener)
     {
-        updateHomeKey(safeStore, homeKey);
-        if (executeAt != null && hasBeen(PreCommitted) && 
!this.executeAt().equals(executeAt))
-            safeStore.agent().onInconsistentTimestamp(this, this.executeAt(), 
executeAt);
-        setDurability(durability);
+        return 
safeStore.beginUpdate(command).addListener(listener).updateAttributes();
     }
 
-    public enum AcceptOutcome
+    public static Command removeListener(SafeCommandStore safeStore, Command 
command, CommandListener listener)
     {
-        Success, Redundant, RejectedBallot
+        return 
safeStore.beginUpdate(command).removeListener(listener).updateAttributes();
     }
 
-    public AcceptOutcome preaccept(SafeCommandStore safeStore, PartialTxn 
partialTxn, Route<?> route, @Nullable RoutingKey progressKey)
+    public static Committed updateWaitingOn(SafeCommandStore safeStore, 
Committed command, WaitingOn.Update waitingOn)
     {
-        return preacceptOrRecover(safeStore, partialTxn, route, progressKey, 
Ballot.ZERO);
+        if (!waitingOn.hasChanges())
+            return command;
+
+        Update update = safeStore.beginUpdate(command);
+        Committed updated =  command instanceof Executed ?
+                Executed.Factory.update(command.asExecuted(), update, 
waitingOn.build()) :
+                Committed.Factory.update(command, update, waitingOn.build());
+        return update.complete(updated);
     }
 
-    public AcceptOutcome recover(SafeCommandStore safeStore, PartialTxn 
partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
+    public static class Listener implements CommandListener
     {
-        return preacceptOrRecover(safeStore, partialTxn, route, progressKey, 
ballot);
-    }
+        protected final TxnId listenerId;
 
-    private AcceptOutcome preacceptOrRecover(SafeCommandStore safeStore, 
PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot 
ballot)
-    {
-        int compareBallots = promised().compareTo(ballot);
-        if (compareBallots > 0)
+        private Listener(TxnId listenerId)
         {
-            logger.trace("{}: skipping preaccept - higher ballot witnessed 
({})", txnId(), promised());
-            return AcceptOutcome.RejectedBallot;
+            this.listenerId = listenerId;
         }
-        else if (compareBallots < 0)
+
+        @Override
+        public boolean equals(Object o)
         {
-            // save the new ballot as a promise
-            setPromised(ballot);
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Listener that = (Listener) o;
+            return listenerId.equals(that.listenerId);
         }
 
-        if (known().definition.isKnown())
+        @Override
+        public int hashCode()
         {
-            Invariants.checkState(status() == Invalidated || executeAt() != 
null);
-            logger.trace("{}: skipping preaccept - already known ({})", 
txnId(), status());
-            // in case of Ballot.ZERO, we must either have a competing 
recovery coordinator or have late delivery of the
-            // preaccept; in the former case we should abandon coordination, 
and in the latter we have already completed
-            return ballot.equals(Ballot.ZERO) ? AcceptOutcome.Redundant : 
AcceptOutcome.Success;
+            return Objects.hash(listenerId);
         }
 
-        Ranges coordinateRanges = coordinateRanges(safeStore);
-        Invariants.checkState(!coordinateRanges.isEmpty());
-        ProgressShard shard = progressShard(safeStore, route, progressKey, 
coordinateRanges);
-        if (!validate(Ranges.EMPTY, coordinateRanges, shard, route, Set, 
partialTxn, Set, null, Ignore))
-            throw new IllegalStateException();
-
-        if (executeAt() == null)
+        @Override
+        public String toString()
         {
-            TxnId txnId = txnId();
-            // unlike in the Accord paper, we partition shards within a node, 
so that to ensure a total order we must either:
-            //  - use a global logical clock to issue new timestamps; or
-            //  - assign each shard _and_ process a unique id, and use both as 
components of the timestamp
-            // if we are performing recovery (i.e. non-zero ballot), do not 
permit a fast path decision as we want to
-            // invalidate any transactions that were not completed by their 
initial coordinator
-            if (ballot.equals(Ballot.ZERO)) 
setExecuteAt(safeStore.preaccept(txnId, partialTxn.keys()));
-            else setExecuteAt(safeStore.time().uniqueNow(txnId));
+            return "ListenerProxy{" + listenerId + '}';
+        }
 
-            if (status() == NotWitnessed)
-                setStatus(PreAccepted);
-            safeStore.progressLog().preaccepted(this, shard);
+        public TxnId txnId()
+        {
+            return listenerId;
         }
-        else
+
+        @Override
+        public void onChange(SafeCommandStore safeStore, TxnId txnId)
         {
-            // TODO (expected, ?): in the case that we are pre-committed but 
had not been preaccepted/accepted, should we inform progressLog?
-            setSaveStatus(SaveStatus.enrich(saveStatus(), DefinitionOnly));
+            Commands.listenerUpdate(safeStore, safeStore.command(listenerId), 
safeStore.command(txnId));
         }
-        set(safeStore, Ranges.EMPTY, coordinateRanges, shard, route, 
partialTxn, Set, null, Ignore);
 
-        notifyListeners(safeStore);
-        return AcceptOutcome.Success;
+        @Override
+        public PreLoadContext listenerPreLoadContext(TxnId caller)
+        {
+            return PreLoadContext.contextFor(Utils.listOf(listenerId, caller), 
Keys.EMPTY);
+        }
     }
 
-    public boolean preacceptInvalidate(Ballot ballot)
+    public static CommandListener listener(TxnId txnId)
     {
-        if (promised().compareTo(ballot) > 0)
-        {
-            logger.trace("{}: skipping preacceptInvalidate - witnessed higher 
ballot ({})", txnId(), promised());
-            return false;
-        }
-        setPromised(ballot);
-        return true;
+        return new Listener(txnId);
     }
 
-    public AcceptOutcome accept(SafeCommandStore safeStore, Ballot ballot, 
PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey progressKey, 
Timestamp executeAt, PartialDeps partialDeps)
+    private abstract static class AbstractCommand extends Command
     {
-        if (this.promised().compareTo(ballot) > 0)
+        private final TxnId txnId;
+        private final SaveStatus status;
+        private final Status.Durability durability;
+        private final RoutingKey homeKey;
+        private final RoutingKey progressKey;
+        private final Route<?> route;
+        private final Ballot promised;
+        private final ImmutableSet<CommandListener> listeners;
+
+        private AbstractCommand(TxnId txnId, SaveStatus status, 
Status.Durability durability, RoutingKey homeKey, RoutingKey progressKey, 
Route<?> route, Ballot promised, ImmutableSet<CommandListener> listeners)
         {
-            logger.trace("{}: skipping accept - witnessed higher ballot ({} > 
{})", txnId(), promised(), ballot);
-            return AcceptOutcome.RejectedBallot;
+            this.txnId = txnId;
+            this.status = validateCommandClass(status, getClass());
+            this.durability = durability;
+            this.homeKey = homeKey;
+            this.progressKey = progressKey;
+            this.route = route;
+            this.promised = promised;
+            this.listeners = listeners;
         }
 
-        if (hasBeen(PreCommitted))
+        private AbstractCommand(CommonAttributes common, SaveStatus status, 
Ballot promised)
         {
-            logger.trace("{}: skipping accept - already committed ({})", 
txnId(), status());
-            return AcceptOutcome.Redundant;
+            this.txnId = common.txnId();
+            this.status = validateCommandClass(status, getClass());
+            this.durability = common.durability();
+            this.homeKey = common.homeKey();
+            this.progressKey = common.progressKey();
+            this.route = common.route();
+            this.promised = promised;
+            this.listeners = common.listeners();
         }
 
-        TxnId txnId = txnId();
-        Ranges coordinateRanges = coordinateRanges(safeStore);
-        Ranges acceptRanges = txnId.epoch() == executeAt.epoch() ? 
coordinateRanges : safeStore.ranges().between(txnId.epoch(), executeAt.epoch());
-        Invariants.checkState(!acceptRanges.isEmpty());
-        ProgressShard shard = progressShard(safeStore, route, progressKey, 
coordinateRanges);
-
-        if (!validate(coordinateRanges, Ranges.EMPTY, shard, route, Ignore, 
null, Ignore, partialDeps, Set))
-            throw new AssertionError("Invalid response from validate 
function");
-
-        setExecuteAt(executeAt);
-        setPromised(ballot);
-        setAccepted(ballot);
-
-        // TODO (desired, clarity/efficiency): we don't need to set the route 
here, and perhaps we don't even need to
-        //  distributed partialDeps at all, since all we gain is not waiting 
for these transactions to commit during
-        //  recovery. We probably don't want to directly persist a Route in 
any other circumstances, either, to ease persistence.
-        set(safeStore, coordinateRanges, acceptRanges, shard, route, null, 
Ignore, partialDeps, Set);
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Command command = (Command) o;
+            return txnId.equals(command.txnId())
+                    && status == command.saveStatus()
+                    && durability == command.durability()
+                    && Objects.equals(homeKey, command.homeKey())
+                    && Objects.equals(progressKey, command.progressKey())
+                    && Objects.equals(route, command.route())
+                    && Objects.equals(promised, command.promised())
+                    && listeners.equals(command.listeners());
+        }
 
-        // set only registers by transaction keys, which we mightn't already 
have received
-        if (!known().isDefinitionKnown())
-            safeStore.register(keys, acceptRanges, this);
+        @Override
+        public String toString()
+        {
+            return "Command@" + System.identityHashCode(this) + '{' + txnId + 
':' + status + '}';
+        }
 
-        setStatus(Accepted);
-        safeStore.progressLog().accepted(this, shard);
-        notifyListeners(safeStore);
+        @Override
+        public int hashCode()

Review Comment:
   maybe throw `UnsupportedOperationException`, since we don't expect this 
method to be used?
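   
   Something along these lines, as a minimal sketch. `IdentityOnlyCommand` is 
a hypothetical stand-in for `AbstractCommand`, with a `String` id in place of 
`TxnId` so the example is self-contained; the exception message is also just a 
suggestion.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for AbstractCommand; not the class from the PR.
final class IdentityOnlyCommand
{
    private final String txnId; // assumption: String instead of TxnId

    IdentityOnlyCommand(String txnId)
    {
        this.txnId = Objects.requireNonNull(txnId);
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        return txnId.equals(((IdentityOnlyCommand) o).txnId);
    }

    @Override
    public int hashCode()
    {
        // Fail loudly if this ever ends up in a hash-based collection
        throw new UnsupportedOperationException("hashCode is not expected to be used");
    }

    // Demonstrates that adding to a HashSet now surfaces the misuse
    static boolean rejectsHashing()
    {
        Set<IdentityOnlyCommand> set = new HashSet<>();
        try
        {
            set.add(new IdentityOnlyCommand("txn-1"));
            return false;
        }
        catch (UnsupportedOperationException e)
        {
            return true;
        }
    }
}
```

   The trade-off is that any accidental use in a `HashMap` or `HashSet` fails 
at runtime rather than silently relying on a hash we never intended to support.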



##########
accord-core/src/main/java/accord/impl/CommandsForKey.java:
##########
@@ -20,97 +20,550 @@
 
 import accord.api.Key;
 import accord.local.*;
-import accord.local.SafeCommandStore.CommandFunction;
-import accord.local.SafeCommandStore.TestDep;
-import accord.local.SafeCommandStore.TestKind;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import accord.primitives.*;
+import com.google.common.collect.ImmutableSortedMap;
 
 import javax.annotation.Nullable;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
 
-public abstract class CommandsForKey implements CommandListener
+import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
+import static accord.local.SafeCommandStore.TestDep.WITH;
+import static accord.local.SafeCommandStore.TestKind.Ws;
+import static accord.local.Status.PreAccepted;
+import static accord.local.Status.PreCommitted;
+import static accord.utils.Utils.*;
+
+public class CommandsForKey extends ImmutableState
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(CommandsForKey.class);
+    // sentinel value to indicate a cfk requested in a preexecute context was 
not found
+    // should not escape the safe command store
+    public static final CommandsForKey EMPTY = new CommandsForKey(null, null)
+    {
+        @Override public Key key() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public Timestamp max() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public Timestamp lastExecutedTimestamp() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public long lastExecutedMicros() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public Timestamp lastWriteTimestamp() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public CommandTimeseries<?> byId() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public CommandTimeseries<?> byExecuteAt() { throw new 
IllegalStateException("Attempting to access EMPTY sentinel values"); }
+
+        @Override
+        public String toString()
+        {
+            return "CommandsForKey[EMPTY]";
+        }
+    };
+
+    static
+    {
+        EMPTY.markInvalidated();
+    }
+
+    public static class SerializerSupport
+    {
+        public static CommandsForKey.Listener listener(Key key)
+        {
+            return new CommandsForKey.Listener(key);
+        }
+
+        public static  <D> CommandsForKey create(Key key, Timestamp max,
+                                                 Timestamp 
lastExecutedTimestamp, long lastExecutedMicros, Timestamp lastWriteTimestamp,
+                                                 CommandLoader<D> loader,
+                                                 ImmutableSortedMap<Timestamp, 
D> byId,
+                                                 ImmutableSortedMap<Timestamp, 
D> byExecuteAt)
+        {
+            return new CommandsForKey(key, max, lastExecutedTimestamp, 
lastExecutedMicros, lastWriteTimestamp, loader, byId, byExecuteAt);
+        }
+    }
+
+    public interface CommandLoader<D>
+    {
+        D saveForCFK(Command command);
+
+        TxnId txnId(D data);
+        Timestamp executeAt(D data);
+        Txn.Kind txnKind(D data);
+        SaveStatus saveStatus(D data);
+        PartialDeps partialDeps(D data);
+
+        default Status status(D data)
+        {
+            return saveStatus(data).status;
+        }
+
+        default Status.Known known(D data)
+        {
+            return saveStatus(data).known;
+        }
+    }
 
-    public interface CommandTimeseries
+    public static class CommandTimeseries<D>
     {
-        void add(Timestamp timestamp, Command command);
-        void remove(Timestamp timestamp);
+        public enum TestTimestamp {BEFORE, AFTER}
+
+        private final Key key;
+        protected final CommandLoader<D> loader;
+        public final ImmutableSortedMap<Timestamp, D> commands;
+
+        public CommandTimeseries(Update<D> builder)
+        {
+            this.key = builder.key;
+            this.loader = builder.loader;
+            this.commands = ensureSortedImmutable(builder.commands);
+        }
+
+        CommandTimeseries(Key key, CommandLoader<D> loader, 
ImmutableSortedMap<Timestamp, D> commands)
+        {
+            this.key = key;
+            this.loader = loader;
+            this.commands = commands;
+        }
 
-        boolean isEmpty();
+        public CommandTimeseries(Key key, CommandLoader<D> loader)
+        {
+            this(key, loader, ImmutableSortedMap.of());
+        }
 
-        enum TestTimestamp { BEFORE, AFTER }
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            CommandTimeseries<?> that = (CommandTimeseries<?>) o;
+            return key.equals(that.key) && loader.equals(that.loader) && commands.equals(that.commands);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            int hash = 1;
+            hash = 31 * hash + Objects.hashCode(key);
+            hash = 31 * hash + Objects.hashCode(loader);
+            hash = 31 * hash + Objects.hashCode(commands);
+            return hash;
+        }
+
+        public D get(Timestamp key)
+        {
+            return commands.get(key);
+        }
+
+        public boolean isEmpty()
+        {
+            return commands.isEmpty();
+        }
 
         /**
          * All commands before/after (exclusive of) the given timestamp
-         *
+         * <p>
          * Note that {@code testDep} applies only to commands that know at least proposed deps; if specified any
          * commands that do not know any deps will be ignored.
-         *
+         * <p>
          * TODO (expected, efficiency): TestDep should be asynchronous; data should not be kept memory-resident as only used for recovery
          */
-        <T> T mapReduce(TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
-                        TestDep testDep, @Nullable TxnId depId,
-                        @Nullable Status minStatus, @Nullable Status maxStatus,
-                        CommandFunction<T, T> map, T initialValue, T terminalValue);
+        public <T> T mapReduce(SafeCommandStore.TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
+                               SafeCommandStore.TestDep testDep, @Nullable TxnId depId,
+                               @Nullable Status minStatus, @Nullable Status maxStatus,
+                               SafeCommandStore.CommandFunction<T, T> map, T initialValue, T terminalValue)
+        {
+
+            for (D data : (testTimestamp == TestTimestamp.BEFORE ? commands.headMap(timestamp, false) : commands.tailMap(timestamp, false)).values())
+            {
+                TxnId txnId = loader.txnId(data);
+                Timestamp executeAt = loader.executeAt(data);
+                SaveStatus status = loader.saveStatus(data);
+                PartialDeps deps = loader.partialDeps(data);
+                if (testKind == Ws && txnId.isRead()) continue;
+                // If we don't have any dependencies, we treat a dependency filter as a mismatch
+                if (testDep != ANY_DEPS && (!status.known.deps.hasProposedOrDecidedDeps() || (deps.contains(depId) != (testDep == WITH))))
+                    continue;
+                if (minStatus != null && minStatus.compareTo(status.status) > 0)
+                    continue;
+                if (maxStatus != null && maxStatus.compareTo(status.status) < 0)
+                    continue;
+                initialValue = map.apply(key, txnId, executeAt, initialValue);
+                if (initialValue.equals(terminalValue))
+                    break;
+            }
+            return initialValue;
+        }
+
+        Stream<TxnId> between(Timestamp min, Timestamp max, Predicate<Status> statusPredicate)
+        {
+            return commands.subMap(min, true, max, true).values().stream()
+                    .filter(d -> statusPredicate.test(loader.status(d))).map(loader::txnId);
+        }
+
+        public Stream<D> all()
+        {
+            return commands.values().stream();
+        }
+
+        Update<D> beginUpdate()
+        {
+            return new Update<>(this);
+        }
+
+        public CommandLoader<D> loader()
+        {
+            return loader;
+        }
+
+        public static class Update<D>
+        {
+            private final Key key;
+            protected CommandLoader<D> loader;
+            protected NavigableMap<Timestamp, D> commands;
+
+            public Update(Key key, CommandLoader<D> loader)
+            {
+                this.key = key;
+                this.loader = loader;
+                this.commands = new TreeMap<>();
+            }
+
+            public Update(CommandTimeseries<D> timeseries)
+            {
+                this.key = timeseries.key;
+                this.loader = timeseries.loader;
+                this.commands = timeseries.commands;
+            }
+
+            public void add(Timestamp timestamp, Command command)
+            {
+                commands = ensureSortedMutable(commands);
+                commands.put(timestamp, loader.saveForCFK(command));
+            }
+
+            public void remove(Timestamp timestamp)
+            {
+                commands = ensureSortedMutable(commands);
+                commands.remove(timestamp);
+            }
+
+            CommandTimeseries<D> build()
+            {
+                return new CommandTimeseries<>(this);
+            }
+        }
     }
 
-    public abstract Key key();
-    public abstract CommandTimeseries byId();
-    public abstract CommandTimeseries byExecuteAt();
+    public static class Listener implements CommandListener
+    {
+        protected final Key listenerKey;
+
+        private Listener(Key listenerKey)
+        {
+            this.listenerKey = listenerKey;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Listener that = (Listener) o;
+            return listenerKey.equals(that.listenerKey);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(listenerKey);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "ListenerProxy{" + listenerKey + '}';
+        }
+
+        public Key key()
+        {
+            return listenerKey;
+        }
 
-    public abstract Timestamp max();
-    protected abstract void updateMax(Timestamp timestamp);
+        @Override
+        public void onChange(SafeCommandStore safeStore, TxnId txnId)
+        {
+            CommandsForKeys.listenerUpdate(safeStore, safeStore.commandsForKey(listenerKey), safeStore.command(txnId));
+        }
+
+        @Override
+        public PreLoadContext listenerPreLoadContext(TxnId caller)
+        {
+            return PreLoadContext.contextFor(caller, Keys.of(listenerKey));
+        }
+    }
+
+    public static CommandListener listener(Key key)
+    {
+        return new Listener(key);
+    }
+
+    // TODO (now): add validation that anything inserted into *committedBy* has everything prior in its dependencies
+    private final Key key;
+    private final Timestamp max;
+    private final Timestamp lastExecutedTimestamp;
+    private final long lastExecutedMicros;
+    private final Timestamp lastWriteTimestamp;
+    private final CommandTimeseries<?> byId;
+    private final CommandTimeseries<?> byExecuteAt;
+
+    private  <D> CommandsForKey(Key key, Timestamp max,
+                              Timestamp lastExecutedTimestamp, long lastExecutedMicros, Timestamp lastWriteTimestamp,
+                              CommandLoader<D> loader,
+                              ImmutableSortedMap<Timestamp, D> committedById,
+                              ImmutableSortedMap<Timestamp, D> committedByExecuteAt)
+    {
+        this.key = key;
+        this.max = max;
+        this.lastExecutedTimestamp = lastExecutedTimestamp;
+        this.lastExecutedMicros = lastExecutedMicros;
+        this.lastWriteTimestamp = lastWriteTimestamp;
+        this.byId = new CommandTimeseries<>(key, loader, committedById);
+        this.byExecuteAt = new CommandTimeseries<>(key, loader, committedByExecuteAt);
+    }
+
+    public <D> CommandsForKey(Key key, CommandLoader<D> loader)
+    {
+        this.key = key;
+        this.max = Timestamp.NONE;
+        this.lastExecutedTimestamp = Timestamp.NONE;
+        this.lastExecutedMicros = 0;
+        this.lastWriteTimestamp = Timestamp.NONE;
+        this.byId = new CommandTimeseries<>(key, loader);
+        this.byExecuteAt = new CommandTimeseries<>(key, loader);
+    }
+
+    public CommandsForKey(Update builder)
+    {
+        this.key = builder.key;
+        this.max = builder.max;
+        this.lastExecutedTimestamp = builder.lastExecutedTimestamp;
+        this.lastExecutedMicros = builder.lastExecutedMicros;
+        this.lastWriteTimestamp = builder.lastWriteTimestamp;
+        this.byId = builder.byId.build();
+        this.byExecuteAt = builder.byExecuteAt.build();
+    }
+
+    @Override
+    public String toString()
+    {
+        return "CommandsForKey@" + System.identityHashCode(this) + '{' + key + '}';
+    }
 
     @Override
-    public PreLoadContext listenerPreLoadContext(TxnId caller)
+    public boolean equals(Object o)
     {
-        return PreLoadContext.contextFor(caller, Keys.of(key()));
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        CommandsForKey that = (CommandsForKey) o;
+        return lastExecutedMicros == that.lastExecutedMicros
+                && key.equals(that.key)
+                && Objects.equals(max, that.max)
+                && Objects.equals(lastExecutedTimestamp, that.lastExecutedTimestamp)
+                && Objects.equals(lastWriteTimestamp, that.lastWriteTimestamp)
+                && byId.equals(that.byId)
+                && byExecuteAt.equals(that.byExecuteAt);
     }
 
     @Override
-    public void onChange(SafeCommandStore safeStore, Command command)
-    {
-        logger.trace("[{}]: updating as listener in response to change on {} with status {} ({})",
-                     key(), command.txnId(), command.status(), command);
-        updateMax(command.executeAt());
-        switch (command.status())
-        {
-            default: throw new AssertionError();
-            case PreAccepted:
-            case NotWitnessed:
-            case Accepted:
-            case AcceptedInvalidate:
-            case PreCommitted:
-                break;
-            case Applied:
-            case PreApplied:
-            case Committed:
-            case ReadyToExecute:
-                byExecuteAt().remove(command.txnId());
-                byExecuteAt().add(command.executeAt(), command);
-                break;
-            case Invalidated:
-                byId().remove(command.txnId());
-                byExecuteAt().remove(command.txnId());
-                command.removeListener(this);
-                break;
-        }
-    }
-
-    public void register(Command command)
-    {
-        updateMax(command.executeAt());
-        byId().add(command.txnId(), command);
-        byExecuteAt().add(command.txnId(), command);
-        command.addListener(this);
-    }
-
-    public boolean isEmpty()
-    {
-        return byId().isEmpty();
+    public int hashCode()
+    {
+        int hash = 1;
+        hash = 31 * hash + Objects.hashCode(key);
+        hash = 31 * hash + Objects.hashCode(max);
+        hash = 31 * hash + Objects.hashCode(lastExecutedTimestamp);
+        hash = 31 * hash + Long.hashCode(lastExecutedMicros);
+        hash = 31 * hash + Objects.hashCode(lastWriteTimestamp);
+        hash = 31 * hash + byId.hashCode();
+        hash = 31 * hash + byExecuteAt.hashCode();
+        return hash;
+    }
+
+    public Key key()
+    {
+        checkCanReadFrom();
+        return key;
+    }
+
+    public Timestamp max()
+    {
+        checkCanReadFrom();
+        return max;
+    }
+
+    public Timestamp lastExecutedTimestamp()
+    {
+        return lastExecutedTimestamp;
+    }
+
+    public long lastExecutedMicros()
+    {
+        return lastExecutedMicros;
+    }
+
+    public Timestamp lastWriteTimestamp()
+    {
+        return lastWriteTimestamp;
+    }
+
+    public CommandTimeseries<?> byId()
+    {
+        return byId;
+    }
+
+    public CommandTimeseries<?> byExecuteAt()
+    {
+        return byExecuteAt;
+    }
+
+    public void forWitnessed(Timestamp minTs, Timestamp maxTs, Consumer<TxnId> consumer)
+    {
+        byId.between(minTs, maxTs, status -> status.hasBeen(PreAccepted)).forEach(consumer);
+        byExecuteAt.between(minTs, maxTs, status -> status.hasBeen(PreCommitted)).forEach(consumer);
+    }
+
+    private static long getTimestampMicros(Timestamp timestamp)
+    {
+        return timestamp.msb + timestamp.lsb;
+    }
+
+
+    private void validateExecuteAtTime(Timestamp executeAt, boolean isForWriteTxn)
+    {
+        if (executeAt.compareTo(lastWriteTimestamp) < 0)
+            throw new IllegalArgumentException(String.format("%s is less than the most recent write timestamp %s", executeAt, lastWriteTimestamp));
+
+        int cmp = executeAt.compareTo(lastExecutedTimestamp);
+        // execute can be in the past if it's for a read and after the most recent write
+        if (cmp == 0 || (!isForWriteTxn && cmp < 0))
+            return;
+        if (cmp < 0)
+            throw new IllegalArgumentException(String.format("%s is less than the most recent executed timestamp %s", executeAt, lastExecutedTimestamp));
+        else
+            throw new IllegalArgumentException(String.format("%s is greater than the most recent executed timestamp, cfk should be updated", executeAt, lastExecutedTimestamp));
+    }
+
+    public int nowInSecondsFor(Timestamp executeAt, boolean isForWriteTxn)
+    {
+        validateExecuteAtTime(executeAt, isForWriteTxn);
+        // we use the executeAt time instead of the monotonic database timestamp to prevent uneven
+        // ttl expiration in extreme cases, ie 1M+ writes/second to a key causing timestamps to overflow
+        // into the next second on some keys and not others.
+        return Math.toIntExact(TimeUnit.MICROSECONDS.toSeconds(getTimestampMicros(lastExecutedTimestamp)));
+    }
+
+    public long timestampMicrosFor(Timestamp executeAt, boolean isForWriteTxn)
+    {
+        validateExecuteAtTime(executeAt, isForWriteTxn);
+        return lastExecutedMicros;
+    }
+
+    public static class Update
+    {
+        private final SafeCommandStore safeStore;
+        private boolean completed = false;
+        private final Key key;
+        private final CommandsForKey original;
+        private Timestamp max;
+        private Timestamp lastExecutedTimestamp;
+        private long lastExecutedMicros;
+        private Timestamp lastWriteTimestamp;
+        private final CommandTimeseries.Update<?> byId;
+        private final CommandTimeseries.Update<?> byExecuteAt;
+
+        public Update(SafeCommandStore safeStore, CommandsForKey original)
+        {
+            original.checkCanUpdate();
+            this.safeStore = safeStore;
+            this.original = original;
+            this.key = original.key;
+            this.max = original.max;
+            this.lastExecutedTimestamp = original.lastExecutedTimestamp;
+            this.lastExecutedMicros = original.lastExecutedMicros;
+            this.lastWriteTimestamp = original.lastWriteTimestamp;
+            this.byId = original.byId.beginUpdate();
+            this.byExecuteAt = original.byId.beginUpdate();
+        }
+
+        private void checkNotCompleted()
+        {
+            if (completed)
+                throw new IllegalStateException(this + " has been completed");
+        }
+
+        public Key key()
+        {
+            return key;
+        }
+
+        public void updateMax(Timestamp timestamp)
+        {
+            checkNotCompleted();
+            max = Timestamp.max(max, timestamp);
+        }
+
+        public CommandTimeseries.Update<?> byId()
+        {
+            checkNotCompleted();
+            return byId;
+        }
+
+        public CommandTimeseries.Update<?> byExecuteAt()
+        {
+            checkNotCompleted();
+            return byExecuteAt;
+        }
+
+        void updateLastExecutionTimestamps(Timestamp executeAt, boolean isForWriteTxn)
+        {
+            long micros = getTimestampMicros(executeAt);
+            long lastMicros = lastExecutedMicros;
+
+            lastExecutedTimestamp = executeAt;
+            lastExecutedMicros = Math.max(micros, lastMicros + 1);
+            if (isForWriteTxn)
+                lastWriteTimestamp = executeAt;
+        }
+
+        public CommandsForKey complete()
+        {
+            checkNotCompleted();
+            CommandsForKey updated = new CommandsForKey(this);
+            if (original != null)
+                original.markSuperseded();
+            updated.markActive();
+            safeStore.completeUpdate(this, original, updated);
+            completed = true;
+            return updated;
+        }
+    }
+
+    public static CommandsForKey updateLastExecutionTimestamps(CommandsForKey current, SafeCommandStore safeStore, Timestamp executeAt, boolean isForWriteTxn)

Review Comment:
   👍. We should probably annotate such methods in some way and suppress the
   lint warnings for them. I thought I had previously seen a
   `VisibleForImplementation` annotation, but it looks like I imagined it.
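
   As a rough illustration of the annotation idea, here is a minimal sketch of
   what a marker annotation could look like. Everything in it is hypothetical:
   `VisibleForImplementation` is not an existing JDK or Accord annotation, the
   `Example` class and `isMarked` helper exist only for the demonstration, and
   `"WeakerAccess"` is an IntelliJ inspection ID that other linters may not
   recognise.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class VisibleForImplementationSketch
{
    /**
     * Hypothetical marker: the member is public only so that implementations
     * living in other packages can call it, not because it is general API.
     */
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.TYPE})
    public @interface VisibleForImplementation
    {
    }

    /** Illustrative stand-in for a class exposing an implementation-only method. */
    public static class Example
    {
        @VisibleForImplementation
        @SuppressWarnings("WeakerAccess") // assumption: IntelliJ inspection ID; other tools use other keys
        public static long timestampMicros(long msb, long lsb)
        {
            return msb + lsb;
        }
    }

    /** Returns true if the type declares a method with this name carrying the marker. */
    public static boolean isMarked(Class<?> type, String methodName)
    {
        for (Method m : type.getDeclaredMethods())
            if (m.getName().equals(methodName) && m.isAnnotationPresent(VisibleForImplementation.class))
                return true;
        return false;
    }

    public static void main(String[] args)
    {
        System.out.println(isMarked(Example.class, "timestampMicros"));
    }
}
```

   Runtime retention is chosen here only so the marker can be checked
   reflectively in a test; source or class retention would be enough if the
   annotation is purely documentation for reviewers and lint rules.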



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

