belliottsmith commented on code in PR #33:
URL: https://github.com/apache/cassandra-accord/pull/33#discussion_r1121604470


##########
accord-core/src/main/java/accord/local/SafeState.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+/**
+ * State scoped to a single request that references global state
+ */
+public interface SafeState<T>

Review Comment:
   This could be an implementation-only interface, I think? We only need 
`SafeCommand` outside of the `CommandForKey` implementation context.



##########
accord-core/src/main/java/accord/local/SafeState.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+/**
+ * State scoped to a single request that references global state
+ */
+public interface SafeState<T>
+{
+    T current();
+    void invalidate();
+    boolean invalidated();
+
+    default boolean isEmpty()
+    {
+        return current() == null;
+    }
+
+    default void checkNotInvalidated()

Review Comment:
   Does this need to be an interface method?



##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -18,85 +18,289 @@
 
 package accord.local;
 
-import accord.api.*;
-import accord.local.Status.Durability;
-import accord.local.Status.Known;
+import accord.api.Data;
+import accord.api.Result;
+import accord.api.RoutingKey;
+import accord.api.VisibleForImplementation;
 import accord.primitives.*;
-import accord.primitives.Writes;
 import accord.utils.Invariants;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import static accord.local.Status.*;
-import static accord.local.Status.Known.*;
-import static accord.local.Status.Known.Done;
-import static accord.local.Status.Known.ExecuteAtOnly;
-import static accord.primitives.Route.isFullRoute;
-import static accord.utils.Utils.listOf;
-
-import javax.annotation.Nonnull;
+import accord.utils.Utils;
+import accord.utils.async.AsyncChain;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableSortedSet;
+
 import javax.annotation.Nullable;
+import java.util.*;
 
-import accord.api.ProgressLog.ProgressShard;
-import accord.primitives.Ranges;
-import accord.primitives.Ballot;
-import accord.primitives.PartialDeps;
-import accord.primitives.PartialTxn;
-import accord.primitives.Route;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import accord.api.Result;
-import accord.api.RoutingKey;
+import static accord.local.Status.Durability.Local;
+import static accord.local.Status.Durability.NotDurable;
+import static accord.local.Status.Known.DefinitionOnly;
+import static accord.utils.Utils.*;
+import static java.lang.String.format;
 
-import static accord.api.ProgressLog.ProgressShard.Home;
-import static accord.api.ProgressLog.ProgressShard.Local;
-import static accord.api.ProgressLog.ProgressShard.No;
-import static accord.api.ProgressLog.ProgressShard.Unsure;
-import static accord.local.Command.EnsureAction.Add;
-import static accord.local.Command.EnsureAction.Check;
-import static accord.local.Command.EnsureAction.Ignore;
-import static accord.local.Command.EnsureAction.Set;
-import static accord.local.Command.EnsureAction.TrySet;
-
-public abstract class Command implements CommandListener, 
BiConsumer<SafeCommandStore, CommandListener>, PreLoadContext
+public abstract class Command implements CommonAttributes
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(Command.class);
+    static PreLoadContext contextForCommand(Command command)
+    {
+        Invariants.checkState(command.hasBeen(Status.PreAccepted) && 
command.partialTxn() != null);
+        return command instanceof PreLoadContext ? (PreLoadContext) command : 
PreLoadContext.contextFor(command.txnId(), command.partialTxn().keys());
+    }
 
-    public abstract TxnId txnId();
+    private static Status.Durability durability(Status.Durability durability, 
SaveStatus status)
+    {
+        if (status.compareTo(SaveStatus.PreApplied) >= 0 && durability == 
NotDurable)
+            return Local; // not necessary anywhere, but helps for logical 
consistency
+        return durability;
+    }
 
-    // TODO (desirable, API consistency): should any of these calls be 
replaced by corresponding known() registers?
-    public boolean hasBeen(Status status)
+    @VisibleForImplementation
+    public static class SerializerSupport
     {
-        return status().hasBeen(status);
+        public static NotWitnessed notWitnessed(CommonAttributes attributes, 
Ballot promised)
+        {
+            return NotWitnessed.notWitnessed(attributes, promised);
+        }
+
+        public static PreAccepted preaccepted(CommonAttributes common, 
Timestamp executeAt, Ballot promised)
+        {
+            return PreAccepted.preAccepted(common, executeAt, promised);
+        }
+
+        public static Accepted accepted(CommonAttributes common, SaveStatus 
status, Timestamp executeAt, Ballot promised, Ballot accepted)
+        {
+            return Accepted.accepted(common, status, executeAt, promised, 
accepted);
+        }
+
+        public static Committed committed(CommonAttributes common, SaveStatus 
status, Timestamp executeAt, Ballot promised, Ballot accepted, 
ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId> 
waitingOnApply)
+        {
+            return Committed.committed(common, status, executeAt, promised, 
accepted, waitingOnCommit, waitingOnApply);
+        }
+
+        public static Executed executed(CommonAttributes common, SaveStatus 
status, Timestamp executeAt, Ballot promised, Ballot accepted, 
ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId> 
waitingOnApply, Writes writes, Result result)
+        {
+            return Executed.executed(common, status, executeAt, promised, 
accepted, waitingOnCommit, waitingOnApply, writes, result);
+        }
     }
 
-    public boolean has(Known known)
+    private static SaveStatus validateCommandClass(SaveStatus status, Class<?> 
expected, Class<?> actual)
     {
-        return known.isSatisfiedBy(saveStatus().known);
+        if (actual != expected)
+        {
+            throw new IllegalStateException(format("Cannot instantiate %s for 
status %s. %s expected",
+                                                   actual.getSimpleName(), 
status, expected.getSimpleName()));
+        }
+        return status;
     }
 
-    public boolean has(Definition definition)
+    private static SaveStatus validateCommandClass(SaveStatus status, Class<?> 
klass)
     {
-        return known().definition.compareTo(definition) >= 0;
+        switch (status.status)
+        {
+            case NotWitnessed:
+                return validateCommandClass(status, NotWitnessed.class, klass);
+            case PreAccepted:
+                return validateCommandClass(status, PreAccepted.class, klass);
+            case AcceptedInvalidate:
+            case Accepted:
+            case PreCommitted:
+                return validateCommandClass(status, Accepted.class, klass);
+            case Committed:
+            case ReadyToExecute:
+                return validateCommandClass(status, Committed.class, klass);
+            case PreApplied:
+            case Applied:
+            case Invalidated:
+                return validateCommandClass(status, Executed.class, klass);
+            default:
+                throw new IllegalStateException("Unhandled status " + status);
+        }
     }
 
-    public boolean has(Outcome outcome)
+    public static class Listener implements CommandListener
     {
-        return known().outcome.compareTo(outcome) >= 0;
+        protected final TxnId listenerId;
+
+        private Listener(TxnId listenerId)

Review Comment:
   Does it need to be private if we just have a simple identical static factory 
method?



##########
accord-core/src/main/java/accord/local/PreLoadContext.java:
##########
@@ -52,6 +57,13 @@
      */
     Seekables<?, ?> keys();
 
+    default boolean isSubsetOf(PreLoadContext superset)
+    {
+        Set<TxnId> superIds = Sets.newHashSet(superset.txnIds());

Review Comment:
   It might be nice to try casting `superset.txnIds()` to a `Set` first, or 
else to require that `PreLoadContext` fields are already sets?



##########
accord-core/src/main/java/accord/local/CommonAttributes.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import accord.api.RoutingKey;
+import accord.api.VisibleForImplementation;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Route;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+import static accord.utils.Utils.ensureImmutable;
+import static accord.utils.Utils.ensureMutable;
+
+// TODO: remove or cleanup
+public interface CommonAttributes
+{
+    TxnId txnId();
+    Status.Durability durability();
+    RoutingKey homeKey();
+    RoutingKey progressKey();
+    Route<?> route();
+    PartialTxn partialTxn();
+    PartialDeps partialDeps();
+    Listeners.Immutable listeners();
+
+    default Mutable mutableAttrs()

Review Comment:
   Rename to `mutable`?



##########
accord-core/src/main/java/accord/local/Commands.java:
##########
@@ -0,0 +1,1083 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import accord.api.ProgressLog.ProgressShard;
+import accord.api.Result;
+import accord.api.RoutingKey;
+import accord.local.Command.WaitingOn;
+import accord.primitives.*;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static accord.api.ProgressLog.ProgressShard.*;
+import static accord.api.ProgressLog.ProgressShard.Local;
+import static accord.local.Commands.EnsureAction.*;
+import static accord.local.Status.*;
+import static accord.local.Status.Known.ExecuteAtOnly;
+import static accord.primitives.Route.isFullRoute;
+
+public class Commands
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(Commands.class);
+
+    private Commands()
+    {
+    }
+
+    private static Ranges covers(@Nullable PartialTxn txn)
+    {
+        return txn == null ? null : txn.covering();
+    }
+
+    private static Ranges covers(@Nullable PartialDeps deps)
+    {
+        return deps == null ? null : deps.covering;
+    }
+
+    private static boolean hasQuery(PartialTxn txn)
+    {
+        return txn != null && txn.query() != null;
+    }
+
+    /**
+     * true iff this commandStore owns the given key on the given epoch
+     */
+    public static boolean owns(SafeCommandStore safeStore, long epoch, 
RoutingKey someKey)
+    {
+        return safeStore.ranges().at(epoch).contains(someKey);
+    }
+
+    public static RoutingKey noProgressKey()
+    {
+        return NO_PROGRESS_KEY;
+    }
+
+    public enum AcceptOutcome {Success, Redundant, RejectedBallot}
+
+    public static AcceptOutcome preaccept(SafeCommandStore safeStore, TxnId 
txnId, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey)
+    {
+        return preacceptOrRecover(safeStore, txnId, partialTxn, route, 
progressKey, Ballot.ZERO);
+    }
+
+    public static AcceptOutcome recover(SafeCommandStore safeStore, TxnId 
txnId, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, 
Ballot ballot)
+    {
+        return preacceptOrRecover(safeStore, txnId, partialTxn, route, 
progressKey, ballot);
+    }
+
+    private static AcceptOutcome preacceptOrRecover(SafeCommandStore 
safeStore, TxnId txnId, PartialTxn partialTxn, Route<?> route, @Nullable 
RoutingKey progressKey, Ballot ballot)
+    {
+        SafeCommand liveCommand = safeStore.command(txnId);
+        Command command = liveCommand.current();
+
+        int compareBallots = command.promised().compareTo(ballot);
+        if (compareBallots > 0)
+        {
+            logger.trace("{}: skipping preaccept - higher ballot witnessed 
({})", txnId, command.promised());
+            return AcceptOutcome.RejectedBallot;
+        }
+
+        if (command.known().definition.isKnown())
+        {
+            Invariants.checkState(command.status() == Invalidated || 
command.executeAt() != null);
+            logger.trace("{}: skipping preaccept - already known ({})", txnId, 
command.status());
+            // in case of Ballot.ZERO, we must either have a competing 
recovery coordinator or have late delivery of the
+            // preaccept; in the former case we should abandon coordination, 
and in the latter we have already completed
+            liveCommand.updatePromised(ballot);
+            return ballot.equals(Ballot.ZERO) ? AcceptOutcome.Redundant : 
AcceptOutcome.Success;
+        }
+
+        Ranges coordinateRanges = coordinateRanges(safeStore, command);
+        Invariants.checkState(!coordinateRanges.isEmpty());
+        CommonAttributes attrs = updateHomeAndProgressKeys(safeStore, 
command.txnId(), command, route, progressKey, coordinateRanges);
+        ProgressShard shard = progressShard(attrs, progressKey, 
coordinateRanges);
+        if (!validate(command.status(), attrs, Ranges.EMPTY, coordinateRanges, 
shard, route, Set, partialTxn, Set, null, Ignore))
+            throw new IllegalStateException();
+
+        // FIXME: this should go into a consumer method
+        attrs = set(safeStore, command, attrs, Ranges.EMPTY, coordinateRanges, 
shard, route, partialTxn, Set, null, Ignore);
+        if (command.executeAt() == null)
+        {
+            // unlike in the Accord paper, we partition shards within a node, 
so that to ensure a total order we must either:
+            //  - use a global logical clock to issue new timestamps; or
+            //  - assign each shard _and_ process a unique id, and use both as 
components of the timestamp
+            // if we are performing recovery (i.e. non-zero ballot), do not 
permit a fast path decision as we want to
+            // invalidate any transactions that were not completed by their 
initial coordinator
+            Timestamp executeAt = ballot.equals(Ballot.ZERO)
+                    ? safeStore.preaccept(txnId, partialTxn.keys())
+                    : safeStore.time().uniqueNow(txnId);
+            command = liveCommand.preaccept(attrs, executeAt, ballot);
+            safeStore.progressLog().preaccepted(command, shard);
+        }
+        else
+        {
+            // TODO (expected, ?): in the case that we are pre-committed but 
had not been preaccepted/accepted, should we inform progressLog?
+            command = liveCommand.markDefined(attrs, ballot);
+        }
+
+        safeStore.notifyListeners(command);
+        return AcceptOutcome.Success;
+    }
+
+    public static boolean preacceptInvalidate(SafeCommandStore safeStore, 
TxnId txnId, Ballot ballot)
+    {
+        SafeCommand liveCommand = safeStore.command(txnId);
+        Command command = liveCommand.current();
+        if (command.promised().compareTo(ballot) > 0)
+        {
+            logger.trace("{}: skipping preacceptInvalidate - witnessed higher 
ballot ({})", command.txnId(), command.promised());
+            return false;
+        }
+        liveCommand.updatePromised(ballot);
+        return true;
+    }
+
+    public static AcceptOutcome accept(SafeCommandStore safeStore, TxnId 
txnId, Ballot ballot, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable 
RoutingKey progressKey, Timestamp executeAt, PartialDeps partialDeps)
+    {
+        SafeCommand liveCommand = safeStore.command(txnId);
+        Command command = liveCommand.current();
+        if (command.promised().compareTo(ballot) > 0)
+        {
+            logger.trace("{}: skipping accept - witnessed higher ballot ({} > 
{})", txnId, command.promised(), ballot);
+            return AcceptOutcome.RejectedBallot;
+        }
+
+        if (command.hasBeen(PreCommitted))
+        {
+            logger.trace("{}: skipping accept - already committed ({})", 
txnId, command.status());
+            return AcceptOutcome.Redundant;
+        }
+
+        Ranges coordinateRanges = coordinateRanges(safeStore, command);
+        Ranges acceptRanges = txnId.epoch() == executeAt.epoch() ? 
coordinateRanges : safeStore.ranges().between(txnId.epoch(), executeAt.epoch());
+        Invariants.checkState(!acceptRanges.isEmpty());
+
+        CommonAttributes attrs = updateHomeAndProgressKeys(safeStore, 
command.txnId(), command, route, progressKey, coordinateRanges);
+        ProgressShard shard = progressShard(attrs, progressKey, 
coordinateRanges);
+        if (!validate(command.status(), attrs, coordinateRanges, Ranges.EMPTY, 
shard, route, Ignore, null, Ignore, partialDeps, Set))
+        {
+            throw new AssertionError("Invalid response from validate 
function");
+        }
+
+        // TODO (desired, clarity/efficiency): we don't need to set the route 
here, and perhaps we don't even need to
+        //  distributed partialDeps at all, since all we gain is not waiting 
for these transactions to commit during
+        //  recovery. We probably don't want to directly persist a Route in 
any other circumstances, either, to ease persistence.
+        attrs = set(safeStore, command, attrs, coordinateRanges, acceptRanges, 
shard, route, null, Ignore, partialDeps, Set);
+
+        // set only registers by transaction keys, which we mightn't already 
have received
+        if (!command.known().isDefinitionKnown())
+            safeStore.register(keys, acceptRanges, command);
+
+        command = liveCommand.accept(attrs, executeAt, ballot);
+        safeStore.progressLog().accepted(command, shard);
+        safeStore.notifyListeners(command);
+
+        return AcceptOutcome.Success;
+    }
+
+    public static AcceptOutcome acceptInvalidate(SafeCommandStore safeStore, 
SafeCommand liveCommand, Ballot ballot)
+    {
+        Command command = liveCommand.current();
+        if (command.promised().compareTo(ballot) > 0)
+        {
+            logger.trace("{}: skipping accept invalidated - witnessed higher 
ballot ({} > {})", command.txnId(), command.promised(), ballot);
+            return AcceptOutcome.RejectedBallot;
+        }
+
+        if (command.hasBeen(PreCommitted))
+        {
+            logger.trace("{}: skipping accept invalidated - already committed 
({})", command.txnId(), command.status());
+            return AcceptOutcome.Redundant;
+        }
+
+        logger.trace("{}: accepted invalidated", command.txnId());
+
+        command = liveCommand.acceptInvalidated(ballot);
+        safeStore.notifyListeners(command);
+        return AcceptOutcome.Success;
+    }
+
+    public enum CommitOutcome {Success, Redundant, Insufficient;}
+
+
+    // relies on mutual exclusion for each key
+    public static CommitOutcome commit(SafeCommandStore safeStore, TxnId 
txnId, Route<?> route, @Nullable RoutingKey progressKey, @Nullable PartialTxn 
partialTxn, Timestamp executeAt, PartialDeps partialDeps)
+    {
+        SafeCommand liveCommand = safeStore.command(txnId);
+        Command command = liveCommand.current();
+
+        if (command.hasBeen(PreCommitted))
+        {
+            logger.trace("{}: skipping commit - already committed ({})", 
txnId, command.status());
+            if (!executeAt.equals(command.executeAt()) || command.status() == 
Invalidated)
+                safeStore.agent().onInconsistentTimestamp(command, 
(command.status() == Invalidated ? Timestamp.NONE : command.executeAt()), 
executeAt);
+
+            if (command.hasBeen(Committed))
+                return CommitOutcome.Redundant;
+        }
+
+        Ranges coordinateRanges = coordinateRanges(safeStore, command);
+        // TODO (expected, consider): consider ranges between coordinateRanges 
and executeRanges? Perhaps don't need them
+        Ranges executeRanges = executeRanges(safeStore, executeAt);
+
+        CommonAttributes attrs = updateHomeAndProgressKeys(safeStore, 
command.txnId(), command, route, progressKey, coordinateRanges);
+        ProgressShard shard = progressShard(attrs, progressKey, 
coordinateRanges);
+
+        if (!validate(command.status(), attrs, coordinateRanges, 
executeRanges, shard, route, Check, partialTxn, Add, partialDeps, Set))
+        {
+            liveCommand.updateAttributes(attrs);
+            return CommitOutcome.Insufficient;
+        }
+
+        // FIXME: split up set
+        attrs = set(safeStore, command, attrs, coordinateRanges, 
executeRanges, shard, route, partialTxn, Add, partialDeps, Set);
+
+        logger.trace("{}: committed with executeAt: {}, deps: {}", txnId, 
executeAt, partialDeps);
+        WaitingOn waitingOn = populateWaitingOn(safeStore, txnId, executeAt, 
partialDeps);
+        command = liveCommand.commit(attrs, executeAt, waitingOn);
+
+        safeStore.progressLog().committed(command, shard);
+
+        // TODO (expected, safety): introduce intermediate status to avoid 
reentry when notifying listeners (which might notify us)
+        maybeExecute(safeStore, liveCommand, shard, true, true);
+        return CommitOutcome.Success;
+    }
+
+    // relies on mutual exclusion for each key
+    public static void precommit(SafeCommandStore safeStore, TxnId txnId, 
Timestamp executeAt)
+    {
+        SafeCommand liveCommand = safeStore.command(txnId);
+        Command command = liveCommand.current();
+        if (command.hasBeen(PreCommitted))
+        {
+            logger.trace("{}: skipping precommit - already committed ({})", 
txnId, command.status());
+            if (executeAt.equals(command.executeAt()) && command.status() != 
Invalidated)
+                return;
+
+            safeStore.agent().onInconsistentTimestamp(command, 
(command.status() == Invalidated ? Timestamp.NONE : command.executeAt()), 
executeAt);
+        }
+
+        command = liveCommand.precommit(executeAt);
+        safeStore.notifyListeners(command);
+        logger.trace("{}: precommitted with executeAt: {}", txnId, executeAt);
+    }
+
+    protected static WaitingOn populateWaitingOn(SafeCommandStore safeStore, 
TxnId txnId, Timestamp executeAt, PartialDeps partialDeps)
+    {
+        Ranges ranges = safeStore.ranges().since(executeAt.epoch());
+        if (ranges == null)
+            return WaitingOn.EMPTY;
+
+        WaitingOn.Update update = new WaitingOn.Update();
+        partialDeps.forEach(ranges, depId -> {
+            SafeCommand liveCommand = safeStore.ifLoaded(depId);
+            if (liveCommand == null)
+            {
+                update.addWaitingOnCommit(depId);
+                safeStore.addAndInvokeListener(depId, txnId);
+            }
+            else
+            {
+                Command command = liveCommand.current();
+                switch (command.status())
+                {
+                    default:
+                        throw new IllegalStateException();
+                    case NotWitnessed:
+                    case PreAccepted:
+                    case Accepted:
+                    case AcceptedInvalidate:
+                    case PreCommitted:
+                        // we don't know when these dependencies will execute, 
and cannot execute until we do
+
+                        command = 
liveCommand.addListener(Command.listener(txnId));
+                        update.addWaitingOnCommit(command.txnId());
+                        break;
+                    case Committed:
+                        // TODO (desired, efficiency): split into ReadyToRead 
and ReadyToWrite;
+                        //                             the distributed read 
can be performed as soon as those keys are ready,
+                        //                             and in parallel with 
any other reads. the client can even ACK immediately after;
+                        //                             only the write needs to 
be postponed until other in-progress reads complete
+                    case ReadyToExecute:
+                    case PreApplied:
+                    case Applied:
+                        command = 
liveCommand.addListener(Command.listener(txnId));
+                        insertPredecessor(txnId, executeAt, update, command);
+                    case Invalidated:
+                        break;
+                }
+            }
+        });
+        return update.build();
+    }
+
+    // TODO (expected, ?): commitInvalidate may need to update cfks _if_ 
possible
+    public static void commitInvalidate(SafeCommandStore safeStore, TxnId 
txnId)
+    {
+        SafeCommand liveCommand = safeStore.command(txnId);
+        Command command = liveCommand.current();
+        if (command.hasBeen(PreCommitted))
+        {
+            logger.trace("{}: skipping commit invalidated - already committed 
({})", txnId, command.status());
+            if (!command.hasBeen(Invalidated))
+                safeStore.agent().onInconsistentTimestamp(command, 
Timestamp.NONE, command.executeAt());
+
+            return;
+        }
+
+        ProgressShard shard = progressShard(safeStore, command);
+        safeStore.progressLog().invalidated(command, shard);
+
+        CommonAttributes attrs = command;
+        if (command.partialDeps() == null)
+            attrs = attrs.mutableAttrs().partialDeps(PartialDeps.NONE);
+        command = liveCommand.commitInvalidated(attrs, txnId);
+        logger.trace("{}: committed invalidated", txnId);
+
+        safeStore.notifyListeners(command);
+    }
+
+    public enum ApplyOutcome {Success, Redundant, Insufficient}
+
+
+    public static ApplyOutcome apply(SafeCommandStore safeStore, TxnId txnId, 
long untilEpoch, Route<?> route, Timestamp executeAt, @Nullable PartialDeps 
partialDeps, Writes writes, Result result)
+    {
+        SafeCommand liveCommand = safeStore.command(txnId);
+        Command command = liveCommand.current();
+        if (command.hasBeen(PreApplied) && 
executeAt.equals(command.executeAt()))
+        {
+            logger.trace("{}: skipping apply - already executed ({})", txnId, 
command.status());
+            return ApplyOutcome.Redundant;
+        }
+        else if (command.hasBeen(PreCommitted) && 
!executeAt.equals(command.executeAt()))
+        {
+            safeStore.agent().onInconsistentTimestamp(command, 
command.executeAt(), executeAt);
+        }
+
+        Ranges coordinateRanges = coordinateRanges(safeStore, command);
+        Ranges executeRanges = executeRanges(safeStore, executeAt);
+        if (untilEpoch < safeStore.latestEpoch())
+        {
+            Ranges expectedRanges = 
safeStore.ranges().between(executeAt.epoch(), untilEpoch);
+            Invariants.checkState(expectedRanges.containsAll(executeRanges));
+        }
+
+        CommonAttributes attrs = updateHomeAndProgressKeys(safeStore, 
command.txnId(), command, route, coordinateRanges);
+        ProgressShard shard = progressShard(attrs, coordinateRanges);
+
+        if (!validate(command.status(), attrs, coordinateRanges, 
executeRanges, shard, route, Check, null, Check, partialDeps, 
command.hasBeen(Committed) ? Add : TrySet))
+        {
+            liveCommand.updateAttributes(attrs);
+            return ApplyOutcome.Insufficient; // TODO (expected, consider): 
this should probably be an assertion failure if !TrySet
+        }
+
+        WaitingOn waitingOn = !command.hasBeen(Committed) ? 
populateWaitingOn(safeStore, txnId, executeAt, partialDeps) : 
command.asCommitted().waitingOn();
+        attrs = set(safeStore, command, attrs, coordinateRanges, 
executeRanges, shard, route, null, Check, partialDeps, 
command.hasBeen(Committed) ? Add : TrySet);
+
+        liveCommand.preapplied(attrs, executeAt, waitingOn, writes, result);
+        logger.trace("{}: apply, status set to Executed with executeAt: {}, 
deps: {}", txnId, executeAt, partialDeps);
+
+        maybeExecute(safeStore, liveCommand, shard, true, true);
+        safeStore.progressLog().executed(liveCommand.current(), shard);
+
+        return ApplyOutcome.Success;
+    }
+
+    public static void listenerUpdate(SafeCommandStore safeStore, SafeCommand 
liveListener, SafeCommand liveUpdated)
+    {
+        Command listener = liveListener.current();
+        Command updated = liveUpdated.current();
+        logger.trace("{}: updating as listener in response to change on {} 
with status {} ({})",
+                     listener.txnId(), updated.txnId(), updated.status(), 
updated);
+        switch (updated.status())
+        {
+            default:
+                throw new IllegalStateException("Unexpected status: " + 
updated.status());
+            case NotWitnessed:
+            case PreAccepted:
+            case Accepted:
+            case AcceptedInvalidate:
+                break;
+
+            case PreCommitted:
+            case Committed:
+            case ReadyToExecute:
+            case PreApplied:
+            case Applied:
+            case Invalidated:
+                updatePredecessorAndMaybeExecute(safeStore, liveListener, 
liveUpdated, false);
+                break;
+        }
+    }
+
+    protected static void postApply(SafeCommandStore safeStore, TxnId txnId)
+    {
+        logger.trace("{} applied, setting status to Applied and notifying 
listeners", txnId);
+        Command command = safeStore.command(txnId).applied();
+        safeStore.notifyListeners(command);
+    }
+
+    private static Function<SafeCommandStore, Void> callPostApply(TxnId txnId)

Review Comment:
   Maybe merge this with `postApply`? My preference would be to call this 
method `postApply` and inline the old `postApply` here.



##########
accord-core/src/main/java/accord/local/CommandListener.java:
##########
@@ -22,7 +22,7 @@
 
 public interface CommandListener
 {
-    void onChange(SafeCommandStore safeStore, Command command);
+    void onChange(SafeCommandStore safeStore, TxnId txnId);

Review Comment:
   Pass either the `SafeCommand` or the current `Command` here, rather than the `TxnId`?



##########
accord-core/src/main/java/accord/utils/SortedArrays.java:
##########
@@ -21,6 +21,7 @@
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.function.IntFunction;
+import java.util.function.Predicate;

Review Comment:
   I think this import is still unused?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to