dcapwell commented on code in PR #10:
URL: https://github.com/apache/cassandra-accord/pull/10#discussion_r969941939


##########
accord-core/src/main/java/accord/coordinate/Execute.java:
##########
@@ -70,6 +71,12 @@ private void start()
     {
         Set<Id> readSet = readTracker.computeMinimalReadSetAndMarkInflight();
         Commit.commitAndRead(node, topologies, txnId, txn, homeKey, executeAt, 
deps, readSet, this);
+        // skip straight to persistence if there's no read
+        if (txn.read().keys().isEmpty())

Review Comment:
   We are not good with nullability in Accord, so it's not clear if read is 
allowed to be null... Based off Caleb's branch, 
org.apache.cassandra.cql3.statements.TransactionStatement#createRead won't be 
null in a write-only txn, but is null allowed or not? If so, then we need a null 
check at "if (txn.read().keys().isEmpty())"



##########
accord-core/src/main/java/accord/local/CommandStores.java:
##########
@@ -356,42 +315,110 @@ public synchronized void shutdown()
                 commandStore.shutdown();
     }
 
-    protected abstract <S> void forEach(Select<S> select, S scope, long 
minEpoch, long maxEpoch, Consumer<? super CommandStore> forEach);
-    protected abstract <S, T> T mapReduce(Select<S> select, S scope, long 
minEpoch, long maxEpoch, Function<? super CommandStore, T> map, BiFunction<T, 
T, T> reduce);
+    private static <T> Fold<TxnOperation, Void, List<Future<T>>> 
mapReduceFold(Function<CommandStore, T> map)
+    {
+        return (store, op, i, t) -> { t.add(store.process(op, map)); return t; 
};
+    }
+
+    private static <T> T reduce(List<Future<T>> futures, BiFunction<T, T, T> 
reduce)
+    {
+        T result = null;
+        for (Future<T> future : futures)
+        {
+            try
+            {
+                T next = future.get();
+                result = result == null ? next : reduce.apply(result, next);
+            }
+            catch (InterruptedException e)
+            {
+                throw new UncheckedInterruptedException(e);
+            }
+            catch (ExecutionException e)
+            {
+                throw new RuntimeException(e.getCause());
+            }
+        }
+        return result;
+    }
+
+    private <F, T> T setup(F f, Fold<F, Void, List<Future<T>>> fold, 
BiFunction<T, T, T> reduce)
+    {
+        List<Future<T>> futures = foldl((s, i, mn, mx) -> s.all(), null, 
Long.MIN_VALUE, Long.MAX_VALUE, fold, f, null, ArrayList::new);
+        return reduce(futures, reduce);
+    }
+
+    private  <S, T> T mapReduce(TxnOperation operation, Select<S> select, S 
scope, long minEpoch, long maxEpoch, Fold<TxnOperation, Void, List<Future<T>>> 
fold, BiFunction<T, T, T> reduce)
+    {
+        List<Future<T>> futures = foldl(select, scope, minEpoch, maxEpoch, 
fold, operation, null, ArrayList::new);
+        if (futures == null)
+            return null;
+        return reduce(futures, reduce);
+    }
+
+    public <T> T mapReduce(TxnOperation operation, Key key, long minEpoch, 
long maxEpoch, Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+    {
+        return mapReduce(operation, ShardedRanges::shard, key, minEpoch, 
maxEpoch, mapReduceFold(map), reduce);
+    }
+
+    public <T> T mapReduce(TxnOperation operation, Key key, long epoch, 
Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+    {
+        return mapReduce(operation, key, epoch, epoch, map, reduce);
+    }
+
+    public <T> T mapReduceSince(TxnOperation operation, Key key, long epoch, 
Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+    {
+        return mapReduce(operation, key, epoch, Long.MAX_VALUE, map, reduce);
+    }
 
-    public void forEach(Consumer<CommandStore> forEach)
+    public <T> T mapReduce(TxnOperation operation, Keys keys, long minEpoch, 
long maxEpoch, Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
     {
-        forEach((s, i, min, max) -> s.all(), null, 0, 0, forEach);
+        // probably need to split txnOperation and scope stuff here
+        return mapReduce(operation, ShardedRanges::shards, keys, minEpoch, 
maxEpoch, mapReduceFold(map), reduce);
     }
 
-    public void forEach(Keys keys, long epoch, Consumer<CommandStore> forEach)
+    public void setup(Consumer<CommandStore> forEach)
     {
-        forEach(keys, epoch, epoch, forEach);
+        setup(forEach, (store, f, i, t) -> { t.add(store.processSetup(f)); 
return t; }, (Void i1, Void i2) -> null);
     }
 
-    public void forEach(Keys keys, long minEpoch, long maxEpoch, 
Consumer<CommandStore> forEach)
+    public <T> T setup(Function<CommandStore, T> map, BiFunction<T, T, T> 
reduce)
     {
-        forEach(ShardedRanges::shards, keys, minEpoch, maxEpoch, forEach);
+        return setup(map, (store, f, i, t) -> { t.add(store.processSetup(f)); 
return t; }, reduce);
     }
 
-    public <T> T mapReduce(Keys keys, long epoch, Function<CommandStore, T> 
map, BiFunction<T, T, T> reduce)
+    private static Fold<TxnOperation, Void, List<Future<Void>>> 
forEachFold(Consumer<CommandStore> forEach)
     {
-        return mapReduce(keys, epoch, epoch, map, reduce);
+        return (store, op, i, t) -> { t.add(store.process(op, forEach)); 
return t; };
     }
 
-    public <T> T mapReduce(Keys keys, long minEpoch, long maxEpoch, 
Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+    public void forEach(TxnOperation operation, Keys keys, long minEpoch, long 
maxEpoch, Consumer<CommandStore> forEach)

Review Comment:
   "forEach" takes both a "TxnOperation" and "Keys", but "TxnOperation" holds 
ref to "Keys", so shouldn't we drop "Keys"? Looking at the usage, it seems like 
we could...
   
   ```
   node.forEachLocalSince(TxnOperation.scopeFor(txnId), full.txn.keys(), ...
   ```
   
   ```
   TxnOperation scope = TxnOperation.scopeFor(txnId, max.txn.keys());
   ...
   node.forEachLocalSince(scope, max.txn.keys(), ...
   ```
   
   This change trickled throughout a lot of different places, so it would be 
good to grasp the expected semantics... if the TxnOperation has keys, and you 
provide keys... "should" they be related or "must" they be related? or 
something else?



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+    private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+    private final NavigableMap<Key, CommandsForKey> commandsForKey = new 
TreeMap<>();
+
+    public static InMemoryCommandStore inMemory(CommandStore commandStore)
+    {
+        return (InMemoryCommandStore) commandStore;
+    }
+
+    public InMemoryCommandStore(int generation, int index, int numShards, 
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch 
rangesForEpoch)
+    {
+        super(generation, index, numShards, uniqueNow, currentEpoch, agent, 
store, progressLogFactory, rangesForEpoch);
+    }
+
+    @Override
+    public Command ifPresent(TxnId txnId)
+    {
+        return commands.get(txnId);
+    }
+
+    @Override
+    public Command command(TxnId txnId)
+    {
+        return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this, 
id));
+    }
+
+    public boolean hasCommand(TxnId txnId)
+    {
+        return commands.containsKey(txnId);
+    }
+
+    @Override
+    public CommandsForKey commandsForKey(Key key)
+    {
+        return commandsForKey.computeIfAbsent(key, 
InMemoryCommandsForKey::new);
+    }
+
+    public boolean hasCommandsForKey(Key key)
+    {
+        return commandsForKey.containsKey(key);
+    }
+
+    @Override
+    public CommandsForKey maybeCommandsForKey(Key key)
+    {
+        return commandsForKey.get(key);
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forEpochCommands(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)

Review Comment:
   I still need to look closer at our epoch logic, but if we have multiple 
epochs that are not fully acknowledged then wouldn't forEpochCommands have the 
wrong min epoch? This is only used in tests and not in the C* patch... If this 
is only for tests then I am cool with this, but it would be good to start 
annotating which methods are not meant to be used in `src/main`.
   
   Same comment for forCommittedInEpoch



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+    private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+    private final NavigableMap<Key, CommandsForKey> commandsForKey = new 
TreeMap<>();
+
+    public static InMemoryCommandStore inMemory(CommandStore commandStore)
+    {
+        return (InMemoryCommandStore) commandStore;
+    }
+
+    public InMemoryCommandStore(int generation, int index, int numShards, 
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch 
rangesForEpoch)
+    {
+        super(generation, index, numShards, uniqueNow, currentEpoch, agent, 
store, progressLogFactory, rangesForEpoch);
+    }
+
+    @Override
+    public Command ifPresent(TxnId txnId)
+    {
+        return commands.get(txnId);
+    }
+
+    @Override
+    public Command command(TxnId txnId)
+    {
+        return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this, 
id));
+    }
+
+    public boolean hasCommand(TxnId txnId)
+    {
+        return commands.containsKey(txnId);
+    }
+
+    @Override
+    public CommandsForKey commandsForKey(Key key)
+    {
+        return commandsForKey.computeIfAbsent(key, 
InMemoryCommandsForKey::new);
+    }
+
+    public boolean hasCommandsForKey(Key key)
+    {
+        return commandsForKey.containsKey(key);
+    }
+
+    @Override
+    public CommandsForKey maybeCommandsForKey(Key key)
+    {
+        return commandsForKey.get(key);
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forEpochCommands(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+                commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+            }
+        }
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forCommittedInEpoch(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+
+                Collection<Command> committed = commands.committedByExecuteAt()
+                        .between(minTimestamp, 
maxTimestamp).collect(Collectors.toList());
+                committed.forEach(consumer);
+            }
+        }
+    }
+
+    protected void processInternal(Consumer<? super CommandStore> consumer, 
Promise<Void> promise)
+    {
+        processInternal(cs -> {
+            consumer.accept(cs);
+            return null;
+        }, promise);
+    }
+
+    protected <T> void processInternal(Function<? super CommandStore, T> 
function, Promise<T> promise)
+    {
+        try
+        {
+            T result = function.apply(this);
+            promise.setSuccess(result);
+        }
+        catch (Throwable e)
+        {
+            promise.tryFailure(e);
+        }
+    }
+
+    public static class Synchronized extends InMemoryCommandStore
+    {
+        public Synchronized(int generation,
+                            int index,
+                            int numShards,
+                            Function<Timestamp, Timestamp> uniqueNow,
+                            LongSupplier currentEpoch,
+                            Agent agent,
+                            DataStore store,
+                            ProgressLog.Factory progressLogFactory,
+                            RangesForEpoch rangesForEpoch)
+        {
+            super(generation, index, numShards, uniqueNow, currentEpoch, 
agent, store, progressLogFactory, rangesForEpoch);
+        }
+
+        @Override
+        public synchronized Future<Void> processSetup(Consumer<? super 
CommandStore> function)
+        {
+            AsyncPromise<Void> promise = new AsyncPromise<>();
+            processInternal(function, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized <T> Future<T> processSetup(Function<? super 
CommandStore, T> function)
+        {
+            AsyncPromise<T> promise = new AsyncPromise<>();
+            processInternal(function, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized Future<Void> process(TxnOperation unused, 
Consumer<? super CommandStore> consumer)
+        {
+            Promise<Void> promise = new AsyncPromise<>();
+            processInternal(consumer, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized <T> Future<T> process(TxnOperation unused, 
Function<? super CommandStore, T> function)
+        {
+            AsyncPromise<T> promise = new AsyncPromise<>();
+            processInternal(function, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized void shutdown() {}
+    }
+
+    public static class SingleThread extends InMemoryCommandStore
+    {
+        private final ExecutorService executor;
+
+        private class ConsumerWrapper extends AsyncPromise<Void> implements 
Runnable
+        {
+            private final Consumer<? super CommandStore> consumer;
+
+            public ConsumerWrapper(Consumer<? super CommandStore> consumer)
+            {
+                this.consumer = consumer;
+            }
+
+            @Override
+            public void run()
+            {
+                processInternal(consumer, this);
+            }
+        }
+
+        private class FunctionWrapper<T> extends AsyncPromise<T> implements 
Runnable
+        {
+            private final Function<? super CommandStore, T> function;
+
+            public FunctionWrapper(Function<? super CommandStore, T> function)
+            {
+                this.function = function;
+            }
+
+            @Override
+            public void run()
+            {
+                processInternal(function, this);
+            }
+        }
+
+        public SingleThread(int generation,
+                            int index,
+                            int numShards,
+                            Node.Id nodeId,
+                            Function<Timestamp, Timestamp> uniqueNow,
+                            LongSupplier currentEpoch,
+                            Agent agent,
+                            DataStore store,
+                            ProgressLog.Factory progressLogFactory,
+                            RangesForEpoch rangesForEpoch)
+        {
+            super(generation, index, numShards, uniqueNow, currentEpoch, 
agent, store, progressLogFactory, rangesForEpoch);
+            executor = Executors.newSingleThreadExecutor(r -> {

Review Comment:
   SingleThread should expose the ability to control the thread name... in C* 
we have a "global prefix" for jvm-dtest 



##########
accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Key;
+import accord.local.Command;
+import accord.local.CommandsForKey;
+import accord.primitives.Timestamp;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Stream;
+
+public class InMemoryCommandsForKey extends CommandsForKey
+{
+    static class InMemoryCommandTimeseries implements CommandTimeseries
+    {
+        private final NavigableMap<Timestamp, Command> commands = new 
TreeMap<>();
+
+        @Override
+        public Command get(Timestamp timestamp)
+        {
+            return commands.get(timestamp);
+        }
+
+        @Override
+        public void add(Timestamp timestamp, Command command)

Review Comment:
   should check to see if a conflict was detected



##########
accord-core/build.gradle:
##########
@@ -29,11 +29,13 @@ java {
   withSourcesJar()
 }
 
+compileJava {

Review Comment:
   can you move the 1.8 logic to 
buildSrc/src/main/groovy/accord.java-conventions.gradle



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+    private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+    private final NavigableMap<Key, CommandsForKey> commandsForKey = new 
TreeMap<>();
+
+    public static InMemoryCommandStore inMemory(CommandStore commandStore)
+    {
+        return (InMemoryCommandStore) commandStore;
+    }
+
+    public InMemoryCommandStore(int generation, int index, int numShards, 
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch 
rangesForEpoch)
+    {
+        super(generation, index, numShards, uniqueNow, currentEpoch, agent, 
store, progressLogFactory, rangesForEpoch);
+    }
+
+    @Override
+    public Command ifPresent(TxnId txnId)
+    {
+        return commands.get(txnId);
+    }
+
+    @Override
+    public Command command(TxnId txnId)
+    {
+        return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this, 
id));
+    }
+
+    public boolean hasCommand(TxnId txnId)
+    {
+        return commands.containsKey(txnId);
+    }
+
+    @Override
+    public CommandsForKey commandsForKey(Key key)
+    {
+        return commandsForKey.computeIfAbsent(key, 
InMemoryCommandsForKey::new);
+    }
+
+    public boolean hasCommandsForKey(Key key)
+    {
+        return commandsForKey.containsKey(key);
+    }
+
+    @Override
+    public CommandsForKey maybeCommandsForKey(Key key)
+    {
+        return commandsForKey.get(key);
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forEpochCommands(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+                commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+            }
+        }
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forCommittedInEpoch(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)

Review Comment:
   Reply from Blake in JIRA
   
   I'd tried that, but you also need to pass in the min and max timestamps, 
which makes 3 arguments so you need a special interface, and the duplicated 
code isn't too complex, so I figured it was a net negative for readability in 
exchange for a few less lines of code



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+    private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+    private final NavigableMap<Key, CommandsForKey> commandsForKey = new 
TreeMap<>();
+
+    public static InMemoryCommandStore inMemory(CommandStore commandStore)
+    {
+        return (InMemoryCommandStore) commandStore;
+    }
+
+    public InMemoryCommandStore(int generation, int index, int numShards, 
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch 
rangesForEpoch)
+    {
+        super(generation, index, numShards, uniqueNow, currentEpoch, agent, 
store, progressLogFactory, rangesForEpoch);
+    }
+
+    @Override
+    public Command ifPresent(TxnId txnId)
+    {
+        return commands.get(txnId);
+    }
+
+    @Override
+    public Command command(TxnId txnId)
+    {
+        return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this, 
id));
+    }
+
+    public boolean hasCommand(TxnId txnId)
+    {
+        return commands.containsKey(txnId);
+    }
+
+    @Override
+    public CommandsForKey commandsForKey(Key key)
+    {
+        return commandsForKey.computeIfAbsent(key, 
InMemoryCommandsForKey::new);
+    }
+
+    public boolean hasCommandsForKey(Key key)
+    {
+        return commandsForKey.containsKey(key);
+    }
+
+    @Override
+    public CommandsForKey maybeCommandsForKey(Key key)
+    {
+        return commandsForKey.get(key);
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forEpochCommands(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+                commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+            }
+        }
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forCommittedInEpoch(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+
+                Collection<Command> committed = commands.committedByExecuteAt()
+                        .between(minTimestamp, 
maxTimestamp).collect(Collectors.toList());
+                committed.forEach(consumer);
+            }
+        }
+    }
+
+    protected void processInternal(Consumer<? super CommandStore> consumer, 
Promise<Void> promise)
+    {
+        processInternal(cs -> {
+            consumer.accept(cs);
+            return null;
+        }, promise);
+    }
+
+    protected <T> void processInternal(Function<? super CommandStore, T> 
function, Promise<T> promise)
+    {
+        try
+        {
+            T result = function.apply(this);
+            promise.setSuccess(result);
+        }
+        catch (Throwable e)
+        {
+            promise.tryFailure(e);
+        }
+    }
+
+    public static class Synchronized extends InMemoryCommandStore
+    {
+        public Synchronized(int generation,
+                            int index,
+                            int numShards,
+                            Function<Timestamp, Timestamp> uniqueNow,
+                            LongSupplier currentEpoch,
+                            Agent agent,
+                            DataStore store,
+                            ProgressLog.Factory progressLogFactory,
+                            RangesForEpoch rangesForEpoch)
+        {
+            super(generation, index, numShards, uniqueNow, currentEpoch, 
agent, store, progressLogFactory, rangesForEpoch);
+        }
+
+        @Override
+        public synchronized Future<Void> processSetup(Consumer<? super 
CommandStore> function)
+        {
+            AsyncPromise<Void> promise = new AsyncPromise<>();
+            processInternal(function, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized <T> Future<T> processSetup(Function<? super 
CommandStore, T> function)
+        {
+            AsyncPromise<T> promise = new AsyncPromise<>();
+            processInternal(function, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized Future<Void> process(TxnOperation unused, 
Consumer<? super CommandStore> consumer)
+        {
+            Promise<Void> promise = new AsyncPromise<>();
+            processInternal(consumer, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized <T> Future<T> process(TxnOperation unused, 
Function<? super CommandStore, T> function)
+        {
+            AsyncPromise<T> promise = new AsyncPromise<>();
+            processInternal(function, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized void shutdown() {}
+    }
+
+    public static class SingleThread extends InMemoryCommandStore
+    {
+        private final ExecutorService executor;
+
+        private class ConsumerWrapper extends AsyncPromise<Void> implements 
Runnable
+        {
+            private final Consumer<? super CommandStore> consumer;
+
+            public ConsumerWrapper(Consumer<? super CommandStore> consumer)
+            {
+                this.consumer = consumer;
+            }
+
+            @Override
+            public void run()
+            {
+                processInternal(consumer, this);
+            }
+        }
+
+        private class FunctionWrapper<T> extends AsyncPromise<T> implements 
Runnable
+        {
+            private final Function<? super CommandStore, T> function;
+
+            public FunctionWrapper(Function<? super CommandStore, T> function)
+            {
+                this.function = function;
+            }
+
+            @Override
+            public void run()
+            {
+                processInternal(function, this);
+            }
+        }
+
+        public SingleThread(int generation,
+                            int index,
+                            int numShards,
+                            Node.Id nodeId,
+                            Function<Timestamp, Timestamp> uniqueNow,
+                            LongSupplier currentEpoch,
+                            Agent agent,
+                            DataStore store,
+                            ProgressLog.Factory progressLogFactory,
+                            RangesForEpoch rangesForEpoch)
+        {
+            super(generation, index, numShards, uniqueNow, currentEpoch, 
agent, store, progressLogFactory, rangesForEpoch);
+            executor = Executors.newSingleThreadExecutor(r -> {

Review Comment:
   Reply from Blake in JIRA
   
   This is just for testing, I don't think we need anything more sophisticated 
here unless it's serving an immediate purpose.



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStores.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.ProgressLog;
+import accord.local.CommandStore;
+import accord.local.CommandStores;
+import accord.local.Node;
+import accord.primitives.Keys;
+
+import java.util.function.Consumer;
+
+import static java.lang.Boolean.FALSE;
+
+public abstract class InMemoryCommandStores extends CommandStores
+{
+    public InMemoryCommandStores(int num, Node node, Agent agent, DataStore 
store,
+                                 ProgressLog.Factory progressLogFactory)
+    {
+        super(num, node, agent, store, progressLogFactory);
+    }
+
+    public static InMemoryCommandStores inMemory(Node node)
+    {
+        return (InMemoryCommandStores) node.commandStores();
+    }
+
+    public void forEachLocal(Consumer<? super CommandStore> forEach)

Review Comment:
   Reply from Blake in JIRA
   
   These are only there for testing and assume in-memory implementation details 
(since none of them requires a TxnScope).



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+    private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+    private final NavigableMap<Key, CommandsForKey> commandsForKey = new 
TreeMap<>();
+
+    public static InMemoryCommandStore inMemory(CommandStore commandStore)
+    {
+        return (InMemoryCommandStore) commandStore;
+    }
+
+    public InMemoryCommandStore(int generation, int index, int numShards, 
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch 
rangesForEpoch)
+    {
+        super(generation, index, numShards, uniqueNow, currentEpoch, agent, 
store, progressLogFactory, rangesForEpoch);
+    }
+
+    @Override
+    public Command ifPresent(TxnId txnId)
+    {
+        return commands.get(txnId);
+    }
+
+    @Override
+    public Command command(TxnId txnId)
+    {
+        return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this, 
id));
+    }
+
+    public boolean hasCommand(TxnId txnId)
+    {
+        return commands.containsKey(txnId);
+    }
+
+    @Override
+    public CommandsForKey commandsForKey(Key key)
+    {
+        return commandsForKey.computeIfAbsent(key, 
InMemoryCommandsForKey::new);
+    }
+
+    public boolean hasCommandsForKey(Key key)
+    {
+        return commandsForKey.containsKey(key);
+    }
+
+    @Override
+    public CommandsForKey maybeCommandsForKey(Key key)
+    {
+        return commandsForKey.get(key);
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forEpochCommands(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+                commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+            }
+        }
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forCommittedInEpoch(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)

Review Comment:
   Nit: forCommittedInEpoch and forEpochCommands are mostly copy/paste of each 
other; can we create a new util function such as the following?
   
   ```
   public void forEach(KeyRanges ranges, long epoch, Consumer<CommandsForKey> 
consumer)
       {
           Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
           Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
           for (KeyRange range : ranges)
           {
               Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
                                                                              
range.startInclusive(),
                                                                              
range.end(),
                                                                              
range.endInclusive()).values();
               rangeCommands.forEach(consumer);
           }
       }
   ```



##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -274,99 +289,110 @@ public boolean commit(Txn txn, Key homeKey, Key 
progressKey, Timestamp executeAt
             });
         }
 
-        if (waitingOnCommit.isEmpty())
-        {
-            waitingOnCommit = null;
-            if (waitingOnApply.isEmpty())
-                waitingOnApply = null;
-        }
 
-        boolean isProgressShard = progressKey != null && handles(txnId.epoch, 
progressKey);
-        commandStore.progressLog().commit(txnId, isProgressShard, 
isProgressShard && progressKey.equals(homeKey));
+        boolean isProgressShard = progressKey != null && 
handles(txnId().epoch, progressKey);
+        commandStore().progressLog().commit(txnId(), isProgressShard, 
isProgressShard && progressKey.equals(homeKey));
 
-        maybeExecute(false);
-        listeners.forEach(this);
         return true;
     }
 
+    public Future<Void> commitAndBeginExecution(Txn txn, Key homeKey, Key 
progressKey, Timestamp executeAt, Deps deps)
+    {
+        if (!commit(txn, homeKey, progressKey, executeAt, deps))
+            return Write.SUCCESS;
+
+        return maybeExecute(true);
+    }
+
+    // TODO (now): commitInvalidate may need to update cfks _if_ possible
     public boolean commitInvalidate()
     {
         if (hasBeen(Committed))
         {
+            logger.trace("{}: skipping commit invalidated - already committed 
({})", txnId(), status());
             if (!hasBeen(Invalidated))
-                commandStore.agent().onInconsistentTimestamp(this, 
Timestamp.NONE, executeAt);
+                commandStore().agent().onInconsistentTimestamp(this, 
Timestamp.NONE, executeAt());
 
             return false;
         }
 
-        status = Invalidated;
+        status(Invalidated);
 
-        boolean isProgressShard = progressKey != null && handles(txnId.epoch, 
progressKey);
-        commandStore.progressLog().invalidate(txnId, isProgressShard, 
isProgressShard && progressKey.equals(homeKey));
+        boolean isProgressShard = progressKey() != null && 
handles(txnId().epoch, progressKey());
+        commandStore().progressLog().invalidate(txnId(), isProgressShard, 
isProgressShard && progressKey().equals(homeKey()));
+        logger.trace("{}: committed invalidated", txnId());
 
-        listeners.forEach(this);
+        notifyListeners();
         return true;
     }
 
-    public boolean apply(Txn txn, Key homeKey, Key progressKey, Timestamp 
executeAt, Deps deps, Writes writes, Result result)
+    public Future<Void> apply(Txn txn, Key homeKey, Key progressKey, Timestamp 
executeAt, Deps deps, Writes writes, Result result)

Review Comment:
   Reply from Blake in JIRA
   
   So I don't think your example timeline is possible, since Command methods 
like commit are idempotent and the command store is single threaded. T2 would 
be a noop. However, I don't think there's anything to prevent maybeExecute 
from firing off duplicate writes in the time between Command#apply dispatching 
writes and Command#postApply setting the status to Applied, and the same goes 
for Command#read. I'm going to think about this a bit: since it depends on the 
concurrency of the implementation, I'm not sure if it would be better to handle 
this in the library or in the implementation....



##########
accord-core/src/main/java/accord/impl/InMemoryCommand.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Key;
+import accord.api.Result;
+import accord.local.*;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.txn.*;
+
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import static accord.local.Status.NotWitnessed;
+
+public class InMemoryCommand extends Command
+{
+    public final CommandStore commandStore;
+    private final TxnId txnId;
+
+    private Key homeKey, progressKey;
+    private Txn txn;
+    private Ballot promised = Ballot.ZERO, accepted = Ballot.ZERO;
+    private Timestamp executeAt;
+    private Deps deps = Deps.NONE;
+    private Writes writes;
+    private Result result;
+
+    private Status status = NotWitnessed;
+
+    private boolean isGloballyPersistent; // only set on home shard
+
+    private NavigableMap<TxnId, Command> waitingOnCommit;
+    private NavigableMap<TxnId, Command> waitingOnApply;
+
+    private final Listeners listeners = new Listeners();
+
+    public InMemoryCommand(CommandStore commandStore, TxnId txnId)
+    {
+        this.commandStore = commandStore;
+        this.txnId = txnId;
+    }
+
+    @Override
+    public boolean equals(Object o)

Review Comment:
   equals() is missing homeKey, progressKey, and isGloballyPersistent; is this 
intentional?



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+    private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+    private final NavigableMap<Key, CommandsForKey> commandsForKey = new 
TreeMap<>();
+
+    public static InMemoryCommandStore inMemory(CommandStore commandStore)
+    {
+        return (InMemoryCommandStore) commandStore;
+    }
+
+    public InMemoryCommandStore(int generation, int index, int numShards, 
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch 
rangesForEpoch)
+    {
+        super(generation, index, numShards, uniqueNow, currentEpoch, agent, 
store, progressLogFactory, rangesForEpoch);
+    }
+
+    @Override
+    public Command ifPresent(TxnId txnId)
+    {
+        return commands.get(txnId);
+    }
+
+    @Override
+    public Command command(TxnId txnId)
+    {
+        return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this, 
id));
+    }
+
+    public boolean hasCommand(TxnId txnId)
+    {
+        return commands.containsKey(txnId);
+    }
+
+    @Override
+    public CommandsForKey commandsForKey(Key key)
+    {
+        return commandsForKey.computeIfAbsent(key, 
InMemoryCommandsForKey::new);
+    }
+
+    public boolean hasCommandsForKey(Key key)
+    {
+        return commandsForKey.containsKey(key);
+    }
+
+    @Override
+    public CommandsForKey maybeCommandsForKey(Key key)
+    {
+        return commandsForKey.get(key);
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forEpochCommands(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+                commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+            }
+        }
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forCommittedInEpoch(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+
+                Collection<Command> committed = commands.committedByExecuteAt()
+                        .between(minTimestamp, 
maxTimestamp).collect(Collectors.toList());
+                committed.forEach(consumer);
+            }
+        }
+    }
+
+    protected void processInternal(Consumer<? super CommandStore> consumer, 
Promise<Void> promise)

Review Comment:
   nit: "protected void processInternal(Consumer<? super CommandStore> 
consumer, Promise<Void> promise)" could be the following to avoid copy/paste
   
   ```
   protected void processInternal(Consumer<? super CommandStore> consumer, 
Promise<Void> promise)
       {
           processInternal(cs -> {
               consumer.accept(cs);
               return null;
           }, promise);
       }
   ```



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+    private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+    private final NavigableMap<Key, CommandsForKey> commandsForKey = new 
TreeMap<>();
+
+    public static InMemoryCommandStore inMemory(CommandStore commandStore)
+    {
+        return (InMemoryCommandStore) commandStore;
+    }
+
+    public InMemoryCommandStore(int generation, int index, int numShards, 
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch 
rangesForEpoch)
+    {
+        super(generation, index, numShards, uniqueNow, currentEpoch, agent, 
store, progressLogFactory, rangesForEpoch);
+    }
+
+    @Override
+    public Command ifPresent(TxnId txnId)
+    {
+        return commands.get(txnId);
+    }
+
+    @Override
+    public Command command(TxnId txnId)
+    {
+        return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this, 
id));
+    }
+
+    public boolean hasCommand(TxnId txnId)
+    {
+        return commands.containsKey(txnId);
+    }
+
+    @Override
+    public CommandsForKey commandsForKey(Key key)
+    {
+        return commandsForKey.computeIfAbsent(key, 
InMemoryCommandsForKey::new);
+    }
+
+    public boolean hasCommandsForKey(Key key)
+    {
+        return commandsForKey.containsKey(key);
+    }
+
+    @Override
+    public CommandsForKey maybeCommandsForKey(Key key)
+    {
+        return commandsForKey.get(key);
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forEpochCommands(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+                commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+            }
+        }
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forCommittedInEpoch(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+
+                Collection<Command> committed = commands.committedByExecuteAt()
+                        .between(minTimestamp, 
maxTimestamp).collect(Collectors.toList());
+                committed.forEach(consumer);
+            }
+        }
+    }
+
+    protected void processInternal(Consumer<? super CommandStore> consumer, 
Promise<Void> promise)
+    {
+        processInternal(cs -> {
+            consumer.accept(cs);
+            return null;
+        }, promise);
+    }
+
+    protected <T> void processInternal(Function<? super CommandStore, T> 
function, Promise<T> promise)
+    {
+        try
+        {
+            T result = function.apply(this);
+            promise.setSuccess(result);
+        }
+        catch (Throwable e)
+        {
+            promise.tryFailure(e);
+        }
+    }
+
+    public static class Synchronized extends InMemoryCommandStore

Review Comment:
   Synchronized doesn't use "synchronized" in every method... so the name 
feels off...



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+    private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+    private final NavigableMap<Key, CommandsForKey> commandsForKey = new 
TreeMap<>();
+
+    public static InMemoryCommandStore inMemory(CommandStore commandStore)
+    {
+        return (InMemoryCommandStore) commandStore;
+    }
+
+    public InMemoryCommandStore(int generation, int index, int numShards, 
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch 
rangesForEpoch)
+    {
+        super(generation, index, numShards, uniqueNow, currentEpoch, agent, 
store, progressLogFactory, rangesForEpoch);
+    }
+
+    @Override
+    public Command ifPresent(TxnId txnId)
+    {
+        return commands.get(txnId);
+    }
+
+    @Override
+    public Command command(TxnId txnId)
+    {
+        return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this, 
id));
+    }
+
+    public boolean hasCommand(TxnId txnId)
+    {
+        return commands.containsKey(txnId);
+    }
+
+    @Override
+    public CommandsForKey commandsForKey(Key key)
+    {
+        return commandsForKey.computeIfAbsent(key, 
InMemoryCommandsForKey::new);
+    }
+
+    public boolean hasCommandsForKey(Key key)
+    {
+        return commandsForKey.containsKey(key);
+    }
+
+    @Override
+    public CommandsForKey maybeCommandsForKey(Key key)
+    {
+        return commandsForKey.get(key);
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forEpochCommands(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+                commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+            }
+        }
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forCommittedInEpoch(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+
+                Collection<Command> committed = commands.committedByExecuteAt()
+                        .between(minTimestamp, 
maxTimestamp).collect(Collectors.toList());
+                committed.forEach(consumer);
+            }
+        }
+    }
+
+    protected void processInternal(Consumer<? super CommandStore> consumer, 
Promise<Void> promise)
+    {
+        processInternal(cs -> {
+            consumer.accept(cs);
+            return null;
+        }, promise);
+    }
+
+    protected <T> void processInternal(Function<? super CommandStore, T> 
function, Promise<T> promise)
+    {
+        try
+        {
+            T result = function.apply(this);
+            promise.setSuccess(result);
+        }
+        catch (Throwable e)
+        {
+            promise.tryFailure(e);
+        }
+    }
+
+    public static class Synchronized extends InMemoryCommandStore
+    {
+        public Synchronized(int generation,
+                            int index,
+                            int numShards,
+                            Function<Timestamp, Timestamp> uniqueNow,
+                            LongSupplier currentEpoch,
+                            Agent agent,
+                            DataStore store,
+                            ProgressLog.Factory progressLogFactory,
+                            RangesForEpoch rangesForEpoch)
+        {
+            super(generation, index, numShards, uniqueNow, currentEpoch, 
agent, store, progressLogFactory, rangesForEpoch);
+        }
+
+        @Override
+        public synchronized Future<Void> processSetup(Consumer<? super 
CommandStore> function)
+        {
+            AsyncPromise<Void> promise = new AsyncPromise<>();
+            processInternal(function, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized <T> Future<T> processSetup(Function<? super 
CommandStore, T> function)
+        {
+            AsyncPromise<T> promise = new AsyncPromise<>();
+            processInternal(function, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized Future<Void> process(TxnOperation unused, 
Consumer<? super CommandStore> consumer)
+        {
+            Promise<Void> promise = new AsyncPromise<>();
+            processInternal(consumer, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized <T> Future<T> process(TxnOperation unused, 
Function<? super CommandStore, T> function)
+        {
+            AsyncPromise<T> promise = new AsyncPromise<>();
+            processInternal(function, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized void shutdown() {}
+    }
+
+    public static class SingleThread extends InMemoryCommandStore
+    {
+        private final ExecutorService executor;
+
+        private class ConsumerWrapper extends AsyncPromise<Void> implements 
Runnable
+        {
+            private final Consumer<? super CommandStore> consumer;
+
+            public ConsumerWrapper(Consumer<? super CommandStore> consumer)
+            {
+                this.consumer = consumer;
+            }
+
+            @Override
+            public void run()
+            {
+                processInternal(consumer, this);
+            }
+        }
+
+        private class FunctionWrapper<T> extends AsyncPromise<T> implements 
Runnable
+        {
+            private final Function<? super CommandStore, T> function;
+
+            public FunctionWrapper(Function<? super CommandStore, T> function)
+            {
+                this.function = function;
+            }
+
+            @Override
+            public void run()
+            {
+                processInternal(function, this);
+            }
+        }
+
+        public SingleThread(int generation,
+                            int index,
+                            int numShards,
+                            Node.Id nodeId,
+                            Function<Timestamp, Timestamp> uniqueNow,
+                            LongSupplier currentEpoch,
+                            Agent agent,
+                            DataStore store,
+                            ProgressLog.Factory progressLogFactory,
+                            RangesForEpoch rangesForEpoch)
+        {
+            super(generation, index, numShards, uniqueNow, currentEpoch, 
agent, store, progressLogFactory, rangesForEpoch);
+            executor = Executors.newSingleThreadExecutor(r -> {
+                Thread thread = new Thread(r);
+                thread.setName(CommandStore.class.getSimpleName() + '[' + 
nodeId + ':' + index + ']');
+                return thread;
+            });
+        }
+
+        @Override
+        public Future<Void> processSetup(Consumer<? super CommandStore> 
function)
+        {
+            ConsumerWrapper future = new ConsumerWrapper(function);
+            executor.execute(future);
+            return future;
+        }
+
+        @Override
+        public <T> Future<T> processSetup(Function<? super CommandStore, T> 
function)
+        {
+            FunctionWrapper<T> future = new FunctionWrapper<>(function);
+            executor.execute(future);
+            return future;
+        }
+
+        @Override
+        public Future<Void> process(TxnOperation unused, Consumer<? super 
CommandStore> consumer)
+        {
+            ConsumerWrapper future = new ConsumerWrapper(consumer);
+            executor.execute(future);
+            return future;
+        }
+
+        @Override
+        public <T> Future<T> process(TxnOperation unused, Function<? super 
CommandStore, T> function)
+        {
+            FunctionWrapper<T> future = new FunctionWrapper<>(function);
+            executor.execute(future);
+            return future;
+        }
+
+        @Override
+        public void shutdown()
+        {
+            executor.shutdown();
+        }
+    }
+
+    public static class SingleThreadDebug extends SingleThread
+    {
+        private final AtomicReference<Thread> expectedThread = new 
AtomicReference<>();
+
+        public SingleThreadDebug(int generation,
+                                 int index,
+                                 int numShards,
+                                 Node.Id nodeId,
+                                 Function<Timestamp, Timestamp> uniqueNow,
+                                 LongSupplier currentEpoch,
+                                 Agent agent,
+                                 DataStore store,
+                                 ProgressLog.Factory progressLogFactory,
+                                 RangesForEpoch rangesForEpoch)
+        {
+            super(generation, index, numShards, nodeId, uniqueNow, 
currentEpoch, agent, store, progressLogFactory, rangesForEpoch);
+        }
+
+        private void assertThread()
+        {
+            Thread current = Thread.currentThread();
+            Thread expected;
+            while (true)
+            {
+                expected = expectedThread.get();
+                if (expected != null)
+                    break;
+                expectedThread.compareAndSet(null, Thread.currentThread());
+            }
+            if (expected != current)
+                throw new IllegalStateException(String.format("Command store 
called from the wrong thread. Expected %s, got %s", expected, current));

Review Comment:
   nit: can we add a useful message? When something like this fails it's too 
hard to figure out what's going on, so we should be nice and provide a useful 
error.



##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -18,129 +18,117 @@
 
 package accord.local;
 
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.function.Consumer;
-
-import com.google.common.base.Preconditions;
-
 import accord.api.Key;
+import accord.api.Read;
 import accord.api.Result;
+import accord.api.Write;
 import accord.local.Node.Id;
-import accord.primitives.KeyRanges;
-import accord.primitives.Ballot;
-import accord.primitives.Deps;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
+import accord.primitives.*;
 import accord.txn.Txn;
-import accord.primitives.TxnId;
 import accord.txn.Writes;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.Iterables;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static accord.local.Status.*;
+import static accord.utils.Utils.listOf;
 
-import static accord.local.Status.Accepted;
-import static accord.local.Status.AcceptedInvalidate;
-import static accord.local.Status.Applied;
-import static accord.local.Status.Committed;
-import static accord.local.Status.Executed;
-import static accord.local.Status.Invalidated;
-import static accord.local.Status.NotWitnessed;
-import static accord.local.Status.PreAccepted;
-import static accord.local.Status.ReadyToExecute;
-
-// TODO: this needs to be backed by persistent storage
-public class Command implements Listener, Consumer<Listener>
+public abstract class Command implements Listener, Consumer<Listener>, 
TxnOperation

Review Comment:
   nit: given the refactor to make all state abstract wouldn't it be best if 
this became an Interface?



##########
accord-core/src/main/java/accord/impl/SimpleProgressLog.java:
##########
@@ -812,18 +812,26 @@ public void onCallbackFailure(Id from, Throwable failure)
 
     public static class ApplyAndCheck extends Apply
     {
-        final Set<Id> notPersisted;
+        public final Set<Id> notPersisted;

Review Comment:
   For consistency, shouldn't notPersisted be private with a getter? This patch 
switches many things to getters yet exposes this one as a public field.



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStores.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.ProgressLog;
+import accord.local.CommandStore;
+import accord.local.CommandStores;
+import accord.local.Node;
+import accord.primitives.Keys;
+
+import java.util.function.Consumer;
+
+import static java.lang.Boolean.FALSE;
+
+public abstract class InMemoryCommandStores extends CommandStores
+{
+    public InMemoryCommandStores(int num, Node node, Agent agent, DataStore 
store,
+                                 ProgressLog.Factory progressLogFactory)
+    {
+        super(num, node, agent, store, progressLogFactory);
+    }
+
+    public static InMemoryCommandStores inMemory(Node node)
+    {
+        return (InMemoryCommandStores) node.commandStores();
+    }
+
+    public void forEachLocal(Consumer<? super CommandStore> forEach)

Review Comment:
   It would be nice to move "forEachLocal" to a lower level... it seems that we 
try to overload fold in cases where forEach would be simpler.



##########
accord-maelstrom/build.gradle:
##########
@@ -20,8 +20,13 @@ plugins {
     id 'accord.java-conventions'
 }
 
+compileJava {

Review Comment:
   can you move the 1.8 logic to 
buildSrc/src/main/groovy/accord.java-conventions.gradle



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+    private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+    private final NavigableMap<Key, CommandsForKey> commandsForKey = new 
TreeMap<>();
+
+    public static InMemoryCommandStore inMemory(CommandStore commandStore)
+    {
+        return (InMemoryCommandStore) commandStore;
+    }
+
+    public InMemoryCommandStore(int generation, int index, int numShards, 
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch 
rangesForEpoch)
+    {
+        super(generation, index, numShards, uniqueNow, currentEpoch, agent, 
store, progressLogFactory, rangesForEpoch);
+    }
+
+    @Override
+    public Command ifPresent(TxnId txnId)
+    {
+        return commands.get(txnId);
+    }
+
+    @Override
+    public Command command(TxnId txnId)
+    {
+        return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this, 
id));
+    }
+
+    public boolean hasCommand(TxnId txnId)
+    {
+        return commands.containsKey(txnId);
+    }
+
+    @Override
+    public CommandsForKey commandsForKey(Key key)
+    {
+        return commandsForKey.computeIfAbsent(key, 
InMemoryCommandsForKey::new);
+    }
+
+    public boolean hasCommandsForKey(Key key)
+    {
+        return commandsForKey.containsKey(key);
+    }
+
+    @Override
+    public CommandsForKey maybeCommandsForKey(Key key)
+    {
+        return commandsForKey.get(key);
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forEpochCommands(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)

Review Comment:
   Reply from Blake in JIRA
   
   No, I don't think so; these 2 commands just return witnessed/committed 
commands belonging to the given epoch. These (and all InMemory* 
implementations) are only meant to be used in tests. These methods in 
particular are placeholders so that range movements in the library can be 
validated, and should be superseded by the work to "officially" support 
bootstrap, low bounds, etc.



##########
accord-core/src/main/java/accord/coordinate/Execute.java:
##########
@@ -70,6 +71,12 @@ private void start()
     {
         Set<Id> readSet = readTracker.computeMinimalReadSetAndMarkInflight();
         Commit.commitAndRead(node, topologies, txnId, txn, homeKey, executeAt, 
deps, readSet, this);
+        // skip straight to persistence if there's no read
+        if (txn.read().keys().isEmpty())

Review Comment:
   Reply from Blake in JIRA
   
   I've added some annotations based on how things are implemented now



##########
accord-core/src/main/java/accord/impl/InMemoryCommand.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Key;
+import accord.api.Result;
+import accord.local.*;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.txn.*;
+
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import static accord.local.Status.NotWitnessed;
+
+public class InMemoryCommand extends Command
+{
+    public final CommandStore commandStore;
+    private final TxnId txnId;
+
+    private Key homeKey, progressKey;
+    private Txn txn;
+    private Ballot promised = Ballot.ZERO, accepted = Ballot.ZERO;
+    private Timestamp executeAt;
+    private Deps deps = Deps.NONE;
+    private Writes writes;
+    private Result result;
+
+    private Status status = NotWitnessed;
+
+    private boolean isGloballyPersistent; // only set on home shard
+
+    private NavigableMap<TxnId, Command> waitingOnCommit;
+    private NavigableMap<TxnId, Command> waitingOnApply;
+
+    private final Listeners listeners = new Listeners();
+
+    public InMemoryCommand(CommandStore commandStore, TxnId txnId)
+    {
+        this.commandStore = commandStore;
+        this.txnId = txnId;
+    }
+
+    @Override
+    public boolean equals(Object o)

Review Comment:
   This was fixed



##########
accord-core/src/main/java/accord/local/CommandStores.java:
##########
@@ -356,42 +315,110 @@ public synchronized void shutdown()
                 commandStore.shutdown();
     }
 
-    protected abstract <S> void forEach(Select<S> select, S scope, long 
minEpoch, long maxEpoch, Consumer<? super CommandStore> forEach);
-    protected abstract <S, T> T mapReduce(Select<S> select, S scope, long 
minEpoch, long maxEpoch, Function<? super CommandStore, T> map, BiFunction<T, 
T, T> reduce);
+    private static <T> Fold<TxnOperation, Void, List<Future<T>>> 
mapReduceFold(Function<CommandStore, T> map)
+    {
+        return (store, op, i, t) -> { t.add(store.process(op, map)); return t; 
};
+    }
+
+    private static <T> T reduce(List<Future<T>> futures, BiFunction<T, T, T> 
reduce)
+    {
+        T result = null;
+        for (Future<T> future : futures)
+        {
+            try
+            {
+                T next = future.get();
+                result = result == null ? next : reduce.apply(result, next);
+            }
+            catch (InterruptedException e)
+            {
+                throw new UncheckedInterruptedException(e);
+            }
+            catch (ExecutionException e)
+            {
+                throw new RuntimeException(e.getCause());
+            }
+        }
+        return result;
+    }
+
+    private <F, T> T setup(F f, Fold<F, Void, List<Future<T>>> fold, 
BiFunction<T, T, T> reduce)
+    {
+        List<Future<T>> futures = foldl((s, i, mn, mx) -> s.all(), null, 
Long.MIN_VALUE, Long.MAX_VALUE, fold, f, null, ArrayList::new);
+        return reduce(futures, reduce);
+    }
+
+    private  <S, T> T mapReduce(TxnOperation operation, Select<S> select, S 
scope, long minEpoch, long maxEpoch, Fold<TxnOperation, Void, List<Future<T>>> 
fold, BiFunction<T, T, T> reduce)
+    {
+        List<Future<T>> futures = foldl(select, scope, minEpoch, maxEpoch, 
fold, operation, null, ArrayList::new);
+        if (futures == null)
+            return null;
+        return reduce(futures, reduce);
+    }
+
+    public <T> T mapReduce(TxnOperation operation, Key key, long minEpoch, 
long maxEpoch, Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+    {
+        return mapReduce(operation, ShardedRanges::shard, key, minEpoch, 
maxEpoch, mapReduceFold(map), reduce);
+    }
+
+    public <T> T mapReduce(TxnOperation operation, Key key, long epoch, 
Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+    {
+        return mapReduce(operation, key, epoch, epoch, map, reduce);
+    }
+
+    public <T> T mapReduceSince(TxnOperation operation, Key key, long epoch, 
Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+    {
+        return mapReduce(operation, key, epoch, Long.MAX_VALUE, map, reduce);
+    }
 
-    public void forEach(Consumer<CommandStore> forEach)
+    public <T> T mapReduce(TxnOperation operation, Keys keys, long minEpoch, 
long maxEpoch, Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
     {
-        forEach((s, i, min, max) -> s.all(), null, 0, 0, forEach);
+        // probably need to split txnOperation and scope stuff here
+        return mapReduce(operation, ShardedRanges::shards, keys, minEpoch, 
maxEpoch, mapReduceFold(map), reduce);
     }
 
-    public void forEach(Keys keys, long epoch, Consumer<CommandStore> forEach)
+    public void setup(Consumer<CommandStore> forEach)
     {
-        forEach(keys, epoch, epoch, forEach);
+        setup(forEach, (store, f, i, t) -> { t.add(store.processSetup(f)); 
return t; }, (Void i1, Void i2) -> null);
     }
 
-    public void forEach(Keys keys, long minEpoch, long maxEpoch, 
Consumer<CommandStore> forEach)
+    public <T> T setup(Function<CommandStore, T> map, BiFunction<T, T, T> 
reduce)
     {
-        forEach(ShardedRanges::shards, keys, minEpoch, maxEpoch, forEach);
+        return setup(map, (store, f, i, t) -> { t.add(store.processSetup(f)); 
return t; }, reduce);
     }
 
-    public <T> T mapReduce(Keys keys, long epoch, Function<CommandStore, T> 
map, BiFunction<T, T, T> reduce)
+    private static Fold<TxnOperation, Void, List<Future<Void>>> 
forEachFold(Consumer<CommandStore> forEach)
     {
-        return mapReduce(keys, epoch, epoch, map, reduce);
+        return (store, op, i, t) -> { t.add(store.process(op, forEach)); 
return t; };
     }
 
-    public <T> T mapReduce(Keys keys, long minEpoch, long maxEpoch, 
Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+    public void forEach(TxnOperation operation, Keys keys, long minEpoch, long 
maxEpoch, Consumer<CommandStore> forEach)

Review Comment:
   Reply from Blake in JIRA
   
   The keys in each perform different functions. The keys in the TxnOperation 
indicate which CommandsForKey we need access to; this lets C* know which ones 
it needs to load. The Keys passed into forEach determine which command stores 
we need to talk to, since we need to talk to "replicas" of the given keys 
whether or not we need the commands for key to be in memory. Of course, this 
isn't obvious without more complete comments on TxnOperation.
   
   > This change trickled throughout a lot of different places, so would be 
good to grasp the expected semantics... if the TxnOperation has keys, and you 
provide keys... "should" they be related or "must" they be related? or 
something else?
   
   I added some comments. The name TxnOperation is a little unclear. Let me 
know if you have any suggestions for better names. Maybe TxnOperationScope? 
Maybe it could extend Function<CommandStore, T> and bundle the operation and 
the metadata into the same interface? Calls would be a bit more cumbersome, but 
would make the interdependency of the 3 parts more obvious



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+    private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+    private final NavigableMap<Key, CommandsForKey> commandsForKey = new 
TreeMap<>();
+
+    public static InMemoryCommandStore inMemory(CommandStore commandStore)
+    {
+        return (InMemoryCommandStore) commandStore;
+    }
+
+    public InMemoryCommandStore(int generation, int index, int numShards, 
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch 
rangesForEpoch)
+    {
+        super(generation, index, numShards, uniqueNow, currentEpoch, agent, 
store, progressLogFactory, rangesForEpoch);
+    }
+
+    @Override
+    public Command ifPresent(TxnId txnId)
+    {
+        return commands.get(txnId);
+    }
+
+    @Override
+    public Command command(TxnId txnId)
+    {
+        return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this, 
id));
+    }
+
+    public boolean hasCommand(TxnId txnId)
+    {
+        return commands.containsKey(txnId);
+    }
+
+    @Override
+    public CommandsForKey commandsForKey(Key key)
+    {
+        return commandsForKey.computeIfAbsent(key, 
InMemoryCommandsForKey::new);
+    }
+
+    public boolean hasCommandsForKey(Key key)
+    {
+        return commandsForKey.containsKey(key);
+    }
+
+    @Override
+    public CommandsForKey maybeCommandsForKey(Key key)
+    {
+        return commandsForKey.get(key);
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forEpochCommands(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+                commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+            }
+        }
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forCommittedInEpoch(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+
+                Collection<Command> committed = commands.committedByExecuteAt()
+                        .between(minTimestamp, 
maxTimestamp).collect(Collectors.toList());
+                committed.forEach(consumer);
+            }
+        }
+    }
+
+    protected void processInternal(Consumer<? super CommandStore> consumer, 
Promise<Void> promise)
+    {
+        processInternal(cs -> {
+            consumer.accept(cs);
+            return null;
+        }, promise);
+    }
+
+    protected <T> void processInternal(Function<? super CommandStore, T> 
function, Promise<T> promise)
+    {
+        try
+        {
+            T result = function.apply(this);
+            promise.setSuccess(result);
+        }
+        catch (Throwable e)
+        {
+            promise.tryFailure(e);
+        }
+    }
+
+    public static class Synchronized extends InMemoryCommandStore

Review Comment:
   resolved



##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -274,99 +289,110 @@ public boolean commit(Txn txn, Key homeKey, Key 
progressKey, Timestamp executeAt
             });
         }
 
-        if (waitingOnCommit.isEmpty())
-        {
-            waitingOnCommit = null;
-            if (waitingOnApply.isEmpty())
-                waitingOnApply = null;
-        }
 
-        boolean isProgressShard = progressKey != null && handles(txnId.epoch, 
progressKey);
-        commandStore.progressLog().commit(txnId, isProgressShard, 
isProgressShard && progressKey.equals(homeKey));
+        boolean isProgressShard = progressKey != null && 
handles(txnId().epoch, progressKey);
+        commandStore().progressLog().commit(txnId(), isProgressShard, 
isProgressShard && progressKey.equals(homeKey));
 
-        maybeExecute(false);
-        listeners.forEach(this);
         return true;
     }
 
+    public Future<Void> commitAndBeginExecution(Txn txn, Key homeKey, Key 
progressKey, Timestamp executeAt, Deps deps)
+    {
+        if (!commit(txn, homeKey, progressKey, executeAt, deps))
+            return Write.SUCCESS;
+
+        return maybeExecute(true);
+    }
+
+    // TODO (now): commitInvalidate may need to update cfks _if_ possible
     public boolean commitInvalidate()
     {
         if (hasBeen(Committed))
         {
+            logger.trace("{}: skipping commit invalidated - already committed 
({})", txnId(), status());
             if (!hasBeen(Invalidated))
-                commandStore.agent().onInconsistentTimestamp(this, 
Timestamp.NONE, executeAt);
+                commandStore().agent().onInconsistentTimestamp(this, 
Timestamp.NONE, executeAt());
 
             return false;
         }
 
-        status = Invalidated;
+        status(Invalidated);
 
-        boolean isProgressShard = progressKey != null && handles(txnId.epoch, 
progressKey);
-        commandStore.progressLog().invalidate(txnId, isProgressShard, 
isProgressShard && progressKey.equals(homeKey));
+        boolean isProgressShard = progressKey() != null && 
handles(txnId().epoch, progressKey());
+        commandStore().progressLog().invalidate(txnId(), isProgressShard, 
isProgressShard && progressKey().equals(homeKey()));
+        logger.trace("{}: committed invalidated", txnId());
 
-        listeners.forEach(this);
+        notifyListeners();
         return true;
     }
 
-    public boolean apply(Txn txn, Key homeKey, Key progressKey, Timestamp 
executeAt, Deps deps, Writes writes, Result result)
+    public Future<Void> apply(Txn txn, Key homeKey, Key progressKey, Timestamp 
executeAt, Deps deps, Writes writes, Result result)

Review Comment:
   apply now returns a Future but some code paths ignore the output, so if 
apply fails or is async, there may be unexpected results? Examples: 
accord.coordinate.CheckOnCommitted#onSuccessCriteriaOrExhaustion(accord.messages.CheckStatus.CheckStatusOkFull),
 and accord.impl.SimpleProgressLog.ApplyAndCheck#process
   
   
   If apply is still being "applied" then the call to "commit" may cause a race 
condition as it could be called while the writes are happening, leading to the 
following timeline:
   
   ```
   T1 apply -> sets status to "Executed", send writes to be written in the 
background
   T2 commit -> set status to "Committed"
   T3 apply complete, call "postApply" -> set status "Applied"
   ```
   
   Specifically, this comment applies to 
"accord.coordinate.CheckOnCommitted#onSuccessCriteriaOrExhaustion(accord.messages.CheckStatus.CheckStatusOkFull)"...
 I might not be grasping the "epoch - 1" part.



##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -18,129 +18,117 @@
 
 package accord.local;
 
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.function.Consumer;
-
-import com.google.common.base.Preconditions;
-
 import accord.api.Key;
+import accord.api.Read;
 import accord.api.Result;
+import accord.api.Write;
 import accord.local.Node.Id;
-import accord.primitives.KeyRanges;
-import accord.primitives.Ballot;
-import accord.primitives.Deps;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
+import accord.primitives.*;
 import accord.txn.Txn;
-import accord.primitives.TxnId;
 import accord.txn.Writes;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.Iterables;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static accord.local.Status.*;
+import static accord.utils.Utils.listOf;
 
-import static accord.local.Status.Accepted;
-import static accord.local.Status.AcceptedInvalidate;
-import static accord.local.Status.Applied;
-import static accord.local.Status.Committed;
-import static accord.local.Status.Executed;
-import static accord.local.Status.Invalidated;
-import static accord.local.Status.NotWitnessed;
-import static accord.local.Status.PreAccepted;
-import static accord.local.Status.ReadyToExecute;
-
-// TODO: this needs to be backed by persistent storage
-public class Command implements Listener, Consumer<Listener>
+public abstract class Command implements Listener, Consumer<Listener>, 
TxnOperation

Review Comment:
   Reply from Blake in JIRA
   
   I don't think so. There's a lot of functionality in there, some of which 
needs to be overridden in the C* implementation



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+    private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+    private final NavigableMap<Key, CommandsForKey> commandsForKey = new 
TreeMap<>();
+
+    public static InMemoryCommandStore inMemory(CommandStore commandStore)
+    {
+        return (InMemoryCommandStore) commandStore;
+    }
+
+    public InMemoryCommandStore(int generation, int index, int numShards, 
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch 
rangesForEpoch)
+    {
+        super(generation, index, numShards, uniqueNow, currentEpoch, agent, 
store, progressLogFactory, rangesForEpoch);
+    }
+
+    @Override
+    public Command ifPresent(TxnId txnId)
+    {
+        return commands.get(txnId);
+    }
+
+    @Override
+    public Command command(TxnId txnId)
+    {
+        return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this, 
id));
+    }
+
+    public boolean hasCommand(TxnId txnId)
+    {
+        return commands.containsKey(txnId);
+    }
+
+    @Override
+    public CommandsForKey commandsForKey(Key key)
+    {
+        return commandsForKey.computeIfAbsent(key, 
InMemoryCommandsForKey::new);
+    }
+
+    public boolean hasCommandsForKey(Key key)
+    {
+        return commandsForKey.containsKey(key);
+    }
+
+    @Override
+    public CommandsForKey maybeCommandsForKey(Key key)
+    {
+        return commandsForKey.get(key);
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forEpochCommands(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+                commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+            }
+        }
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forCommittedInEpoch(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+
+                Collection<Command> committed = commands.committedByExecuteAt()
+                        .between(minTimestamp, 
maxTimestamp).collect(Collectors.toList());
+                committed.forEach(consumer);
+            }
+        }
+    }
+
+    protected void processInternal(Consumer<? super CommandStore> consumer, 
Promise<Void> promise)

Review Comment:
   resolved



##########
accord-core/src/main/java/accord/impl/SimpleProgressLog.java:
##########
@@ -812,18 +812,26 @@ public void onCallbackFailure(Id from, Throwable failure)
 
     public static class ApplyAndCheck extends Apply
     {
-        final Set<Id> notPersisted;
+        public final Set<Id> notPersisted;

Review Comment:
   Reply from Blake in JIRA
   
   The integration of the progress log hasn't started yet. I'd expect the 
interface and reference implementation to be refined as that happens, but the 
objective for this patch is to do the bare minimum to prevent build issues and 
test failures.



##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+    private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+    private final NavigableMap<Key, CommandsForKey> commandsForKey = new 
TreeMap<>();
+
+    public static InMemoryCommandStore inMemory(CommandStore commandStore)
+    {
+        return (InMemoryCommandStore) commandStore;
+    }
+
+    public InMemoryCommandStore(int generation, int index, int numShards, 
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch 
rangesForEpoch)
+    {
+        super(generation, index, numShards, uniqueNow, currentEpoch, agent, 
store, progressLogFactory, rangesForEpoch);
+    }
+
+    @Override
+    public Command ifPresent(TxnId txnId)
+    {
+        return commands.get(txnId);
+    }
+
+    @Override
+    public Command command(TxnId txnId)
+    {
+        return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this, 
id));
+    }
+
+    public boolean hasCommand(TxnId txnId)
+    {
+        return commands.containsKey(txnId);
+    }
+
+    @Override
+    public CommandsForKey commandsForKey(Key key)
+    {
+        return commandsForKey.computeIfAbsent(key, 
InMemoryCommandsForKey::new);
+    }
+
+    public boolean hasCommandsForKey(Key key)
+    {
+        return commandsForKey.containsKey(key);
+    }
+
+    @Override
+    public CommandsForKey maybeCommandsForKey(Key key)
+    {
+        return commandsForKey.get(key);
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forEpochCommands(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+                commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+            }
+        }
+    }
+
+    // TODO: command store api will need to support something like this for 
repair/streaming
+    public void forCommittedInEpoch(KeyRanges ranges, long epoch, 
Consumer<Command> consumer)
+    {
+        Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, 
Integer.MIN_VALUE, Node.Id.NONE);
+        Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE, 
Integer.MAX_VALUE, Node.Id.MAX);
+        for (KeyRange range : ranges)
+        {
+            Iterable<CommandsForKey> rangeCommands = 
commandsForKey.subMap(range.start(),
+                                                                           
range.startInclusive(),
+                                                                           
range.end(),
+                                                                           
range.endInclusive()).values();
+            for (CommandsForKey commands : rangeCommands)
+            {
+
+                Collection<Command> committed = commands.committedByExecuteAt()
+                        .between(minTimestamp, 
maxTimestamp).collect(Collectors.toList());
+                committed.forEach(consumer);
+            }
+        }
+    }
+
+    protected void processInternal(Consumer<? super CommandStore> consumer, 
Promise<Void> promise)
+    {
+        processInternal(cs -> {
+            consumer.accept(cs);
+            return null;
+        }, promise);
+    }
+
+    protected <T> void processInternal(Function<? super CommandStore, T> 
function, Promise<T> promise)
+    {
+        try
+        {
+            T result = function.apply(this);
+            promise.setSuccess(result);
+        }
+        catch (Throwable e)
+        {
+            promise.tryFailure(e);
+        }
+    }
+
+    public static class Synchronized extends InMemoryCommandStore
+    {
+        public Synchronized(int generation,
+                            int index,
+                            int numShards,
+                            Function<Timestamp, Timestamp> uniqueNow,
+                            LongSupplier currentEpoch,
+                            Agent agent,
+                            DataStore store,
+                            ProgressLog.Factory progressLogFactory,
+                            RangesForEpoch rangesForEpoch)
+        {
+            super(generation, index, numShards, uniqueNow, currentEpoch, 
agent, store, progressLogFactory, rangesForEpoch);
+        }
+
+        @Override
+        public synchronized Future<Void> processSetup(Consumer<? super 
CommandStore> function)
+        {
+            AsyncPromise<Void> promise = new AsyncPromise<>();
+            processInternal(function, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized <T> Future<T> processSetup(Function<? super 
CommandStore, T> function)
+        {
+            AsyncPromise<T> promise = new AsyncPromise<>();
+            processInternal(function, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized Future<Void> process(TxnOperation unused, 
Consumer<? super CommandStore> consumer)
+        {
+            Promise<Void> promise = new AsyncPromise<>();
+            processInternal(consumer, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized <T> Future<T> process(TxnOperation unused, 
Function<? super CommandStore, T> function)
+        {
+            AsyncPromise<T> promise = new AsyncPromise<>();
+            processInternal(function, promise);
+            return promise;
+        }
+
+        @Override
+        public synchronized void shutdown() {}
+    }
+
+    public static class SingleThread extends InMemoryCommandStore
+    {
+        private final ExecutorService executor;
+
+        private class ConsumerWrapper extends AsyncPromise<Void> implements 
Runnable
+        {
+            private final Consumer<? super CommandStore> consumer;
+
+            public ConsumerWrapper(Consumer<? super CommandStore> consumer)
+            {
+                this.consumer = consumer;
+            }
+
+            @Override
+            public void run()
+            {
+                processInternal(consumer, this);
+            }
+        }
+
+        private class FunctionWrapper<T> extends AsyncPromise<T> implements 
Runnable
+        {
+            private final Function<? super CommandStore, T> function;
+
+            public FunctionWrapper(Function<? super CommandStore, T> function)
+            {
+                this.function = function;
+            }
+
+            @Override
+            public void run()
+            {
+                processInternal(function, this);
+            }
+        }
+
+        public SingleThread(int generation,
+                            int index,
+                            int numShards,
+                            Node.Id nodeId,
+                            Function<Timestamp, Timestamp> uniqueNow,
+                            LongSupplier currentEpoch,
+                            Agent agent,
+                            DataStore store,
+                            ProgressLog.Factory progressLogFactory,
+                            RangesForEpoch rangesForEpoch)
+        {
+            super(generation, index, numShards, uniqueNow, currentEpoch, 
agent, store, progressLogFactory, rangesForEpoch);
+            executor = Executors.newSingleThreadExecutor(r -> {
+                Thread thread = new Thread(r);
+                thread.setName(CommandStore.class.getSimpleName() + '[' + 
nodeId + ':' + index + ']');
+                return thread;
+            });
+        }
+
+        @Override
+        public Future<Void> processSetup(Consumer<? super CommandStore> 
function)
+        {
+            ConsumerWrapper future = new ConsumerWrapper(function);
+            executor.execute(future);
+            return future;
+        }
+
+        @Override
+        public <T> Future<T> processSetup(Function<? super CommandStore, T> 
function)
+        {
+            FunctionWrapper<T> future = new FunctionWrapper<>(function);
+            executor.execute(future);
+            return future;
+        }
+
+        @Override
+        public Future<Void> process(TxnOperation unused, Consumer<? super 
CommandStore> consumer)
+        {
+            ConsumerWrapper future = new ConsumerWrapper(consumer);
+            executor.execute(future);
+            return future;
+        }
+
+        @Override
+        public <T> Future<T> process(TxnOperation unused, Function<? super 
CommandStore, T> function)
+        {
+            FunctionWrapper<T> future = new FunctionWrapper<>(function);
+            executor.execute(future);
+            return future;
+        }
+
+        @Override
+        public void shutdown()
+        {
+            executor.shutdown();
+        }
+    }
+
+    public static class SingleThreadDebug extends SingleThread
+    {
+        private final AtomicReference<Thread> expectedThread = new 
AtomicReference<>();
+
+        public SingleThreadDebug(int generation,
+                                 int index,
+                                 int numShards,
+                                 Node.Id nodeId,
+                                 Function<Timestamp, Timestamp> uniqueNow,
+                                 LongSupplier currentEpoch,
+                                 Agent agent,
+                                 DataStore store,
+                                 ProgressLog.Factory progressLogFactory,
+                                 RangesForEpoch rangesForEpoch)
+        {
+            super(generation, index, numShards, nodeId, uniqueNow, 
currentEpoch, agent, store, progressLogFactory, rangesForEpoch);
+        }
+
+        private void assertThread()
+        {
+            Thread current = Thread.currentThread();
+            Thread expected;
+            while (true)
+            {
+                expected = expectedThread.get();
+                if (expected != null)
+                    break;
+                expectedThread.compareAndSet(null, Thread.currentThread());
+            }
+            if (expected != current)
+                throw new IllegalStateException(String.format("Command store 
called from the wrong thread. Expected %s, got %s", expected, current));

Review Comment:
   Resolved



##########
accord-core/src/main/java/accord/messages/Apply.java:
##########
@@ -54,15 +63,59 @@ public Apply(Node.Id to, Topologies topologies, TxnId 
txnId, Txn txn, Key homeKe
         this.result = result;
     }
 
+    @VisibleForImplementation
+    public Apply(Keys scope, long waitForEpoch, TxnId txnId, Txn txn, Key 
homeKey, Timestamp executeAt, Deps deps, Writes writes, Result result)
+    {
+        super(scope, waitForEpoch);
+        this.txnId = txnId;
+        this.txn = txn;
+        this.homeKey = homeKey;
+        this.executeAt = executeAt;
+        this.deps = deps;
+        this.writes = writes;
+        this.result = result;
+    }
+
+    static Future<Void> waitAndReduce(Future<Void> left, Future<Void> right)

Review Comment:
   Reply from Blake in JIRA
   
   Good point. Changed return type to Future<Void>



##########
accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Key;
+import accord.local.Command;
+import accord.local.CommandsForKey;
+import accord.primitives.Timestamp;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Stream;
+
+public class InMemoryCommandsForKey extends CommandsForKey
+{
+    static class InMemoryCommandTimeseries implements CommandTimeseries
+    {
+        private final NavigableMap<Timestamp, Command> commands = new 
TreeMap<>();
+
+        @Override
+        public Command get(Timestamp timestamp)
+        {
+            return commands.get(timestamp);
+        }
+
+        @Override
+        public void add(Timestamp timestamp, Command command)

Review Comment:
   Resolved



##########
accord-core/src/main/java/accord/messages/Apply.java:
##########
@@ -54,15 +63,59 @@ public Apply(Node.Id to, Topologies topologies, TxnId 
txnId, Txn txn, Key homeKe
         this.result = result;
     }
 
+    @VisibleForImplementation
+    public Apply(Keys scope, long waitForEpoch, TxnId txnId, Txn txn, Key 
homeKey, Timestamp executeAt, Deps deps, Writes writes, Result result)
+    {
+        super(scope, waitForEpoch);
+        this.txnId = txnId;
+        this.txn = txn;
+        this.homeKey = homeKey;
+        this.executeAt = executeAt;
+        this.deps = deps;
+        this.writes = writes;
+        this.result = result;
+    }
+
+    static Future<Void> waitAndReduce(Future<Void> left, Future<Void> right)

Review Comment:
   nit: "waitAndReduce" throws on future error rather than returning a failed 
future. This feels off to me, but it looks to be because "mapReduceLocal" 
doesn't allow changing the return type; we want Void in this context, but that 
API doesn't allow it... Would it be better to allow a different return type? 
The caller never checks the return value, so Future<?> is brittle...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to