This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 6c6872270e16d2e777f1fa2c510b8f15396be3f3
Author: Ariel Weisberg <aweisb...@apple.com>
AuthorDate: Mon May 1 12:19:34 2023 -0400

    Support for interoperability between Accord and non-Accord managed data
    
    Patch by Ariel Weisberg and Blake Eggleston; Reviewed by Blake Eggleston 
for CASSANDRA-18129
    
    Co-authored-by: Blake Eggleston <beggles...@apple.com>
---
 accord-core/src/main/java/accord/api/Data.java     |  12 ++
 accord-core/src/main/java/accord/api/Read.java     |   8 +-
 .../src/main/java/accord/coordinate/Barrier.java   |   1 +
 .../src/main/java/accord/coordinate/Execute.java   | 133 ++----------
 .../src/main/java/accord/coordinate/Persist.java   |  74 +++++--
 .../coordinate/{Execute.java => TxnExecute.java}   |  49 ++---
 .../main/java/accord/coordinate/TxnPersist.java    |  48 +++++
 .../accord/coordinate/tracking/AppliedTracker.java |   2 +-
 .../accord/coordinate/tracking/QuorumTracker.java  |   2 +-
 .../tracking/ResponseTracker.java}                 |  16 +-
 accord-core/src/main/java/accord/local/Node.java   |  42 ++--
 .../main/java/accord/local/SerializerSupport.java  |  68 ++++--
 .../{ReadTxnData.java => AbstractExecute.java}     | 136 +++++++-----
 .../src/main/java/accord/messages/Apply.java       |  27 ++-
 .../accord/messages/ApplyThenWaitUntilApplied.java |   2 +
 .../src/main/java/accord/messages/Commit.java      |  11 +-
 .../java/accord/messages/InformHomeDurable.java    |   4 +-
 .../src/main/java/accord/messages/MessageType.java | 123 +++++++----
 .../src/main/java/accord/messages/ReadData.java    |  21 +-
 .../src/main/java/accord/messages/ReadTxnData.java | 237 +--------------------
 .../java/accord/messages/WaitUntilApplied.java     |   9 +-
 .../src/main/java/accord/primitives/Keys.java      |  15 +-
 .../main/java/accord/primitives/RoutingKeys.java   |  11 +
 .../src/main/java/accord/primitives/Txn.java       |  16 +-
 .../src/main/java/accord/primitives/Writes.java    |  14 +-
 .../src/main/java/accord/topology/Topology.java    |  26 ++-
 .../src/main/java/accord/utils/Invariants.java     |  13 +-
 accord-core/src/test/java/accord/Utils.java        |  10 +-
 .../src/test/java/accord/burn/BurnTest.java        |   9 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |  11 +-
 .../src/test/java/accord/impl/list/ListRead.java   |  13 +-
 .../test/java/accord/impl/mock/MockCluster.java    |   6 +
 .../src/test/java/accord/impl/mock/MockStore.java  |  11 +-
 .../java/accord/local/ImmutableCommandTest.java    |  11 +-
 .../test/java/accord/messages/PreAcceptTest.java   |  49 +++--
 .../test/java/accord/messages/ReadDataTest.java    |   4 +-
 .../src/main/java/accord/maelstrom/Cluster.java    |   6 +-
 .../main/java/accord/maelstrom/MaelstromRead.java  |  12 +-
 .../src/main/java/accord/maelstrom/Main.java       |   6 +-
 39 files changed, 629 insertions(+), 639 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Data.java 
b/accord-core/src/main/java/accord/api/Data.java
index 24945648..6f9510c0 100644
--- a/accord-core/src/main/java/accord/api/Data.java
+++ b/accord-core/src/main/java/accord/api/Data.java
@@ -18,11 +18,23 @@
 
 package accord.api;
 
+import static accord.utils.Invariants.checkState;
+
 /**
  * The result of some (potentially partial) {@link Read} from some {@link 
DataStore}
  */
 public interface Data
 {
+    Data NOOP_DATA = new Data()
+    {
+        @Override
+        public Data merge(Data data)
+        {
+            checkState(data == null || data == NOOP_DATA, "Can't mix no op 
Data with other implementations of Data");
+            return NOOP_DATA;
+        }
+    };
+
     /**
      * Combine the contents of the parameter with this object and return the 
resultant object.
      * This method may modify the current object and return itself.
diff --git a/accord-core/src/main/java/accord/api/Read.java 
b/accord-core/src/main/java/accord/api/Read.java
index 7def168c..3b77ed3c 100644
--- a/accord-core/src/main/java/accord/api/Read.java
+++ b/accord-core/src/main/java/accord/api/Read.java
@@ -19,17 +19,19 @@
 package accord.api;
 
 import accord.local.SafeCommandStore;
-import accord.primitives.*;
+import accord.primitives.Ranges;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
 import accord.utils.async.AsyncChain;
 
-
 /**
  * A read to be performed on potentially multiple shards, the inputs of which 
may be fed to a {@link Query}
  */
 public interface Read
 {
     Seekables<?, ?> keys();
-    AsyncChain<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore 
commandStore, Timestamp executeAt, DataStore store);
+    AsyncChain<Data> read(Seekable key, SafeCommandStore commandStore, 
Timestamp executeAt, DataStore store);
     Read slice(Ranges ranges);
     Read merge(Read other);
     default boolean isEqualOrFuller(Read other) { return true; }
diff --git a/accord-core/src/main/java/accord/coordinate/Barrier.java 
b/accord-core/src/main/java/accord/coordinate/Barrier.java
index 39e56dee..2939c479 100644
--- a/accord-core/src/main/java/accord/coordinate/Barrier.java
+++ b/accord-core/src/main/java/accord/coordinate/Barrier.java
@@ -146,6 +146,7 @@ public class Barrier<S extends Seekables<?, ?>> extends 
AsyncResults.AbstractRes
         catch (ExecutionException e)
         {
             tryFailure(e);
+            return;
         }
         coordinateSyncPoint.addCallback((syncPoint, syncPointFailure) -> {
             if (syncPointFailure != null)
diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java 
b/accord-core/src/main/java/accord/coordinate/Execute.java
index edfd033a..0bc5c6ed 100644
--- a/accord-core/src/main/java/accord/coordinate/Execute.java
+++ b/accord-core/src/main/java/accord/coordinate/Execute.java
@@ -18,59 +18,35 @@
 
 package accord.coordinate;
 
-import java.util.Set;
 import java.util.function.BiConsumer;
 
-import accord.api.Data;
 import accord.api.Result;
 import accord.local.Node;
-import accord.local.Node.Id;
-import accord.messages.Commit;
-import accord.messages.ReadData.ReadNack;
-import accord.messages.ReadData.ReadOk;
-import accord.messages.ReadData.ReadReply;
-import accord.messages.ReadTxnData;
 import accord.primitives.Deps;
 import accord.primitives.FullRoute;
 import accord.primitives.Participants;
-import accord.primitives.Ranges;
+import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
-import accord.primitives.Txn.Kind;
 import accord.primitives.TxnId;
-import accord.topology.Topologies;
-import accord.topology.Topology;
+import accord.primitives.Writes;
+import accord.primitives.Txn.Kind;
 
-import static accord.coordinate.ReadCoordinator.Action.Approve;
-import static accord.coordinate.ReadCoordinator.Action.ApprovePartial;
-import static accord.messages.Commit.Kind.Maximal;
 import static accord.utils.Invariants.checkArgument;
 
-class Execute extends ReadCoordinator<ReadReply>
+public interface Execute
 {
-    final Txn txn;
-    final Participants<?> readScope;
-    final FullRoute<?> route;
-    final Timestamp executeAt;
-    final Deps deps;
-    final Topologies executes;
-    final BiConsumer<? super Result, Throwable> callback;
-    private Data data;
-
-    private Execute(Node node, TxnId txnId, Txn txn, FullRoute<?> route, 
Participants<?> readScope, Timestamp executeAt, Deps deps, BiConsumer<? super 
Result, Throwable> callback)
+    interface Factory
     {
-        super(node, node.topology().forEpoch(readScope, executeAt.epoch()), 
txnId);
-        this.txn = txn;
-        this.route = route;
-        this.readScope = readScope;
-        this.executeAt = executeAt;
-        this.deps = deps;
-        this.executes = node.topology().forEpoch(route, executeAt.epoch());
-        this.callback = callback;
+        Execute create(Node node, TxnId txnId, Txn txn, FullRoute<?> route, 
Participants<?> readScope, Timestamp executeAt, Deps deps, BiConsumer<? super 
Result, Throwable> callback);
     }
 
-    public static void execute(Node node, TxnId txnId, Txn txn, FullRoute<?> 
route, Timestamp executeAt, Deps deps, BiConsumer<? super Result, Throwable> 
callback)
+    void start();
+
+    static void execute(Node node, TxnId txnId, Txn txn, FullRoute<?> route, 
Timestamp executeAt, Deps deps, BiConsumer<? super Result, Throwable> callback)
     {
+        Seekables<?, ?> readKeys = txn.read().keys();
+        Participants<?> readScope = readKeys.toParticipants();
         // Recovery calls execute and we would like execute to run BlockOnDeps 
because that will notify the agent
         // of the local barrier
         if (txn.kind() == Kind.SyncPoint)
@@ -78,84 +54,19 @@ class Execute extends ReadCoordinator<ReadReply>
             checkArgument(txnId.equals(executeAt));
             BlockOnDeps.blockOnDeps(node, txnId, txn, route, deps, callback);
         }
-        else if (txn.read().keys().isEmpty())
-        {
-            Result result = txn.result(txnId, executeAt, null);
-            Persist.persist(node, txnId, route, txn, executeAt, deps, 
txn.execute(txnId, executeAt, null), result);
-            callback.accept(result, null);
-        }
-        else
-        {
-            Execute execute = new Execute(node, txnId, txn, route, 
txn.keys().toParticipants(), executeAt, deps, callback);
-            execute.start();
-        }
-    }
-
-    @Override
-    protected void start(Set<Id> readSet)
-    {
-        Commit.commitMinimalAndRead(node, executes, txnId, txn, route, 
readScope, executeAt, deps, readSet, this);
-    }
-
-    @Override
-    public void contact(Id to)
-    {
-        node.send(to, new ReadTxnData(to, topologies(), txnId, readScope, 
executeAt), this);
-    }
-
-    @Override
-    protected Ranges unavailable(ReadReply reply)
-    {
-        return ((ReadOk)reply).unavailable;
-    }
-
-    @Override
-    protected Action process(Id from, ReadReply reply)
-    {
-        if (reply.isOk())
-        {
-            ReadOk ok = ((ReadOk) reply);
-            Data next = ok.data;
-            if (next != null)
-                data = data == null ? next : data.merge(next);
-
-            return ok.unavailable == null ? Approve : ApprovePartial;
-        }
-
-        ReadNack nack = (ReadNack) reply;
-        switch (nack)
-        {
-            default: throw new IllegalStateException();
-            case Redundant:
-                callback.accept(null, new Preempted(txnId, route.homeKey()));
-                return Action.Aborted;
-            case NotCommitted:
-                // the replica may be missing the original commit, or the 
additional commit, so send everything
-                Topologies topology = node.topology().preciseEpochs(route, 
txnId.epoch(), executeAt.epoch());
-                Topology coordinateTopology = topology.forEpoch(txnId.epoch());
-                node.send(from, new Commit(Maximal, from, coordinateTopology, 
topology, txnId, txn, route, readScope, executeAt, deps, false));
-                // also try sending a read command to another replica, in case 
they're ready to serve a response
-                return Action.TryAlternative;
-            case Invalid:
-                callback.accept(null, new IllegalStateException("Submitted a 
read command to a replica that did not own the range"));
-                return Action.Aborted;
-        }
-    }
-
-
-    @Override
-    protected void onDone(Success success, Throwable failure)
-    {
-        if (failure == null)
-        {
-            Result result = txn.result(txnId, executeAt, data);
-            callback.accept(result, null);
-            // avoid re-calculating topologies if it is unchanged
-            Persist.persist(node, executes, txnId, route, txn, executeAt, 
deps, txn.execute(txnId, executeAt, data), result);
-        }
         else
         {
-            callback.accept(null, failure);
+            if (readKeys.isEmpty())
+            {
+                Result result = txn.result(txnId, executeAt, null);
+                Writes writes = txn.execute(txnId, executeAt, null);
+                Persist.persist(node, txnId, route, txn, executeAt, deps, 
writes, result, callback);
+            }
+            else
+            {
+                Execute execute = node.executionFactory().create(node, txnId, 
txn, route, readScope, executeAt, deps, callback);
+                execute.start();
+            }
         }
     }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java 
b/accord-core/src/main/java/accord/coordinate/Persist.java
index 9a132683..a1f3a84a 100644
--- a/accord-core/src/main/java/accord/coordinate/Persist.java
+++ b/accord-core/src/main/java/accord/coordinate/Persist.java
@@ -20,6 +20,7 @@ package accord.coordinate;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.function.BiConsumer;
 
 import accord.api.Result;
 import accord.coordinate.tracking.QuorumTracker;
@@ -29,7 +30,14 @@ import accord.messages.Apply;
 import accord.messages.Apply.ApplyReply;
 import accord.messages.Callback;
 import accord.messages.InformDurable;
-import accord.primitives.*;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.PartialTxn;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
+import accord.primitives.Writes;
 import accord.topology.Topologies;
 
 import static accord.coordinate.tracking.RequestStatus.Success;
@@ -37,50 +45,56 @@ import static accord.local.Status.Durability.Majority;
 import static accord.messages.Apply.executes;
 import static accord.messages.Apply.participates;
 
-public class Persist implements Callback<ApplyReply>
+public abstract class Persist implements Callback<ApplyReply>
 {
-    final Node node;
-    final TxnId txnId;
-    final FullRoute<?> route;
-    final Txn txn;
-    final Timestamp executeAt;
-    final Deps deps;
-    final Writes writes;
-    final Result result;
-    final QuorumTracker tracker;
-    final Set<Id> persistedOn;
+    public interface Factory
+    {
+        Persist create(Node node, Topologies topologies, TxnId txnId, 
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, 
Result result);
+    }
+
+    protected final Node node;
+    protected final TxnId txnId;
+    protected final FullRoute<?> route;
+    protected final Txn txn;
+    protected final Timestamp executeAt;
+    protected final Deps deps;
+    protected final Writes writes;
+    protected final Result result;
+    protected final Topologies topologies;
+    protected final QuorumTracker tracker;
+    protected final Set<Id> persistedOn;
     boolean isDone;
 
-    public static void persist(Node node, TxnId txnId, FullRoute<?> route, Txn 
txn, Timestamp executeAt, Deps deps, Writes writes, Result result)
+    public static void persist(Node node, TxnId txnId, FullRoute<?> route, Txn 
txn, Timestamp executeAt, Deps deps, Writes writes, Result result, BiConsumer<? 
super Result, Throwable> clientCallback)
     {
         Topologies executes = executes(node, route, executeAt);
-        persist(node, executes, txnId, route, txn, executeAt, deps, writes, 
result);
+        persist(node, executes, txnId, route, txn, executeAt, deps, writes, 
result, clientCallback);
     }
 
-    public static void persist(Node node, Topologies executes, TxnId txnId, 
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, 
Result result)
+    public static void persist(Node node, Topologies executes, TxnId txnId, 
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, 
Result result, BiConsumer<? super Result, Throwable> clientCallback)
     {
         Topologies participates = participates(node, route, txnId, executeAt, 
executes);
-        Persist persist = new Persist(node, executes, txnId, route, txn, 
executeAt, deps, writes, result);
-        node.send(participates.nodes(), to -> Apply.applyMinimal(to, 
participates, executes, txnId, route, txn, executeAt, deps, writes, result), 
persist);
+        node.persistFactory().create(node, executes, txnId, route, txn, 
executeAt, deps, writes, result)
+                             .applyMinimal(participates, executes, writes, 
result, clientCallback);
     }
 
     public static void persistMaximal(Node node, TxnId txnId, FullRoute<?> 
route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result)
     {
         Topologies executes = executes(node, route, executeAt);
         Topologies participates = participates(node, route, txnId, executeAt, 
executes);
-        Persist persist = new Persist(node, participates, txnId, route, txn, 
executeAt, deps, writes, result);
-        node.send(participates.nodes(), to -> Apply.applyMaximal(to, 
participates, executes, txnId, route, txn, executeAt, deps, writes, result), 
persist);
+        node.persistFactory().create(node, participates, txnId, route, txn, 
executeAt, deps, writes, result)
+                             .applyMaximal(participates, executes, writes, 
result, null);
     }
 
     public static void persistPartialMaximal(Node node, TxnId txnId, 
Unseekables<?> sendTo, FullRoute<?> route, PartialTxn txn, Timestamp executeAt, 
Deps deps, Writes writes, Result result)
     {
         Topologies executes = executes(node, sendTo, executeAt);
         Topologies participates = participates(node, sendTo, txnId, executeAt, 
executes);
-        Persist persist = new Persist(node, participates, txnId, route, txn, 
executeAt, deps, writes, result);
-        node.send(participates.nodes(), to -> Apply.applyMaximal(to, 
participates, executes, txnId, route, txn, executeAt, deps, writes, result), 
persist);
+        Persist persist = node.persistFactory().create(node, participates, 
txnId, route, txn, executeAt, deps, writes, result);
+        node.send(participates.nodes(), to -> 
Apply.applyMaximal(Apply.FACTORY, to, participates, executes, txnId, route, 
txn, executeAt, deps, writes, result), persist);
     }
 
-    private Persist(Node node, Topologies topologies, TxnId txnId, 
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, 
Result result)
+    protected Persist(Node node, Topologies topologies, TxnId txnId, 
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, 
Result result)
     {
         this.node = node;
         this.txnId = txnId;
@@ -90,6 +104,7 @@ public class Persist implements Callback<ApplyReply>
         this.deps = deps;
         this.writes = writes;
         this.result = result;
+        this.topologies = topologies;
         this.tracker = new QuorumTracker(topologies);
         this.persistedOn = new HashSet<>();
     }
@@ -128,4 +143,19 @@ public class Persist implements Callback<ApplyReply>
     public void onCallbackFailure(Id from, Throwable failure)
     {
     }
+
+    public void applyMinimal(Topologies participates, Topologies executes, 
Writes writes, Result result, BiConsumer<? super Result, Throwable> 
clientCallback)
+    {
+        registerClientCallback(writes, result, clientCallback);
+        // applyMinimal is used for transaction execution by the original 
coordinator so it's important to use
+        // Node's Apply factory in case the factory has to do synchronous 
Apply.
+        node.send(participates.nodes(), to -> 
Apply.applyMinimal(node.applyFactory(), to, participates, executes, txnId, 
route, txn, executeAt, deps, writes, result), this);
+    }
+    public void applyMaximal(Topologies participates, Topologies executes, 
Writes writes, Result result, BiConsumer<? super Result, Throwable> 
clientCallback)
+    {
+        registerClientCallback(writes, result, clientCallback);
+        node.send(participates.nodes(), to -> 
Apply.applyMaximal(Apply.FACTORY, to, participates, executes, txnId, route, 
txn, executeAt, deps, writes, result), this);
+    }
+
+    public abstract void registerClientCallback(Writes writes, Result result, 
BiConsumer<? super Result, Throwable> clientCallback);
 }
diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java 
b/accord-core/src/main/java/accord/coordinate/TxnExecute.java
similarity index 68%
copy from accord-core/src/main/java/accord/coordinate/Execute.java
copy to accord-core/src/main/java/accord/coordinate/TxnExecute.java
index edfd033a..d9c0d4cd 100644
--- a/accord-core/src/main/java/accord/coordinate/Execute.java
+++ b/accord-core/src/main/java/accord/coordinate/TxnExecute.java
@@ -21,6 +21,9 @@ package accord.coordinate;
 import java.util.Set;
 import java.util.function.BiConsumer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import accord.api.Data;
 import accord.api.Result;
 import accord.local.Node;
@@ -36,18 +39,19 @@ import accord.primitives.Participants;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
-import accord.primitives.Txn.Kind;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
-import accord.topology.Topology;
 
 import static accord.coordinate.ReadCoordinator.Action.Approve;
 import static accord.coordinate.ReadCoordinator.Action.ApprovePartial;
-import static accord.messages.Commit.Kind.Maximal;
-import static accord.utils.Invariants.checkArgument;
 
-class Execute extends ReadCoordinator<ReadReply>
+public class TxnExecute extends ReadCoordinator<ReadReply> implements Execute
 {
+    public static final Execute.Factory FACTORY = TxnExecute::new;
+
+    @SuppressWarnings("unused")
+    private static final Logger logger = 
LoggerFactory.getLogger(TxnExecute.class);
+
     final Txn txn;
     final Participants<?> readScope;
     final FullRoute<?> route;
@@ -57,7 +61,7 @@ class Execute extends ReadCoordinator<ReadReply>
     final BiConsumer<? super Result, Throwable> callback;
     private Data data;
 
-    private Execute(Node node, TxnId txnId, Txn txn, FullRoute<?> route, 
Participants<?> readScope, Timestamp executeAt, Deps deps, BiConsumer<? super 
Result, Throwable> callback)
+    private TxnExecute(Node node, TxnId txnId, Txn txn, FullRoute<?> route, 
Participants<?> readScope, Timestamp executeAt, Deps deps, BiConsumer<? super 
Result, Throwable> callback)
     {
         super(node, node.topology().forEpoch(readScope, executeAt.epoch()), 
txnId);
         this.txn = txn;
@@ -69,28 +73,6 @@ class Execute extends ReadCoordinator<ReadReply>
         this.callback = callback;
     }
 
-    public static void execute(Node node, TxnId txnId, Txn txn, FullRoute<?> 
route, Timestamp executeAt, Deps deps, BiConsumer<? super Result, Throwable> 
callback)
-    {
-        // Recovery calls execute and we would like execute to run BlockOnDeps 
because that will notify the agent
-        // of the local barrier
-        if (txn.kind() == Kind.SyncPoint)
-        {
-            checkArgument(txnId.equals(executeAt));
-            BlockOnDeps.blockOnDeps(node, txnId, txn, route, deps, callback);
-        }
-        else if (txn.read().keys().isEmpty())
-        {
-            Result result = txn.result(txnId, executeAt, null);
-            Persist.persist(node, txnId, route, txn, executeAt, deps, 
txn.execute(txnId, executeAt, null), result);
-            callback.accept(result, null);
-        }
-        else
-        {
-            Execute execute = new Execute(node, txnId, txn, route, 
txn.keys().toParticipants(), executeAt, deps, callback);
-            execute.start();
-        }
-    }
-
     @Override
     protected void start(Set<Id> readSet)
     {
@@ -115,7 +97,7 @@ class Execute extends ReadCoordinator<ReadReply>
         if (reply.isOk())
         {
             ReadOk ok = ((ReadOk) reply);
-            Data next = ok.data;
+            Data next = ((ReadOk) reply).data;
             if (next != null)
                 data = data == null ? next : data.merge(next);
 
@@ -131,9 +113,7 @@ class Execute extends ReadCoordinator<ReadReply>
                 return Action.Aborted;
             case NotCommitted:
                 // the replica may be missing the original commit, or the 
additional commit, so send everything
-                Topologies topology = node.topology().preciseEpochs(route, 
txnId.epoch(), executeAt.epoch());
-                Topology coordinateTopology = topology.forEpoch(txnId.epoch());
-                node.send(from, new Commit(Maximal, from, coordinateTopology, 
topology, txnId, txn, route, readScope, executeAt, deps, false));
+                Commit.commitMaximal(node, from, txn, txnId, executeAt, route, 
deps, readScope);
                 // also try sending a read command to another replica, in case 
they're ready to serve a response
                 return Action.TryAlternative;
             case Invalid:
@@ -142,16 +122,13 @@ class Execute extends ReadCoordinator<ReadReply>
         }
     }
 
-
     @Override
     protected void onDone(Success success, Throwable failure)
     {
         if (failure == null)
         {
             Result result = txn.result(txnId, executeAt, data);
-            callback.accept(result, null);
-            // avoid re-calculating topologies if it is unchanged
-            Persist.persist(node, executes, txnId, route, txn, executeAt, 
deps, txn.execute(txnId, executeAt, data), result);
+            Persist.persist(node, executes, txnId, route, txn, executeAt, 
deps, txn.execute(txnId, executeAt, data), result, callback);
         }
         else
         {
diff --git a/accord-core/src/main/java/accord/coordinate/TxnPersist.java 
b/accord-core/src/main/java/accord/coordinate/TxnPersist.java
new file mode 100644
index 00000000..28119cf1
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/TxnPersist.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import java.util.function.BiConsumer;
+
+import accord.api.Result;
+import accord.local.Node;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.primitives.Writes;
+import accord.topology.Topologies;
+
+public class TxnPersist extends Persist
+{
+    public static final Persist.Factory FACTORY = TxnPersist::new;
+
+    private TxnPersist(Node node, Topologies topologies, TxnId txnId, 
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, 
Result result)
+    {
+        super(node, topologies, txnId, route, txn, executeAt, deps, writes, 
result);
+    }
+
+    @Override
+    public void registerClientCallback(Writes writes, Result result, 
BiConsumer<? super Result, Throwable> clientCallback)
+    {
+        if (clientCallback != null)
+            clientCallback.accept(result, null);
+    }
+}
diff --git 
a/accord-core/src/main/java/accord/coordinate/tracking/AppliedTracker.java 
b/accord-core/src/main/java/accord/coordinate/tracking/AppliedTracker.java
index 159f2b88..da49dc01 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/AppliedTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/AppliedTracker.java
@@ -26,7 +26,7 @@ import static 
accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
 import static 
accord.coordinate.tracking.AbstractTracker.ShardOutcomes.NoChange;
 import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Success;
 
-public class AppliedTracker extends 
AbstractTracker<AppliedTracker.AppliedShardTracker>
+public class AppliedTracker extends 
AbstractTracker<AppliedTracker.AppliedShardTracker> implements ResponseTracker
 {
     public static class AppliedShardTracker extends ShardTracker
     {
diff --git 
a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java 
b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
index b20c0a70..c771406d 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
@@ -24,7 +24,7 @@ import accord.topology.Topologies;
 
 import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
 
-public class QuorumTracker extends 
AbstractTracker<QuorumTracker.QuorumShardTracker>
+public class QuorumTracker extends 
AbstractTracker<QuorumTracker.QuorumShardTracker> implements ResponseTracker
 {
     public static class QuorumShardTracker extends ShardTracker
     {
diff --git a/accord-core/src/main/java/accord/api/Data.java 
b/accord-core/src/main/java/accord/coordinate/tracking/ResponseTracker.java
similarity index 70%
copy from accord-core/src/main/java/accord/api/Data.java
copy to 
accord-core/src/main/java/accord/coordinate/tracking/ResponseTracker.java
index 24945648..5b2a5f5d 100644
--- a/accord-core/src/main/java/accord/api/Data.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/ResponseTracker.java
@@ -16,16 +16,12 @@
  * limitations under the License.
  */
 
-package accord.api;
+package accord.coordinate.tracking;
 
-/**
- * The result of some (potentially partial) {@link Read} from some {@link 
DataStore}
- */
-public interface Data
+import accord.local.Node;
+
+public interface ResponseTracker
 {
-    /**
-     * Combine the contents of the parameter with this object and return the 
resultant object.
-     * This method may modify the current object and return itself.
-     */
-    Data merge(Data data);
+    RequestStatus recordSuccess(Node.Id node);
+    RequestStatus recordFailure(Node.Id node);
 }
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index 6492ddf5..b7115b1b 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -39,9 +39,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.Agent;
-import accord.api.ConfigurationService;
-
 import accord.api.BarrierType;
+import accord.api.ConfigurationService;
 import accord.api.ConfigurationService.EpochReady;
 import accord.api.DataStore;
 import accord.api.Key;
@@ -51,11 +50,15 @@ import accord.api.Result;
 import accord.api.RoutingKey;
 import accord.api.Scheduler;
 import accord.api.TopologySorter;
+import accord.coordinate.Barrier;
 import accord.config.LocalConfig;
 import accord.coordinate.CoordinateTransaction;
+import accord.coordinate.Execute;
 import accord.coordinate.MaybeRecover;
 import accord.coordinate.Outcome;
+import accord.coordinate.Persist;
 import accord.coordinate.RecoverWithRoute;
+import accord.messages.Apply;
 import accord.messages.Callback;
 import accord.messages.LocalMessage;
 import accord.messages.Reply;
@@ -68,7 +71,6 @@ import accord.primitives.FullRoute;
 import accord.primitives.ProgressToken;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
-import accord.coordinate.Barrier;
 import accord.primitives.Routable.Domain;
 import accord.primitives.Routables;
 import accord.primitives.Route;
@@ -143,6 +145,9 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
     private final ConfigurationService configService;
     private final TopologyManager topology;
     private final CommandStores commandStores;
+    private final Execute.Factory executionFactory;
+    private final Persist.Factory persistFactory;
+    private final Apply.Factory applyFactory;
 
     private final LongSupplier nowSupplier;
     private final ToLongFunction<TimeUnit> nowTimeUnit;
@@ -160,13 +165,17 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
     public Node(Id id, MessageSink messageSink, LocalMessage.Handler 
localMessageHandler,
                 ConfigurationService configService, LongSupplier nowSupplier, 
ToLongFunction<TimeUnit> nowTimeUnit,
                 Supplier<DataStore> dataSupplier, ShardDistributor 
shardDistributor, Agent agent, RandomSource random, Scheduler scheduler, 
TopologySorter.Supplier topologySorter,
-                Function<Node, ProgressLog.Factory> progressLogFactory, 
CommandStores.Factory factory, LocalConfig localConfig)
+                Function<Node, ProgressLog.Factory> progressLogFactory, 
CommandStores.Factory factory, Execute.Factory executionFactory, 
Persist.Factory persistFactory, Apply.Factory applyFactory,
+                LocalConfig localConfig)
     {
         this.id = id;
         this.localConfig = localConfig;
         this.messageSink = messageSink;
         this.localMessageHandler = localMessageHandler;
         this.configService = configService;
+        this.executionFactory = executionFactory;
+        this.persistFactory = persistFactory;
+        this.applyFactory = applyFactory;
         this.topology = new TopologyManager(topologySorter, id);
         this.nowSupplier = nowSupplier;
         this.nowTimeUnit = nowTimeUnit;
@@ -676,11 +685,6 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
     }
 
     public void receive (Request request, Id from, ReplyContext replyContext)
-    {
-        receive(request, from, replyContext, 0);
-    }
-
-    public void receive (Request request, Id from, ReplyContext replyContext, 
long delayNanos)
     {
         long knownEpoch = request.knownEpoch();
         if (knownEpoch > topology.epoch())
@@ -703,10 +707,22 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
                 reply(from, replyContext, null, t);
             }
         };
-        if (delayNanos > 0)
-            scheduler.once(processMsg, delayNanos, TimeUnit.NANOSECONDS);
-        else
-            scheduler.now(processMsg);
+        scheduler.now(processMsg);
+    }
+
+    public Execute.Factory executionFactory()
+    {
+        return executionFactory;
+    }
+
+    public Persist.Factory persistFactory()
+    {
+        return persistFactory;
+    }
+
+    public Apply.Factory applyFactory()
+    {
+        return applyFactory;
     }
 
     public Scheduler scheduler()
diff --git a/accord-core/src/main/java/accord/local/SerializerSupport.java 
b/accord-core/src/main/java/accord/local/SerializerSupport.java
index a42888bd..abdd7f19 100644
--- a/accord-core/src/main/java/accord/local/SerializerSupport.java
+++ b/accord-core/src/main/java/accord/local/SerializerSupport.java
@@ -17,20 +17,28 @@
  */
 package accord.local;
 
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
 import accord.api.Result;
 import accord.api.VisibleForImplementation;
 import accord.local.Command.WaitingOn;
 import accord.local.CommonAttributes.Mutable;
-import accord.messages.*;
+import accord.messages.Accept;
+import accord.messages.Apply;
+import accord.messages.ApplyThenWaitUntilApplied;
+import accord.messages.BeginRecovery;
+import accord.messages.Commit;
+import accord.messages.MessageType;
+import accord.messages.PreAccept;
+import accord.messages.Propagate;
 import accord.primitives.Ballot;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
 import accord.primitives.Timestamp;
 import accord.primitives.Writes;
 
-import java.util.EnumSet;
-import java.util.Set;
-
 import static accord.messages.MessageType.APPLY_MAXIMAL_REQ;
 import static accord.messages.MessageType.APPLY_MINIMAL_REQ;
 import static accord.messages.MessageType.BEGIN_RECOVER_REQ;
@@ -43,8 +51,6 @@ import static 
accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG;
 import static accord.primitives.PartialTxn.merge;
 import static accord.utils.Invariants.checkState;
 
-import static java.util.EnumSet.of;
-
 @VisibleForImplementation
 public class SerializerSupport
 {
@@ -77,8 +83,8 @@ public class SerializerSupport
         }
     }
 
-    private static final EnumSet<MessageType> PRE_ACCEPT_TYPES =
-        of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG);
+    private static final Set<MessageType> PRE_ACCEPT_TYPES =
+        ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, 
PROPAGATE_PRE_ACCEPT_MSG);
 
     private static Command.PreAccepted preAccepted(Mutable attrs, Timestamp 
executeAt, Ballot promised, MessageProvider messageProvider)
     {
@@ -106,8 +112,8 @@ public class SerializerSupport
         return Command.Accepted.accepted(attrs, status, executeAt, promised, 
accepted);
     }
 
-    private static final EnumSet<MessageType> PRE_ACCEPT_COMMIT_TYPES =
-        of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG,
+    private static final Set<MessageType> PRE_ACCEPT_COMMIT_TYPES =
+        ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, 
PROPAGATE_PRE_ACCEPT_MSG,
            COMMIT_MINIMAL_REQ, COMMIT_MAXIMAL_REQ, PROPAGATE_COMMIT_MSG);
 
     private static Command.Committed committed(Mutable attrs, SaveStatus 
status, Timestamp executeAt, Ballot promised, Ballot accepted, 
WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
@@ -143,8 +149,8 @@ public class SerializerSupport
         return Command.Committed.committed(attrs, status, executeAt, promised, 
accepted, waitingOnProvider.provide(deps));
     }
 
-    private static final EnumSet<MessageType> PRE_ACCEPT_COMMIT_APPLY_TYPES =
-        of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG,
+    private static final Set<MessageType> PRE_ACCEPT_COMMIT_APPLY_TYPES =
+        ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, 
PROPAGATE_PRE_ACCEPT_MSG,
            COMMIT_MINIMAL_REQ, COMMIT_MAXIMAL_REQ, PROPAGATE_COMMIT_MSG,
            APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG);
 
@@ -175,19 +181,39 @@ public class SerializerSupport
         }
         else
         {
-            checkState(witnessed.contains(APPLY_MINIMAL_REQ));
+            boolean haveApplyMinimal = witnessed.contains(APPLY_MINIMAL_REQ);
+            boolean haveCommitMaximal = witnessed.contains(COMMIT_MAXIMAL_REQ);
 
-            Apply apply = messageProvider.applyMinimal();
-            writes = apply.writes;
-            result = apply.result;
+            Apply apply = null;
+            Commit commit = null;
+            String errorMessage = "Must have either an APPLY_MINIMAL_REQ or a 
COMMIT_MAXIMAL_REQ containing ApplyThenWaitUntilApplied";
+            if (haveApplyMinimal)
+            {
+                apply = messageProvider.applyMinimal();
+                writes = apply.writes;
+                result = apply.result;
+            }
+            else if (haveCommitMaximal)
+            {
+                commit = messageProvider.commitMaximal();
+                checkState(commit.readData instanceof 
ApplyThenWaitUntilApplied, errorMessage);
+                ApplyThenWaitUntilApplied applyThenWaitUntilApplied = 
(ApplyThenWaitUntilApplied)commit.readData;
+                writes = applyThenWaitUntilApplied.writes;
+                result = applyThenWaitUntilApplied.txnResult;
+            }
+            else
+            {
+                throw new IllegalStateException(errorMessage);
+            }
 
             /*
              * NOTE: If Commit has been witnessed, we'll extract deps from 
there;
              * Apply has an expected TO-DO to stop including deps in such case.
              */
-            if (witnessed.contains(COMMIT_MAXIMAL_REQ))
+            if (haveCommitMaximal)
             {
-                Commit commit = messageProvider.commitMaximal();
+                if (commit == null)
+                    commit = messageProvider.commitMaximal();
                 txn = commit.partialTxn;
                 deps = commit.partialDeps;
             }
@@ -199,7 +225,7 @@ public class SerializerSupport
             }
             else if (witnessed.contains(COMMIT_MINIMAL_REQ))
             {
-                Commit commit = messageProvider.commitMinimal();
+                commit = messageProvider.commitMinimal();
                 txn = merge(apply.txn, merge(commit.partialTxn, 
txnFromPreAcceptOrBeginRecover(witnessed, messageProvider)));
                 deps = commit.partialDeps;
             }
@@ -216,8 +242,8 @@ public class SerializerSupport
         return Command.Executed.executed(attrs, status, executeAt, promised, 
accepted, waitingOnProvider.provide(deps), writes, result);
     }
 
-    private static final EnumSet<MessageType> APPLY_TYPES =
-            of(APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG);
+    private static final Set<MessageType> APPLY_TYPES =
+            ImmutableSet.of(APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, 
PROPAGATE_APPLY_MSG);
 
     private static Command.Truncated truncated(Mutable attrs, SaveStatus 
status, Timestamp executeAt, MessageProvider messageProvider)
     {
diff --git a/accord-core/src/main/java/accord/messages/ReadTxnData.java 
b/accord-core/src/main/java/accord/messages/AbstractExecute.java
similarity index 71%
copy from accord-core/src/main/java/accord/messages/ReadTxnData.java
copy to accord-core/src/main/java/accord/messages/AbstractExecute.java
index 5dec9ba5..804c101a 100644
--- a/accord-core/src/main/java/accord/messages/ReadTxnData.java
+++ b/accord-core/src/main/java/accord/messages/AbstractExecute.java
@@ -18,6 +18,8 @@
 
 package accord.messages;
 
+import javax.annotation.Nullable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +37,6 @@ import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
-import javax.annotation.Nullable;
 
 import static accord.local.SaveStatus.LocalExecution.WaitingToExecute;
 import static accord.local.Status.Committed;
@@ -43,30 +44,18 @@ import static 
accord.messages.ReadData.ReadNack.NotCommitted;
 import static accord.messages.ReadData.ReadNack.Redundant;
 import static accord.utils.MapReduceConsume.forEach;
 
-// TODO (required, efficiency): dedup - can currently have infinite pending 
reads that will be executed independently
-public class ReadTxnData extends ReadData implements 
Command.TransientListener, EpochSupplier
+public abstract class AbstractExecute extends ReadData implements 
Command.TransientListener, EpochSupplier
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(ReadTxnData.class);
-
-    public static class SerializerSupport
-    {
-        public static ReadTxnData create(TxnId txnId, Participants<?> scope, 
long executeAtEpoch, long waitForEpoch)
-        {
-            return new ReadTxnData(txnId, scope, executeAtEpoch, waitForEpoch);
-        }
-    }
+    private static final Logger logger = 
LoggerFactory.getLogger(AbstractExecute.class);
 
     class ObsoleteTracker implements Command.TransientListener
     {
         @Override
         public void onChange(SafeCommandStore safeStore, SafeCommand 
safeCommand)
         {
-            switch (safeCommand.current().status())
+            switch (actionForStatus(safeCommand.current().status()))
             {
-                case PreApplied:
-                case Applied:
-                case Invalidated:
-                case Truncated:
+                case OBSOLETE:
                     obsoleteAndSend();
                     safeCommand.removeListener(this);
             }
@@ -75,7 +64,7 @@ public class ReadTxnData extends ReadData implements 
Command.TransientListener,
         @Override
         public PreLoadContext listenerPreLoadContext(TxnId caller)
         {
-            return ReadTxnData.this.listenerPreLoadContext(caller);
+            return AbstractExecute.this.listenerPreLoadContext(caller);
         }
     }
 
@@ -85,13 +74,13 @@ public class ReadTxnData extends ReadData implements 
Command.TransientListener,
     final ObsoleteTracker obsoleteTracker = new ObsoleteTracker();
     private transient State state = State.PENDING; // TODO (low priority, 
semantics): respond with the Executed result we have stored?
 
-    public ReadTxnData(Node.Id to, Topologies topologies, TxnId txnId, 
Participants<?> readScope, Timestamp executeAt)
+    public AbstractExecute(Node.Id to, Topologies topologies, TxnId txnId, 
Participants<?> readScope, Timestamp executeAt)
     {
         super(to, topologies, txnId, readScope);
         this.executeAtEpoch = executeAt.epoch();
     }
 
-    protected ReadTxnData(TxnId txnId, Participants<?> readScope, long 
executeAtEpoch, long waitForEpoch)
+    public AbstractExecute(TxnId txnId, Participants<?> readScope, long 
waitForEpoch, long executeAtEpoch)
     {
         super(txnId, readScope, waitForEpoch);
         this.executeAtEpoch = executeAtEpoch;
@@ -121,13 +110,34 @@ public class ReadTxnData extends ReadData implements 
Command.TransientListener,
         return PreLoadContext.contextFor(txnId, caller, keys());
     }
 
-    @Override
-    public synchronized void onChange(SafeCommandStore safeStore, SafeCommand 
safeCommand)
+    protected enum Action { WAIT, EXECUTE, OBSOLETE }
+
+    protected abstract boolean canExecutePreApplied();
+
+    /*
+     * Reading data definitely requires respecting obsoletion since it 
invalidates the read
+     * but writing data doesn't always make it necessary to fail the 
transaction due to preemption.
+     * At worst we do some duplicate work, and ignoring obsoletion means we 
don't have to fail the transaction at the
+     * original coordinator.
+     */
+    protected boolean executeIfObsoleted()
     {
-        Command command = safeCommand.current();
-        logger.trace("{}: updating as listener in response to change on {} 
with status {} ({})",
-                this, command.txnId(), command.status(), command);
-        switch (command.status())
+        return false;
+    }
+
+    // TODO (review): Is this too liberal in allowing old things to execute?
+    // would it be better to let things fail if coordinators compete?
+    Action maybeObsoleteOrExecute(Action action, Status status)
+    {
+        if (action == Action.OBSOLETE && executeIfObsoleted())
+            // Just because it isn't obsolete doesn't mean it is ready to 
execute
+            return status.hasBeen(Status.ReadyToExecute) ? Action.EXECUTE : 
Action.WAIT;
+        return action;
+    }
+
+    protected Action actionForStatus(Status status)
+    {
+        switch (status)
         {
             default: throw new AssertionError();
             case NotDefined:
@@ -136,15 +146,38 @@ public class ReadTxnData extends ReadData implements 
Command.TransientListener,
             case AcceptedInvalidate:
             case PreCommitted:
             case Committed:
-                return;
+                return Action.WAIT;
 
             case PreApplied:
+                return canExecutePreApplied() ? Action.EXECUTE : 
maybeObsoleteOrExecute(Action.OBSOLETE, status);
+
+            case ReadyToExecute:
+                return Action.EXECUTE;
+
             case Applied:
+                return maybeObsoleteOrExecute(Action.OBSOLETE, status);
             case Invalidated:
             case Truncated:
+                return Action.OBSOLETE;
+        }
+    }
+
+    @Override
+    public synchronized void onChange(SafeCommandStore safeStore, SafeCommand 
safeCommand)
+    {
+        Command command = safeCommand.current();
+        logger.trace("{}: updating as listener in response to change on {} 
with status {} ({})",
+                     this, command.txnId(), command.status(), command);
+
+        switch (actionForStatus(command.status()))
+        {
+            default: throw new AssertionError();
+            case WAIT:
+                return;
+            case OBSOLETE:
                 obsoleteAndSend();
                 return;
-            case ReadyToExecute:
+            case EXECUTE:
         }
 
         if (safeCommand.removeListener(this))
@@ -167,15 +200,10 @@ public class ReadTxnData extends ReadData implements 
Command.TransientListener,
         Status status = command.status();
 
         logger.trace("{}: setting up read with status {} on {}", txnId, 
status, safeStore);
-        switch (status) {
-            default:
-                throw new AssertionError();
-            case Committed:
-            case NotDefined:
-            case PreAccepted:
-            case Accepted:
-            case AcceptedInvalidate:
-            case PreCommitted:
+        switch (actionForStatus(status))
+        {
+            default: throw new AssertionError();
+            case WAIT:
                 waitingOn.set(safeStore.commandStore().id());
                 ++waitingOnCount;
                 safeCommand.addListener(this);
@@ -183,15 +211,10 @@ public class ReadTxnData extends ReadData implements 
Command.TransientListener,
                 safeStore.progressLog().waiting(safeCommand, WaitingToExecute, 
null, readScope);
                 if (status == Committed) return null;
                 else return NotCommitted;
-
-            case PreApplied:
-            case Applied:
-            case Invalidated:
-            case Truncated:
+            case OBSOLETE:
                 state = State.OBSOLETE;
                 return Redundant;
-
-            case ReadyToExecute:
+            case EXECUTE:
                 waitingOn.set(safeStore.commandStore().id());
                 ++waitingOnCount;
                 maybeRead(safeStore, safeCommand);
@@ -231,7 +254,8 @@ public class ReadTxnData extends ReadData implements 
Command.TransientListener,
     @Override
     protected synchronized void readComplete(CommandStore commandStore, 
@Nullable Data result, @Nullable Ranges unavailable)
     {
-        // TODO (expected): we should unregister our listener, but this is 
quite costly today
+        // TODO (expected): lots of undesirable costs associated with the 
obsoletion tracker
+//        commandStore.execute(contextFor(txnId), safeStore -> 
safeStore.command(txnId).removeListener(obsoleteTracker));
         super.readComplete(commandStore, result, unavailable);
     }
 
@@ -241,42 +265,40 @@ public class ReadTxnData extends ReadData implements 
Command.TransientListener,
         switch (state)
         {
             case RETURNED:
-                throw new IllegalStateException("ReadOk was sent, yet ack 
called again", fail);
+                throw new IllegalStateException("ReadOk was sent, yet ack 
called again");
             case OBSOLETE:
                 logger.debug("After the read completed for txn {}, the result 
was marked obsolete", txnId);
-                if (fail != null)
-                    node.agent().onUncaughtException(fail);
                 break;
             case PENDING:
                 state = State.RETURNED;
-                node.reply(replyTo, replyContext, fail == null ? new 
ReadOk(unavailable, data) : null, fail);
+                node.reply(replyTo, replyContext, fail == null ? 
constructReadOk(unavailable, data) : null, fail);
                 break;
             default:
                 throw new AssertionError("Unknown state: " + state);
         }
     }
 
-    private void removeListener(SafeCommandStore safeStore, TxnId txnId)
+    protected ReadOk constructReadOk(Ranges unavailable, Data data)
     {
-        safeStore.get(txnId, this, readScope).removeListener(this);
+        return new ReadOk(unavailable, data);
     }
 
-    @Override
-    protected void cancel()
+    private void removeListener(SafeCommandStore safeStore, TxnId txnId)
     {
-        node.commandStores().mapReduceConsume(this, waitingOn.stream(), 
forEach(in -> removeListener(in, txnId), node.agent()));
+        SafeCommand safeCommand = safeStore.ifInitialised(txnId);
+        safeCommand.removeListener(this);
     }
 
     @Override
-    public MessageType type()
+    protected void cancel()
     {
-        return MessageType.READ_REQ;
+        node.commandStores().mapReduceConsume(this, waitingOn.stream(), 
forEach(in -> removeListener(in, txnId), node.agent()));
     }
 
     @Override
     public String toString()
     {
-        return "ReadData{" +
+        return "ReadTxnData{" +
                "txnId:" + txnId +
                '}';
     }
diff --git a/accord-core/src/main/java/accord/messages/Apply.java 
b/accord-core/src/main/java/accord/messages/Apply.java
index 3947bff4..a2e874be 100644
--- a/accord-core/src/main/java/accord/messages/Apply.java
+++ b/accord-core/src/main/java/accord/messages/Apply.java
@@ -45,6 +45,7 @@ import accord.topology.Topologies;
 
 public class Apply extends TxnRequest<ApplyReply>
 {
+    public static final Factory FACTORY = Apply::new;
     public static class SerializationSupport
     {
         public static Apply create(TxnId txnId, PartialRoute<?> scope, long 
waitForEpoch, Kind kind, Seekables<?, ?> keys, Timestamp executeAt, PartialDeps 
deps, PartialTxn txn, Writes writes, Result result)
@@ -53,6 +54,11 @@ public class Apply extends TxnRequest<ApplyReply>
         }
     }
 
+    public interface Factory
+    {
+        Apply create(Kind kind, Id to, Topologies participates, Topologies 
executes, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, 
Writes writes, Result result);
+    }
+
     public final Kind kind;
     public final Timestamp executeAt;
     public final Seekables<?, ?> keys;
@@ -63,7 +69,7 @@ public class Apply extends TxnRequest<ApplyReply>
 
     public enum Kind { Minimal, Maximal }
 
-    private Apply(Kind kind, Id to, Topologies participates, Topologies 
executes, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, 
Writes writes, Result result)
+    protected Apply(Kind kind, Id to, Topologies participates, Topologies 
executes, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, 
Writes writes, Result result)
     {
         super(to, participates, route, txnId);
         Ranges slice = kind == Kind.Maximal || executes == participates ? 
scope.covering() : executes.computeRangesForNode(to);
@@ -82,14 +88,14 @@ public class Apply extends TxnRequest<ApplyReply>
     {
         Topologies executes = executes(node, route, executeAt);
         Topologies participates = participates(node, route, txnId, executeAt, 
executes);
-        node.send(participates.nodes(), to -> applyMaximal(to, participates, 
executes, txnId, route, txn, executeAt, deps, writes, result));
+        node.send(participates.nodes(), to -> applyMaximal(FACTORY, to, 
participates, executes, txnId, route, txn, executeAt, deps, writes, result));
     }
 
     public static void sendMaximal(Node node, Id to, TxnId txnId, Route<?> 
route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result)
     {
         Topologies executes = executes(node, route, executeAt);
         Topologies participates = participates(node, route, txnId, executeAt, 
executes);
-        node.send(to, applyMaximal(to, participates, executes, txnId, route, 
txn, executeAt, deps, writes, result));
+        node.send(to, applyMaximal(FACTORY, to, participates, executes, txnId, 
route, txn, executeAt, deps, writes, result));
     }
 
     public static Topologies executes(Node node, Unseekables<?> route, 
Timestamp executeAt)
@@ -102,17 +108,17 @@ public class Apply extends TxnRequest<ApplyReply>
         return txnId.epoch() == executeAt.epoch() ? executes : 
node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch());
     }
 
-    public static Apply applyMinimal(Id to, Topologies sendTo, Topologies 
applyTo, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, 
Writes writes, Result result)
+    public static Apply applyMinimal(Factory factory, Id to, Topologies 
sendTo, Topologies applyTo, TxnId txnId, Route<?> route, Txn txn, Timestamp 
executeAt, Deps deps, Writes writes, Result result)
     {
-        return new Apply(Kind.Minimal, to, sendTo, applyTo, txnId, route, txn, 
executeAt, deps, writes, result);
+        return factory.create(Kind.Minimal, to, sendTo, applyTo, txnId, route, 
txn, executeAt, deps, writes, result);
     }
 
-    public static Apply applyMaximal(Id to, Topologies participates, 
Topologies executes, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, 
Deps deps, Writes writes, Result result)
+    public static Apply applyMaximal(Factory factory, Id to, Topologies 
participates, Topologies executes, TxnId txnId, Route<?> route, Txn txn, 
Timestamp executeAt, Deps deps, Writes writes, Result result)
     {
-        return new Apply(Kind.Maximal, to, participates, executes, txnId, 
route, txn, executeAt, deps, writes, result);
+        return factory.create(Kind.Maximal, to, participates, executes, txnId, 
route, txn, executeAt, deps, writes, result);
     }
 
-    private Apply(Kind kind, TxnId txnId, PartialRoute<?> route, long 
waitForEpoch, Seekables<?, ?> keys, Timestamp executeAt, PartialDeps deps, 
@Nullable PartialTxn txn, Writes writes, Result result)
+    protected Apply(Kind kind, TxnId txnId, PartialRoute<?> route, long 
waitForEpoch, Seekables<?, ?> keys, Timestamp executeAt, PartialDeps deps, 
@Nullable PartialTxn txn, Writes writes, Result result)
     {
         super(txnId, route, waitForEpoch);
         this.kind = kind;
@@ -141,6 +147,11 @@ public class Apply extends TxnRequest<ApplyReply>
     public static ApplyReply apply(SafeCommandStore safeStore, PartialTxn txn, 
TxnId txnId, Timestamp executeAt, PartialDeps deps, PartialRoute<?> scope, 
Writes writes, Result result, RoutingKey progressKey)
     {
         SafeCommand safeCommand = safeStore.get(txnId, executeAt, scope);
+        return apply(safeStore, safeCommand, txn, txnId, executeAt, deps, 
scope, writes, result, progressKey);
+    }
+
+    public static ApplyReply apply(SafeCommandStore safeStore, SafeCommand 
safeCommand, PartialTxn txn, TxnId txnId, Timestamp executeAt, PartialDeps 
deps, PartialRoute<?> scope, Writes writes, Result result, RoutingKey 
progressKey)
+    {
         switch (Commands.apply(safeStore, safeCommand, txnId, scope, 
progressKey, executeAt, deps, txn, writes, result))
         {
             default:
diff --git 
a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java 
b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
index 79a5b35d..02b4fc19 100644
--- a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
@@ -38,6 +38,8 @@ import accord.primitives.Writes;
 /*
  * Used by local and global inclusive sync points to effect the sync point at 
each node
  * Combines commit, execute (with nothing really to execute), and apply into 
one request/response
+ *
+ * This returns when the dependencies are Applied, but doesn't wait for this 
transaction to be Applied.
  */
 public class ApplyThenWaitUntilApplied extends WaitUntilApplied
 {
diff --git a/accord-core/src/main/java/accord/messages/Commit.java 
b/accord-core/src/main/java/accord/messages/Commit.java
index 2e30e055..a1b0404b 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -50,6 +50,7 @@ import accord.topology.Topology;
 import accord.utils.Invariants;
 import accord.utils.TriFunction;
 
+import static accord.messages.Commit.Kind.Maximal;
 import static accord.utils.Invariants.checkArgument;
 
 public class Commit extends TxnRequest<ReadNack>
@@ -116,7 +117,7 @@ public class Commit extends TxnRequest<ReadNack>
         this.readData = toExecuteFactory.apply(partialTxn != null ? partialTxn 
: txn, scope, partialDeps);
     }
 
-    Commit(Kind kind, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, 
Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, 
@Nullable FullRoute<?> fullRoute, @Nullable ReadData readData)
+    protected Commit(Kind kind, TxnId txnId, PartialRoute<?> scope, long 
waitForEpoch, Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps 
partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadData readData)
     {
         super(txnId, scope, waitForEpoch);
         this.kind = kind;
@@ -173,6 +174,14 @@ public class Commit extends TxnRequest<ReadNack>
         }
     }
 
+    public static void commitMaximal(Node node, Node.Id to, Txn txn, TxnId 
txnId, Timestamp executeAt, FullRoute<?> route, Deps deps, Participants<?> 
readScope)
+    {
+        // the replica may be missing the original commit, or the additional 
commit, so send everything
+        Topologies topology = node.topology().preciseEpochs(route, 
txnId.epoch(), executeAt.epoch());
+        Topology coordinateTopology = topology.forEpoch(txnId.epoch());
+        node.send(to, new Commit(Maximal, to, coordinateTopology, topology, 
txnId, txn, route, readScope, executeAt, deps, false));
+    }
+
     @Override
     public TxnId primaryTxnId()
     {
diff --git a/accord-core/src/main/java/accord/messages/InformHomeDurable.java 
b/accord-core/src/main/java/accord/messages/InformHomeDurable.java
index 36d1a242..6f2491db 100644
--- a/accord-core/src/main/java/accord/messages/InformHomeDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformHomeDurable.java
@@ -20,6 +20,8 @@ package accord.messages;
 
 import java.util.Set;
 
+import com.google.common.collect.ImmutableSet;
+
 import accord.local.Commands;
 import accord.local.Node;
 import accord.local.Node.Id;
@@ -48,7 +50,7 @@ public class InformHomeDurable implements Request
         this.route = route;
         this.executeAt = executeAt;
         this.durability = durability;
-        this.persistedOn = persistedOn;
+        this.persistedOn = ImmutableSet.copyOf(persistedOn); // Persisted on 
might be mutated later
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/MessageType.java 
b/accord-core/src/main/java/accord/messages/MessageType.java
index 42f1ad48..a3fe592a 100644
--- a/accord-core/src/main/java/accord/messages/MessageType.java
+++ b/accord-core/src/main/java/accord/messages/MessageType.java
@@ -17,59 +17,92 @@
  */
 package accord.messages;
 
-import static accord.messages.MessageType.Kind.LOCAL;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
 import static accord.messages.MessageType.Kind.REMOTE;
+import static accord.messages.MessageType.Kind.LOCAL;
 
 /**
  * Meant to assist implementations with mapping accord messages to their own 
messaging systems.
  */
-public enum MessageType
+public class MessageType
 {
-    SIMPLE_RSP                         (REMOTE, false),
-    FAILURE_RSP                        (REMOTE, false),
-    PRE_ACCEPT_REQ                     (REMOTE, true ),
-    PRE_ACCEPT_RSP                     (REMOTE, false),
-    ACCEPT_REQ                         (REMOTE, true ),
-    ACCEPT_RSP                         (REMOTE, false),
-    ACCEPT_INVALIDATE_REQ              (REMOTE, true ),
-    GET_DEPS_REQ                       (REMOTE, false),
-    GET_DEPS_RSP                       (REMOTE, false),
-    COMMIT_MINIMAL_REQ                 (REMOTE, true ),
-    COMMIT_MAXIMAL_REQ                 (REMOTE, true ),
-    COMMIT_INVALIDATE_REQ              (REMOTE, true ),
-    APPLY_MINIMAL_REQ                  (REMOTE, true ),
-    APPLY_MAXIMAL_REQ                  (REMOTE, true ),
-    APPLY_RSP                          (REMOTE, false),
-    READ_REQ                           (REMOTE, false),
-    READ_RSP                           (REMOTE, false),
-    BEGIN_RECOVER_REQ                  (REMOTE, true ),
-    BEGIN_RECOVER_RSP                  (REMOTE, false),
-    BEGIN_INVALIDATE_REQ               (REMOTE, true ),
-    BEGIN_INVALIDATE_RSP               (REMOTE, false),
-    WAIT_ON_COMMIT_REQ                 (REMOTE, false),
-    WAIT_ON_COMMIT_RSP                 (REMOTE, false),
-    WAIT_UNTIL_APPLIED_REQ             (REMOTE, false),
-    INFORM_OF_TXN_REQ                  (REMOTE, true ),
-    INFORM_DURABLE_REQ                 (REMOTE, true ),
-    INFORM_HOME_DURABLE_REQ            (REMOTE, true ),
-    CHECK_STATUS_REQ                   (REMOTE, false),
-    CHECK_STATUS_RSP                   (REMOTE, false),
-    FETCH_DATA_REQ                     (REMOTE, false),
-    FETCH_DATA_RSP                     (REMOTE, false),
-    SET_SHARD_DURABLE_REQ              (REMOTE, true ),
-    SET_GLOBALLY_DURABLE_REQ           (REMOTE, true ),
-    QUERY_DURABLE_BEFORE_REQ           (REMOTE, false),
-    QUERY_DURABLE_BEFORE_RSP           (REMOTE, false),
-    APPLY_AND_WAIT_UNTIL_APPLIED_REQ   (REMOTE, true ),
-
-    PROPAGATE_PRE_ACCEPT_MSG           (LOCAL,  true ),
-    PROPAGATE_COMMIT_MSG               (LOCAL,  true ),
-    PROPAGATE_APPLY_MSG                (LOCAL,  true ),
-    PROPAGATE_OTHER_MSG                (LOCAL,  true ),
-    ;
+    public static final MessageType SIMPLE_RSP                       = 
mt(REMOTE, false);
+    public static final MessageType FAILURE_RSP                      = 
mt(REMOTE, false);
+    public static final MessageType PRE_ACCEPT_REQ                   = 
mt(REMOTE, true );
+    public static final MessageType PRE_ACCEPT_RSP                   = 
mt(REMOTE, false);
+    public static final MessageType ACCEPT_REQ                       = 
mt(REMOTE, true );
+    public static final MessageType ACCEPT_RSP                       = 
mt(REMOTE, false);
+    public static final MessageType ACCEPT_INVALIDATE_REQ            = 
mt(REMOTE, true );
+    public static final MessageType GET_DEPS_REQ                     = 
mt(REMOTE, false);
+    public static final MessageType GET_DEPS_RSP                     = 
mt(REMOTE, false);
+    public static final MessageType COMMIT_MINIMAL_REQ               = 
mt(REMOTE, true );
+    public static final MessageType COMMIT_MAXIMAL_REQ               = 
mt(REMOTE, true );
+    public static final MessageType COMMIT_INVALIDATE_REQ            = 
mt(REMOTE, true );
+    public static final MessageType APPLY_MINIMAL_REQ                = 
mt(REMOTE, true );
+    public static final MessageType APPLY_MAXIMAL_REQ                = 
mt(REMOTE, true );
+    public static final MessageType APPLY_RSP                        = 
mt(REMOTE, false);
+    public static final MessageType READ_REQ                         = 
mt(REMOTE, false);
+    public static final MessageType READ_RSP                         = 
mt(REMOTE, false);
+    public static final MessageType BEGIN_RECOVER_REQ                = 
mt(REMOTE, true );
+    public static final MessageType BEGIN_RECOVER_RSP                = 
mt(REMOTE, false);
+    public static final MessageType BEGIN_INVALIDATE_REQ             = 
mt(REMOTE, true );
+    public static final MessageType BEGIN_INVALIDATE_RSP             = 
mt(REMOTE, false);
+    public static final MessageType WAIT_ON_COMMIT_REQ               = 
mt(REMOTE, false);
+    public static final MessageType WAIT_ON_COMMIT_RSP               = 
mt(REMOTE, false);
+    public static final MessageType WAIT_UNTIL_APPLIED_REQ           = 
mt(REMOTE, false);
+    public static final MessageType INFORM_OF_TXN_REQ                = 
mt(REMOTE, true );
+    public static final MessageType INFORM_DURABLE_REQ               = 
mt(REMOTE, true );
+    public static final MessageType INFORM_HOME_DURABLE_REQ          = 
mt(REMOTE, true );
+    public static final MessageType CHECK_STATUS_REQ                 = 
mt(REMOTE, false);
+    public static final MessageType CHECK_STATUS_RSP                 = 
mt(REMOTE, false);
+    public static final MessageType FETCH_DATA_REQ                   = 
mt(REMOTE, false);
+    public static final MessageType FETCH_DATA_RSP                   = 
mt(REMOTE, false);
+    public static final MessageType SET_SHARD_DURABLE_REQ            = 
mt(REMOTE, true );
+    public static final MessageType SET_GLOBALLY_DURABLE_REQ         = 
mt(REMOTE, true );
+    public static final MessageType QUERY_DURABLE_BEFORE_REQ         = 
mt(REMOTE, false);
+    public static final MessageType QUERY_DURABLE_BEFORE_RSP         = 
mt(REMOTE, false);
+    public static final MessageType APPLY_AND_WAIT_UNTIL_APPLIED_REQ = 
mt(REMOTE, true );
+
+    public static final MessageType PROPAGATE_PRE_ACCEPT_MSG         = 
mt(LOCAL,  true );
+    public static final MessageType PROPAGATE_COMMIT_MSG             = 
mt(LOCAL,  true );
+    public static final MessageType PROPAGATE_APPLY_MSG              = 
mt(LOCAL,  true );
+    public static final MessageType PROPAGATE_OTHER_MSG              = 
mt(LOCAL,  true );
+
 
     public enum Kind { LOCAL, REMOTE }
 
+    public static final List<MessageType> values;
+
+    static
+    {
+        ImmutableList.Builder<MessageType> builder = ImmutableList.builder();
+        for (Field f : MessageType.class.getDeclaredFields())
+        {
+            if (f.getType().equals(MessageType.class) && 
Modifier.isStatic(f.getModifiers()))
+            {
+                try
+                {
+                    builder.add((MessageType) f.get(null));
+                }
+                catch (IllegalAccessException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        values = builder.build();
+    }
+
+    protected static MessageType mt(Kind kind, boolean hasSideEffects)
+    {
+        return new MessageType(kind, hasSideEffects);
+    }
+
     /**
      * LOCAL messages are not sent to remote nodes.
      */
@@ -80,7 +113,7 @@ public enum MessageType
      */
     private final boolean hasSideEffects;
 
-    MessageType(Kind kind, boolean hasSideEffects)
+    protected MessageType(Kind kind, boolean hasSideEffects)
     {
         this.hasSideEffects = hasSideEffects;
         this.kind = kind;
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java 
b/accord-core/src/main/java/accord/messages/ReadData.java
index 3eacce68..889b7067 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -36,12 +36,14 @@ import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
 import accord.utils.Invariants;
+import accord.utils.async.AsyncChain;
 
 import static accord.messages.MessageType.READ_RSP;
 import static accord.messages.TxnRequest.computeWaitForEpoch;
 import static accord.messages.TxnRequest.latestRelevantEpochIndex;
 
 // TODO (required, efficiency): dedup - can currently have infinite pending 
reads that will be executed independently
+// TODO (review) this is really more at its core Execute rather than read 
because we use it to execute all kinds of things now and we should maybe rename 
it?
 public abstract class ReadData extends AbstractEpochRequest<ReadNack>
 {
     private static final Logger logger = 
LoggerFactory.getLogger(ReadData.class);
@@ -56,11 +58,10 @@ public abstract class ReadData extends 
AbstractEpochRequest<ReadNack>
 
         ReadType(int val)
         {
-            this.val = (byte)val;
+            this.val = (byte) val;
         }
 
-        @SuppressWarnings("unused")
-        public static ReadType fromValue(byte val)
+        public static ReadType valueOf(int val)
         {
             switch (val)
             {
@@ -79,6 +80,7 @@ public abstract class ReadData extends 
AbstractEpochRequest<ReadNack>
     // TODO (expected, cleanup): should this be a Route?
     public final Participants<?> readScope;
     private final long waitForEpoch;
+
     private Data data;
     transient BitSet waitingOn;
     transient int waitingOnCount;
@@ -144,10 +146,10 @@ public abstract class ReadData extends 
AbstractEpochRequest<ReadNack>
             node.agent().onUncaughtException(failure);
             cancel();
         }
-        else
-        {
+
+        // Unless failed always ack to indicate setup has completed otherwise 
the counter never gets to -1
+        if (failure == null)
             ack(null);
-        }
     }
 
     private void ack(@Nullable Ranges newUnavailable)
@@ -180,12 +182,17 @@ public abstract class ReadData extends 
AbstractEpochRequest<ReadNack>
         ack(unavailable);
     }
 
+    protected AsyncChain<Data> execute(SafeCommandStore safeStore, Timestamp 
executeAt, PartialTxn txn)
+    {
+        return txn.read(safeStore, executeAt);
+    }
+
     void read(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn)
     {
         CommandStore unsafeStore = safeStore.commandStore();
         Ranges unavailable = safeStore.ranges().unsafeToReadAt(executeAt);
 
-        txn.read(safeStore, executeAt).begin((next, throwable) -> {
+        execute(safeStore, executeAt, txn).begin((next, throwable) -> {
             if (throwable != null)
             {
                 // TODO (expected, exceptions): should send exception to 
client, and consistency handle/propagate locally
diff --git a/accord-core/src/main/java/accord/messages/ReadTxnData.java 
b/accord-core/src/main/java/accord/messages/ReadTxnData.java
index 5dec9ba5..69e7a7ba 100644
--- a/accord-core/src/main/java/accord/messages/ReadTxnData.java
+++ b/accord-core/src/main/java/accord/messages/ReadTxnData.java
@@ -18,36 +18,15 @@
 
 package accord.messages;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.api.Data;
-import accord.local.Command;
-import accord.local.CommandStore;
 import accord.local.Node;
-import accord.local.PreLoadContext;
-import accord.local.SafeCommand;
-import accord.local.SafeCommandStore;
-import accord.local.Status;
-import accord.primitives.EpochSupplier;
 import accord.primitives.Participants;
-import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
-import javax.annotation.Nullable;
-
-import static accord.local.SaveStatus.LocalExecution.WaitingToExecute;
-import static accord.local.Status.Committed;
-import static accord.messages.ReadData.ReadNack.NotCommitted;
-import static accord.messages.ReadData.ReadNack.Redundant;
-import static accord.utils.MapReduceConsume.forEach;
 
 // TODO (required, efficiency): dedup - can currently have infinite pending 
reads that will be executed independently
-public class ReadTxnData extends ReadData implements 
Command.TransientListener, EpochSupplier
+public class ReadTxnData extends AbstractExecute
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(ReadTxnData.class);
-
     public static class SerializerSupport
     {
         public static ReadTxnData create(TxnId txnId, Participants<?> scope, 
long executeAtEpoch, long waitForEpoch)
@@ -56,215 +35,19 @@ public class ReadTxnData extends ReadData implements 
Command.TransientListener,
         }
     }
 
-    class ObsoleteTracker implements Command.TransientListener
-    {
-        @Override
-        public void onChange(SafeCommandStore safeStore, SafeCommand 
safeCommand)
-        {
-            switch (safeCommand.current().status())
-            {
-                case PreApplied:
-                case Applied:
-                case Invalidated:
-                case Truncated:
-                    obsoleteAndSend();
-                    safeCommand.removeListener(this);
-            }
-        }
-
-        @Override
-        public PreLoadContext listenerPreLoadContext(TxnId caller)
-        {
-            return ReadTxnData.this.listenerPreLoadContext(caller);
-        }
-    }
-
-    private enum State { PENDING, RETURNED, OBSOLETE }
-
-    public final long executeAtEpoch;
-    final ObsoleteTracker obsoleteTracker = new ObsoleteTracker();
-    private transient State state = State.PENDING; // TODO (low priority, 
semantics): respond with the Executed result we have stored?
-
     public ReadTxnData(Node.Id to, Topologies topologies, TxnId txnId, 
Participants<?> readScope, Timestamp executeAt)
     {
-        super(to, topologies, txnId, readScope);
-        this.executeAtEpoch = executeAt.epoch();
-    }
-
-    protected ReadTxnData(TxnId txnId, Participants<?> readScope, long 
executeAtEpoch, long waitForEpoch)
-    {
-        super(txnId, readScope, waitForEpoch);
-        this.executeAtEpoch = executeAtEpoch;
-    }
-
-    @Override
-    public ReadType kind()
-    {
-        return ReadType.readTxnData;
-    }
-
-    @Override
-    protected long executeAtEpoch()
-    {
-        return executeAtEpoch;
-    }
-
-    @Override
-    public long epoch()
-    {
-        return executeAtEpoch;
-    }
-
-    @Override
-    public PreLoadContext listenerPreLoadContext(TxnId caller)
-    {
-        return PreLoadContext.contextFor(txnId, caller, keys());
-    }
-
-    @Override
-    public synchronized void onChange(SafeCommandStore safeStore, SafeCommand 
safeCommand)
-    {
-        Command command = safeCommand.current();
-        logger.trace("{}: updating as listener in response to change on {} 
with status {} ({})",
-                this, command.txnId(), command.status(), command);
-        switch (command.status())
-        {
-            default: throw new AssertionError();
-            case NotDefined:
-            case PreAccepted:
-            case Accepted:
-            case AcceptedInvalidate:
-            case PreCommitted:
-            case Committed:
-                return;
-
-            case PreApplied:
-            case Applied:
-            case Invalidated:
-            case Truncated:
-                obsoleteAndSend();
-                return;
-            case ReadyToExecute:
-        }
-
-        if (safeCommand.removeListener(this))
-            maybeRead(safeStore, safeCommand);
-    }
-
-    @Override
-    public synchronized ReadNack apply(SafeCommandStore safeStore)
-    {
-        SafeCommand safeCommand = safeStore.get(txnId, this, readScope);
-        return apply(safeStore, safeCommand);
-    }
-
-    protected synchronized ReadNack apply(SafeCommandStore safeStore, 
SafeCommand safeCommand)
-    {
-        if (state != State.PENDING)
-            return null;
-
-        Command command = safeCommand.current();
-        Status status = command.status();
-
-        logger.trace("{}: setting up read with status {} on {}", txnId, 
status, safeStore);
-        switch (status) {
-            default:
-                throw new AssertionError();
-            case Committed:
-            case NotDefined:
-            case PreAccepted:
-            case Accepted:
-            case AcceptedInvalidate:
-            case PreCommitted:
-                waitingOn.set(safeStore.commandStore().id());
-                ++waitingOnCount;
-                safeCommand.addListener(this);
-
-                safeStore.progressLog().waiting(safeCommand, WaitingToExecute, 
null, readScope);
-                if (status == Committed) return null;
-                else return NotCommitted;
-
-            case PreApplied:
-            case Applied:
-            case Invalidated:
-            case Truncated:
-                state = State.OBSOLETE;
-                return Redundant;
-
-            case ReadyToExecute:
-                waitingOn.set(safeStore.commandStore().id());
-                ++waitingOnCount;
-                maybeRead(safeStore, safeCommand);
-                return null;
-        }
+        super(to, topologies, txnId, readScope, executeAt);
     }
 
-    synchronized void obsoleteAndSend()
+    public ReadTxnData(TxnId txnId, Participants<?> readScope, long 
waitForEpoch, long executeAtEpoch)
     {
-        if (state == State.PENDING)
-        {
-            state = State.OBSOLETE;
-            node.reply(replyTo, replyContext, Redundant, null);
-        }
+        super(txnId, readScope, waitForEpoch, executeAtEpoch);
     }
 
-    void maybeRead(SafeCommandStore safeStore, SafeCommand safeCommand)
+    protected boolean canExecutePreApplied()
     {
-        switch (state)
-        {
-            case PENDING:
-                Command command = safeCommand.current();
-                logger.trace("{}: executing read", command.txnId());
-                safeCommand.addListener(obsoleteTracker);
-                read(safeStore, command.executeAt(), command.partialTxn());
-                break;
-            case OBSOLETE:
-                // nothing to see here
-                break;
-            case RETURNED:
-                throw new IllegalStateException("ReadOk was sent, yet ack 
called again");
-            default:
-                throw new AssertionError("Unknown state: " + state);
-        }
-    }
-
-    @Override
-    protected synchronized void readComplete(CommandStore commandStore, 
@Nullable Data result, @Nullable Ranges unavailable)
-    {
-        // TODO (expected): we should unregister our listener, but this is 
quite costly today
-        super.readComplete(commandStore, result, unavailable);
-    }
-
-    @Override
-    protected void reply(@Nullable Ranges unavailable, @Nullable Data data, 
@Nullable Throwable fail)
-    {
-        switch (state)
-        {
-            case RETURNED:
-                throw new IllegalStateException("ReadOk was sent, yet ack 
called again", fail);
-            case OBSOLETE:
-                logger.debug("After the read completed for txn {}, the result 
was marked obsolete", txnId);
-                if (fail != null)
-                    node.agent().onUncaughtException(fail);
-                break;
-            case PENDING:
-                state = State.RETURNED;
-                node.reply(replyTo, replyContext, fail == null ? new 
ReadOk(unavailable, data) : null, fail);
-                break;
-            default:
-                throw new AssertionError("Unknown state: " + state);
-        }
-    }
-
-    private void removeListener(SafeCommandStore safeStore, TxnId txnId)
-    {
-        safeStore.get(txnId, this, readScope).removeListener(this);
-    }
-
-    @Override
-    protected void cancel()
-    {
-        node.commandStores().mapReduceConsume(this, waitingOn.stream(), 
forEach(in -> removeListener(in, txnId), node.agent()));
+        return false;
     }
 
     @Override
@@ -272,12 +55,4 @@ public class ReadTxnData extends ReadData implements 
Command.TransientListener,
     {
         return MessageType.READ_REQ;
     }
-
-    @Override
-    public String toString()
-    {
-        return "ReadData{" +
-               "txnId:" + txnId +
-               '}';
-    }
 }
diff --git a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java 
b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
index 730055c4..63b38810 100644
--- a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
@@ -45,6 +45,9 @@ import static accord.messages.ReadData.ReadNack.NotCommitted;
 import static accord.messages.ReadData.ReadNack.Redundant;
 import static accord.utils.MapReduceConsume.forEach;
 
+/**
+ * Wait until the dependencies for this transaction are Applied. Does not wait 
until this transaction is Applied.
+ */
 // TODO (required, efficiency): dedup - can currently have infinite pending 
reads that will be executed independently
 public class WaitUntilApplied extends ReadData implements 
Command.TransientListener, EpochSupplier
 {
@@ -234,7 +237,9 @@ public class WaitUntilApplied extends ReadData implements 
Command.TransientListe
 
     private void removeListener(SafeCommandStore safeStore, TxnId txnId)
     {
-        safeStore.get(txnId, this, readScope).removeListener(this);
+        SafeCommand safeCommand = safeStore.ifInitialised(txnId);
+        if (safeCommand != null)
+            safeCommand.removeListener(this);
     }
 
     @Override
@@ -258,7 +263,7 @@ public class WaitUntilApplied extends ReadData implements 
Command.TransientListe
     @Override
     public String toString()
     {
-        return "WaitForApply{" +
+        return "WaitUntilApplied{" +
                "txnId:" + txnId +
                '}';
     }
diff --git a/accord-core/src/main/java/accord/primitives/Keys.java 
b/accord-core/src/main/java/accord/primitives/Keys.java
index 6c3336db..ee358d66 100644
--- a/accord-core/src/main/java/accord/primitives/Keys.java
+++ b/accord-core/src/main/java/accord/primitives/Keys.java
@@ -18,12 +18,17 @@
 
 package accord.primitives;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
 import java.util.function.Function;
 
 import accord.api.Key;
-import accord.utils.*;
+import accord.utils.ArrayBuffers;
 import accord.utils.ArrayBuffers.ObjectBuffers;
+import accord.utils.SortedArrays;
 
 import static accord.utils.ArrayBuffers.cachedKeys;
 import static accord.utils.SortedArrays.isSortedUnique;
@@ -41,6 +46,8 @@ public class Keys extends AbstractKeys<Key> implements 
Seekables<Key, Keys>
 
     public static final Keys EMPTY = new Keys(new Key[0]);
 
+    private static final Key[] EMPTY_KEYS_ARRAY = new Key[0];
+
     public Keys(SortedSet<? extends Key> keys)
     {
         super(keys.toArray(new Key[0]));
@@ -115,7 +122,9 @@ public class Keys extends AbstractKeys<Key> implements 
Seekables<Key, Keys>
 
     public static Keys of(Collection<? extends Key> keys)
     {
-        return of(keys.toArray(new Key[0]));
+        if (keys.isEmpty())
+            return Keys.EMPTY;
+        return of(keys.toArray(EMPTY_KEYS_ARRAY));
     }
 
     public static <V> Keys of(Collection<V> input, Function<? super V, ? 
extends Key> transform)
diff --git a/accord-core/src/main/java/accord/primitives/RoutingKeys.java 
b/accord-core/src/main/java/accord/primitives/RoutingKeys.java
index 1f1953a6..81212ef7 100644
--- a/accord-core/src/main/java/accord/primitives/RoutingKeys.java
+++ b/accord-core/src/main/java/accord/primitives/RoutingKeys.java
@@ -18,6 +18,8 @@
 
 package accord.primitives;
 
+import java.util.Collection;
+
 import accord.api.RoutingKey;
 import accord.utils.SortedArrays;
 
@@ -38,6 +40,8 @@ public class RoutingKeys extends AbstractUnseekableKeys 
implements Unseekables<R
 
     public static final RoutingKeys EMPTY = new RoutingKeys(new RoutingKey[0]);
 
+    private static final RoutingKey[] EMPTY_KEYS_ARRAY = new RoutingKey[0];
+
     RoutingKeys(RoutingKey[] keys)
     {
         super(keys);
@@ -48,6 +52,13 @@ public class RoutingKeys extends AbstractUnseekableKeys 
implements Unseekables<R
         return new RoutingKeys(toUnique(sort(keys)));
     }
 
+    public static RoutingKeys of(Collection<? extends RoutingKey> keys)
+    {
+        if (keys.isEmpty())
+            return EMPTY;
+        return of(keys.toArray(EMPTY_KEYS_ARRAY));
+    }
+
     public static RoutingKeys ofSortedUnique(RoutingKey ... keys)
     {
         checkArgument(isSortedUnique(keys));
diff --git a/accord-core/src/main/java/accord/primitives/Txn.java 
b/accord-core/src/main/java/accord/primitives/Txn.java
index 93490869..5a792918 100644
--- a/accord-core/src/main/java/accord/primitives/Txn.java
+++ b/accord-core/src/main/java/accord/primitives/Txn.java
@@ -21,16 +21,18 @@ package accord.primitives;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
-import accord.api.*;
+import accord.api.Data;
+import accord.api.Query;
+import accord.api.Read;
+import accord.api.Result;
+import accord.api.Update;
 import accord.local.SafeCommandStore;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-
 public interface Txn
 {
     enum Kind
@@ -268,8 +270,8 @@ public interface Txn
     default AsyncChain<Data> read(SafeCommandStore safeStore, Timestamp 
executeAt)
     {
         Ranges ranges = safeStore.ranges().safeToReadAt(executeAt);
-        List<AsyncChain<Data>> chains = Routables.foldlMinimal(keys(), ranges, 
(key, accumulate, index) -> {
-            AsyncChain<Data> result = read().read(key, kind(), safeStore, 
executeAt, safeStore.dataStore());
+        List<AsyncChain<Data>> chains = Routables.foldlMinimal(read().keys(), 
ranges, (key, accumulate, index) -> {
+            AsyncChain<Data> result = read().read(key, safeStore, executeAt, 
safeStore.dataStore());
             accumulate.add(result);
             return accumulate;
         }, new ArrayList<>());
diff --git a/accord-core/src/main/java/accord/primitives/Writes.java 
b/accord-core/src/main/java/accord/primitives/Writes.java
index 08fc9442..e470d074 100644
--- a/accord-core/src/main/java/accord/primitives/Writes.java
+++ b/accord-core/src/main/java/accord/primitives/Writes.java
@@ -18,15 +18,16 @@
 
 package accord.primitives;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
 import accord.api.Write;
 import accord.local.SafeCommandStore;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 
 import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
 
 public class Writes
 {
@@ -50,7 +51,7 @@ public class Writes
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         Writes writes = (Writes) o;
-        return executeAt.equals(writes.executeAt) && keys.equals(writes.keys) 
&& Objects.equals(write, writes.write);
+        return txnId.equals(writes.txnId) && 
executeAt.equals(writes.executeAt) && keys.equals(writes.keys) && 
Objects.equals(write, writes.write);
     }
 
     public boolean isEmpty()
@@ -61,7 +62,7 @@ public class Writes
     @Override
     public int hashCode()
     {
-        return Objects.hash(executeAt, keys, write);
+        return Objects.hash(txnId, executeAt, keys, write);
     }
 
     public AsyncChain<Void> apply(SafeCommandStore safeStore, Ranges ranges, 
PartialTxn txn)
@@ -83,7 +84,8 @@ public class Writes
     public String toString()
     {
         return "TxnWrites{" +
-               "executeAt:" + executeAt +
+               "txnId:" + txnId +
+               ", executeAt:" + executeAt +
                ", keys:" + keys +
                ", write:" + write +
                '}';
diff --git a/accord-core/src/main/java/accord/topology/Topology.java 
b/accord-core/src/main/java/accord/topology/Topology.java
index 27b43ef7..753933dc 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -18,17 +18,34 @@
 
 package accord.topology;
 
-import java.util.*;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.stream.IntStream;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import accord.api.RoutingKey;
 import accord.local.Node.Id;
-import accord.primitives.*;
-import accord.utils.*;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.Routables;
+import accord.primitives.Unseekables;
+import accord.utils.ArrayBuffers;
 import accord.utils.ArrayBuffers.IntBuffers;
-import com.google.common.annotations.VisibleForTesting;
+import accord.utils.IndexedBiFunction;
+import accord.utils.IndexedConsumer;
+import accord.utils.IndexedIntFunction;
+import accord.utils.IndexedTriFunction;
 
 import static accord.utils.SortedArrays.Search.FLOOR;
 import static accord.utils.SortedArrays.exponentialSearch;
@@ -148,6 +165,7 @@ public class Topology
         return supersetIndexes.length < shards.length;
     }
 
+    @VisibleForTesting
     public Topology withEpoch(long epoch)
     {
         return new Topology(epoch, shards, ranges, nodeLookup, subsetOfRanges, 
supersetIndexes);
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java 
b/accord-core/src/main/java/accord/utils/Invariants.java
index b960d8b2..cd5ba6bd 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -18,10 +18,10 @@
 
 package accord.utils;
 
-import net.nicoulaj.compilecommand.annotations.Inline;
-
-import javax.annotation.Nullable;
 import java.util.function.Predicate;
+import javax.annotation.Nullable;
+
+import net.nicoulaj.compilecommand.annotations.Inline;
 
 import static java.lang.String.format;
 
@@ -146,6 +146,13 @@ public class Invariants
         return param;
     }
 
+    public static <T> T nonNull(T param, String message)
+    {
+        if (param == null)
+            throw new NullPointerException(message);
+        return param;
+    }
+
     public static <T> T nonNull(T param, String fmt, Object... args)
     {
         if (param == null)
diff --git a/accord-core/src/test/java/accord/Utils.java 
b/accord-core/src/test/java/accord/Utils.java
index 3fc3ac9d..ac36fb16 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -18,6 +18,7 @@
 
 package accord;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -25,10 +26,10 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
 
-import java.time.Duration;
-
 import accord.api.MessageSink;
 import accord.api.Scheduler;
+import accord.coordinate.TxnExecute;
+import accord.coordinate.TxnPersist;
 import accord.config.LocalConfig;
 import accord.impl.InMemoryCommandStores;
 import accord.impl.IntKey;
@@ -41,6 +42,7 @@ import accord.impl.mock.MockStore;
 import accord.local.Node;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
+import accord.messages.Apply;
 import accord.config.MutableLocalConfig;
 import accord.messages.LocalMessage;
 import accord.primitives.Keys;
@@ -54,7 +56,6 @@ import accord.utils.DefaultRandom;
 import accord.utils.EpochFunction;
 import accord.utils.Invariants;
 import accord.utils.ThreadPoolScheduler;
-
 import org.awaitility.Awaitility;
 import org.awaitility.core.ThrowingRunnable;
 
@@ -112,7 +113,6 @@ public class Utils
     {
         return new Txn.InMemory(keys, MockStore.read(keys), MockStore.QUERY, 
MockStore.update(keys));
     }
-
     public static Txn writeTxn(Ranges ranges)
     {
         return new Txn.InMemory(ranges, MockStore.read(ranges), 
MockStore.QUERY, MockStore.update(ranges));
@@ -161,7 +161,7 @@ public class Utils
                              scheduler,
                              SizeOfIntersectionSorter.SUPPLIER,
                              SimpleProgressLog::new,
-                             InMemoryCommandStores.Synchronized::new,
+                             InMemoryCommandStores.Synchronized::new, 
TxnExecute.FACTORY, TxnPersist.FACTORY, Apply.FACTORY,
                              localConfig);
         awaitUninterruptibly(node.unsafeStart());
         return node;
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java 
b/accord-core/src/test/java/accord/burn/BurnTest.java
index b147efb7..721cf030 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -21,8 +21,8 @@ package accord.burn;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.EnumMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
@@ -53,13 +53,14 @@ import org.slf4j.LoggerFactory;
 
 import accord.api.Key;
 import accord.impl.IntHashKey;
+import accord.impl.TopologyFactory;
 import accord.impl.basic.Cluster;
+import accord.impl.basic.Cluster.Stats;
+import accord.impl.basic.Packet;
 import accord.impl.basic.PendingRunnable;
 import accord.impl.basic.PropagatingPendingQueue;
 import accord.impl.basic.RandomDelayQueue;
 import accord.impl.basic.RandomDelayQueue.Factory;
-import accord.impl.TopologyFactory;
-import accord.impl.basic.Packet;
 import accord.impl.basic.SimulatedDelayedExecutorService;
 import accord.impl.list.ListAgent;
 import accord.impl.list.ListQuery;
@@ -307,7 +308,7 @@ public class BurnTest
             }
         };
 
-        EnumMap<MessageType, Cluster.Stats> messageStatsMap;
+        Map<MessageType, Stats> messageStatsMap;
         try
         {
             messageStatsMap = Cluster.run(toArray(nodes, Id[]::new), listener, 
() -> queue, queue::checkFailures,
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index efeadfec..8595f7a6 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -21,7 +21,6 @@ package accord.impl.basic;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -45,6 +44,8 @@ import accord.api.MessageSink;
 import accord.api.Scheduler;
 import accord.burn.BurnTestConfigurationService;
 import accord.burn.TopologyUpdates;
+import accord.coordinate.TxnExecute;
+import accord.coordinate.TxnPersist;
 import accord.impl.CoordinateDurabilityScheduling;
 import accord.impl.IntHashKey;
 import accord.impl.SimpleProgressLog;
@@ -56,6 +57,7 @@ import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
+import accord.messages.Apply;
 import accord.config.MutableLocalConfig;
 import accord.messages.LocalMessage;
 import accord.messages.MessageType;
@@ -84,7 +86,7 @@ public class Cluster implements Scheduler
         public String toString() { return Integer.toString(count); }
     }
 
-    EnumMap<MessageType, Stats> statsMap = new EnumMap<>(MessageType.class);
+    Map<MessageType, Stats> statsMap = new HashMap<>();
 
     final RandomSource randomSource;
     final Function<Id, Node> lookup;
@@ -216,7 +218,7 @@ public class Cluster implements Scheduler
         run.run();
     }
 
-    public static EnumMap<MessageType, Stats> run(Id[] nodes, MessageListener 
messageListener, Supplier<PendingQueue> queueSupplier, Runnable checkFailures, 
Consumer<Packet> responseSink, AgentExecutor executor, Supplier<RandomSource> 
randomSupplier, Supplier<LongSupplier> nowSupplierSupplier, TopologyFactory 
topologyFactory, Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal)
+    public static Map<MessageType, Stats> run(Id[] nodes, MessageListener 
messageListener, Supplier<PendingQueue> queueSupplier, Runnable checkFailures, 
Consumer<Packet> responseSink, AgentExecutor executor, Supplier<RandomSource> 
randomSupplier, Supplier<LongSupplier> nowSupplierSupplier, TopologyFactory 
topologyFactory, Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal)
     {
         Topology topology = topologyFactory.toTopology(nodes);
         Map<Id, Node> lookup = new LinkedHashMap<>();
@@ -236,7 +238,8 @@ public class Cluster implements Scheduler
                                      () -> new ListStore(id), new 
ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()),
                                      executor.agent(),
                                      randomSupplier.get(), sinks, 
SizeOfIntersectionSorter.SUPPLIER,
-                                     SimpleProgressLog::new, 
DelayedCommandStores.factory(sinks.pending), localConfig);
+                                     SimpleProgressLog::new, 
DelayedCommandStores.factory(sinks.pending), TxnExecute.FACTORY, 
TxnPersist.FACTORY, Apply.FACTORY,
+                                     localConfig);
                 lookup.put(id, node);
                 CoordinateDurabilityScheduling durability = new 
CoordinateDurabilityScheduling(node);
                 // TODO (desired): randomise
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java 
b/accord-core/src/test/java/accord/impl/list/ListRead.java
index 3a287bf4..c26bda01 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -21,12 +21,6 @@ package accord.impl.list;
 import java.util.Map;
 import java.util.function.Function;
 
-import accord.local.SafeCommandStore;
-import accord.primitives.Ranges;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.utils.async.AsyncChain;
-import accord.utils.Timestamped;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,9 +29,14 @@ import accord.api.DataStore;
 import accord.api.Key;
 import accord.api.Read;
 import accord.local.CommandStore;
+import accord.local.SafeCommandStore;
 import accord.primitives.Range;
+import accord.primitives.Ranges;
 import accord.primitives.Seekable;
 import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.utils.Timestamped;
+import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncExecutor;
 
 public class ListRead implements Read
@@ -62,7 +61,7 @@ public class ListRead implements Read
     }
 
     @Override
-    public AsyncChain<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore 
commandStore, Timestamp executeAt, DataStore store)
+    public AsyncChain<Data> read(Seekable key, SafeCommandStore commandStore, 
Timestamp executeAt, DataStore store)
     {
         ListStore s = (ListStore)store;
         return executor.apply(commandStore.commandStore()).submit(() -> {
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java 
b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 4b3d198d..3678da1b 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory;
 
 import accord.NetworkFilter;
 import accord.api.MessageSink;
+import accord.coordinate.TxnExecute;
+import accord.coordinate.TxnPersist;
 import accord.config.LocalConfig;
 import accord.impl.InMemoryCommandStores;
 import accord.impl.IntKey;
@@ -46,6 +48,7 @@ import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
+import accord.messages.Apply;
 import accord.config.MutableLocalConfig;
 import accord.messages.Callback;
 import accord.messages.LocalMessage;
@@ -136,6 +139,9 @@ public class MockCluster implements Network, AutoCloseable, 
Iterable<Node>
                              SizeOfIntersectionSorter.SUPPLIER,
                              SimpleProgressLog::new,
                              InMemoryCommandStores.SingleThread::new,
+                             TxnExecute.FACTORY,
+                             TxnPersist.FACTORY,
+                             Apply.FACTORY,
                              localConfig);
         awaitUninterruptibly(node.unsafeStart());
         node.onTopologyUpdate(topology, true);
diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java 
b/accord-core/src/test/java/accord/impl/mock/MockStore.java
index bae14b57..52385666 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockStore.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java
@@ -19,15 +19,20 @@
 package accord.impl.mock;
 
 import accord.api.Data;
+import accord.api.DataStore;
 import accord.api.Query;
 import accord.api.Read;
 import accord.api.Result;
-import accord.api.DataStore;
 import accord.api.Update;
 import accord.api.Write;
 import accord.local.Node;
 import accord.local.SafeCommandStore;
-import accord.primitives.*;
+import accord.primitives.Ranges;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
+import accord.primitives.Writes;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResults;
@@ -57,7 +62,7 @@ public class MockStore implements DataStore
             }
 
             @Override
-            public AsyncChain<Data> read(Seekable key, Txn.Kind kind, 
SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
+            public AsyncChain<Data> read(Seekable key, SafeCommandStore 
commandStore, Timestamp executeAt, DataStore store)
             {
                 return AsyncChains.success(DATA);
             }
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java 
b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index ef573d11..8ba38dd3 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -32,6 +32,8 @@ import accord.api.ProgressLog;
 import accord.api.RoutingKey;
 import accord.api.Scheduler;
 import accord.api.TestableConfigurationService;
+import accord.coordinate.TxnExecute;
+import accord.coordinate.TxnPersist;
 import accord.config.LocalConfig;
 import accord.impl.InMemoryCommandStore;
 import accord.impl.InMemoryCommandStores;
@@ -44,6 +46,7 @@ import accord.impl.mock.MockConfigurationService;
 import accord.impl.mock.MockStore;
 import accord.local.Node.Id;
 import accord.local.SaveStatus.LocalExecution;
+import accord.messages.Apply;
 import accord.config.MutableLocalConfig;
 import accord.primitives.FullKeyRoute;
 import accord.primitives.Keys;
@@ -113,9 +116,11 @@ public class ImmutableCommandTest
         MockCluster.Clock clock = new MockCluster.Clock(100);
         LocalConfig localConfig = new MutableLocalConfig();
         Node node = new Node(id, null, null, new 
MockConfigurationService(null, (epoch, service) -> { }, 
storeSupport.local.get()),
-                        clock, 
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock),
-                        () -> storeSupport.data, new 
ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new 
TestAgent(), new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED,
-                        SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 
-> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new, localConfig);
+                             clock, 
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock),
+                             () -> storeSupport.data, new 
ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new 
TestAgent(), new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED,
+                             SizeOfIntersectionSorter.SUPPLIER, ignore -> 
ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new,
+                             TxnExecute.FACTORY, TxnPersist.FACTORY, 
Apply.FACTORY,
+                             localConfig);
         awaitUninterruptibly(node.unsafeStart());
         node.onTopologyUpdate(storeSupport.local.get(), true);
         return node;
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java 
b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
index fa84bea1..1afe638d 100644
--- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -18,26 +18,47 @@
 
 package accord.messages;
 
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
 import accord.api.RoutingKey;
-import accord.impl.*;
+import accord.impl.AbstractSafeCommandStore;
+import accord.impl.CommandTimeseries;
 import accord.impl.CommandTimeseries.CommandLoader;
+import accord.impl.CommandsForKey;
+import accord.impl.IntKey;
 import accord.impl.IntKey.Raw;
-import accord.impl.mock.*;
-import accord.local.Node.Id;
+import accord.impl.TopologyFactory;
 import accord.impl.mock.MockCluster.Clock;
-import accord.primitives.*;
+import accord.impl.mock.Network;
+import accord.impl.mock.RecordingMessageSink;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.local.PreLoadContext;
+import accord.local.Status;
+import accord.primitives.Ballot;
+import accord.primitives.FullRoute;
+import accord.primitives.KeyDeps;
+import accord.primitives.Keys;
+import accord.primitives.PartialDeps;
+import accord.primitives.Participants;
+import accord.primitives.RangeDeps;
+import accord.primitives.Ranges;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
 import accord.topology.Topology;
-import accord.local.*;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.function.BiFunction;
-import java.util.stream.Stream;
 
-import static accord.Utils.*;
+import static accord.Utils.createNode;
+import static accord.Utils.id;
+import static accord.Utils.writeTxn;
 import static accord.impl.InMemoryCommandStore.inMemory;
 import static accord.impl.IntKey.range;
 import static accord.impl.IntKey.routing;
diff --git a/accord-core/src/test/java/accord/messages/ReadDataTest.java 
b/accord-core/src/test/java/accord/messages/ReadDataTest.java
index 27c3e9bf..97b44a87 100644
--- a/accord-core/src/test/java/accord/messages/ReadDataTest.java
+++ b/accord-core/src/test/java/accord/messages/ReadDataTest.java
@@ -58,6 +58,7 @@ import accord.primitives.PartialTxn;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.Routable;
+import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
@@ -101,7 +102,8 @@ class ReadDataTest
         Read read = Mockito.mock(Read.class);
         Mockito.when(read.slice(any())).thenReturn(read);
         Mockito.when(read.merge(any())).thenReturn(read);
-        Mockito.when(read.read(any(), any(), any(), any(), 
any())).thenAnswer(new Answer<AsyncChain<Data>>()
+        Mockito.when(read.keys()).thenReturn((Seekables)keys);
+        Mockito.when(read.read(any(), any(), any(), any())).thenAnswer(new 
Answer<AsyncChain<Data>>()
         {
             private final boolean called = false;
             @Override
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 0184765d..f69e4ed4 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -43,6 +43,8 @@ import accord.api.MessageSink;
 import accord.api.Scheduler;
 import accord.config.LocalConfig;
 import accord.config.MutableLocalConfig;
+import accord.coordinate.TxnExecute;
+import accord.coordinate.TxnPersist;
 import accord.impl.InMemoryCommandStores;
 import accord.impl.SimpleProgressLog;
 import accord.impl.SizeOfIntersectionSorter;
@@ -51,6 +53,7 @@ import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
+import accord.messages.Apply;
 import accord.messages.Callback;
 import accord.messages.LocalMessage;
 import accord.messages.Reply;
@@ -322,7 +325,8 @@ public class Cluster implements Scheduler
                                           MaelstromStore::new, new 
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
                                           MaelstromAgent.INSTANCE,
                                           randomSupplier.get(), sinks, 
SizeOfIntersectionSorter.SUPPLIER,
-                                          SimpleProgressLog::new, 
InMemoryCommandStores.SingleThread::new, localConfig));
+                                          SimpleProgressLog::new, 
InMemoryCommandStores.SingleThread::new, TxnExecute.FACTORY, 
TxnPersist.FACTORY, Apply.FACTORY,
+                                          localConfig));
             }
 
             AsyncResult<?> startup = 
AsyncChains.reduce(lookup.values().stream().map(Node::unsafeStart).collect(toList()),
 (a, b) -> null).beginAsResult();
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
index eeb9ca1a..9bb16f11 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
@@ -18,9 +18,15 @@
 
 package accord.maelstrom;
 
-import accord.api.*;
+import accord.api.Data;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.Read;
 import accord.local.SafeCommandStore;
-import accord.primitives.*;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.Seekable;
+import accord.primitives.Timestamp;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 
@@ -42,7 +48,7 @@ public class MaelstromRead implements Read
     }
 
     @Override
-    public AsyncChain<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore 
commandStore, Timestamp executeAt, DataStore store)
+    public AsyncChain<Data> read(Seekable key, SafeCommandStore commandStore, 
Timestamp executeAt, DataStore store)
     {
         MaelstromStore s = (MaelstromStore)store;
         MaelstromData result = new MaelstromData();
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 40e73ff2..5d2a459c 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -35,6 +35,8 @@ import accord.api.Scheduler;
 import accord.config.LocalConfig;
 import accord.config.MutableLocalConfig;
 import accord.coordinate.Timeout;
+import accord.coordinate.TxnExecute;
+import accord.coordinate.TxnPersist;
 import accord.impl.InMemoryCommandStores;
 import accord.impl.SimpleProgressLog;
 import accord.impl.SizeOfIntersectionSorter;
@@ -44,6 +46,7 @@ import accord.local.Node.Id;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
 import accord.maelstrom.Packet.Type;
+import accord.messages.Apply;
 import accord.messages.Callback;
 import accord.messages.LocalMessage;
 import accord.messages.Reply;
@@ -180,7 +183,8 @@ public class Main
                           System::currentTimeMillis, 
NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, System::currentTimeMillis),
                           MaelstromStore::new, new 
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
                           MaelstromAgent.INSTANCE, new DefaultRandom(), 
scheduler, SizeOfIntersectionSorter.SUPPLIER,
-                          SimpleProgressLog::new, 
InMemoryCommandStores.SingleThread::new, localConfig);
+                          SimpleProgressLog::new, 
InMemoryCommandStores.SingleThread::new, TxnExecute.FACTORY, 
TxnPersist.FACTORY, Apply.FACTORY,
+                          localConfig);
             awaitUninterruptibly(on.unsafeStart());
             err.println("Initialized node " + init.self);
             err.flush();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to