dcapwell commented on code in PR #2144:
URL: https://github.com/apache/cassandra/pull/2144#discussion_r1103126636


##########
src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+
+import javax.annotation.Nullable;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.impl.CommandsForKey;
+import accord.impl.CommandsForKeys;
+import accord.local.Command;
+import accord.local.CommandStores.RangesForEpoch;
+import accord.local.NodeTimeService;
+import accord.local.PreExecuteContext;
+import accord.local.SafeCommandStores;
+import accord.local.Status;
+import accord.primitives.AbstractKeys;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.RoutableKey;
+import accord.primitives.Routables;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import 
org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer;
+
+public class AccordSafeCommandStore extends 
SafeCommandStores.AbstractSafeCommandStore
+{
+    private final AccordCommandStore commandStore;
+
+    public AccordSafeCommandStore(PreExecuteContext context, 
AccordCommandStore commandStore)
+    {
+        super(context);
+        this.commandStore = commandStore;
+    }
+
+    @Override
+    protected Command getIfLoaded(TxnId txnId)
+    {
+        return commandStore.commandCache().referenceAndGetIfLoaded(txnId);
+    }
+
+    @Override
+    protected CommandsForKey getIfLoaded(RoutableKey key)
+    {
+        return commandStore.commandsForKeyCache().referenceAndGetIfLoaded(key);
+    }
+
+    @Override
+    public AccordCommandStore commandStore()
+    {
+        return commandStore;
+    }
+
+    @Override
+    public DataStore dataStore()
+    {
+        return commandStore().dataStore();
+    }
+
+    @Override
+    public Agent agent()
+    {
+        return commandStore.agent();
+    }
+
+    @Override
+    public ProgressLog progressLog()
+    {
+        return commandStore().progressLog();
+    }
+
+    @Override
+    public NodeTimeService time()
+    {
+        return commandStore.time();
+    }
+
+    @Override
+    public RangesForEpoch ranges()
+    {
+        return commandStore().ranges();
+    }
+
+    @Override
+    public long latestEpoch()
+    {
+        return commandStore().time().epoch();
+    }
+
+    @Override
+    protected Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice)

Review Comment:
   I question if this should be in C*... I feel this really should be in accord 
and we shouldn't have to define it unless C* has a different logic (which we 
don't as far as I see)



##########
src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+
+import javax.annotation.Nullable;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.impl.CommandsForKey;
+import accord.impl.CommandsForKeys;
+import accord.local.Command;
+import accord.local.CommandStores.RangesForEpoch;
+import accord.local.NodeTimeService;
+import accord.local.PreExecuteContext;
+import accord.local.SafeCommandStores;
+import accord.local.Status;
+import accord.primitives.AbstractKeys;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.RoutableKey;
+import accord.primitives.Routables;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import 
org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer;
+
+public class AccordSafeCommandStore extends 
SafeCommandStores.AbstractSafeCommandStore
+{
+    private final AccordCommandStore commandStore;
+
+    public AccordSafeCommandStore(PreExecuteContext context, 
AccordCommandStore commandStore)
+    {
+        super(context);
+        this.commandStore = commandStore;
+    }
+
+    @Override
+    protected Command getIfLoaded(TxnId txnId)
+    {
+        return commandStore.commandCache().referenceAndGetIfLoaded(txnId);
+    }
+
+    @Override
+    protected CommandsForKey getIfLoaded(RoutableKey key)
+    {
+        return commandStore.commandsForKeyCache().referenceAndGetIfLoaded(key);
+    }
+
+    @Override
+    public AccordCommandStore commandStore()
+    {
+        return commandStore;
+    }
+
+    @Override
+    public DataStore dataStore()
+    {
+        return commandStore().dataStore();
+    }
+
+    @Override
+    public Agent agent()
+    {
+        return commandStore.agent();
+    }
+
+    @Override
+    public ProgressLog progressLog()
+    {
+        return commandStore().progressLog();
+    }
+
+    @Override
+    public NodeTimeService time()
+    {
+        return commandStore.time();
+    }
+
+    @Override
+    public RangesForEpoch ranges()
+    {
+        return commandStore().ranges();
+    }
+
+    @Override
+    public long latestEpoch()
+    {
+        return commandStore().time().epoch();
+    }
+
+    @Override
+    protected Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice)
+    {
+        // TODO: Seekables
+        // TODO: efficiency
+        return ((Keys)keysOrRanges).stream()
+                           .map(this::maybeCommandsForKey)
+                           .filter(Objects::nonNull)
+                           .map(CommandsForKey::max)
+                           .max(Comparator.naturalOrder())
+                           .orElse(Timestamp.NONE);
+    }
+
+    public <T> T mapReduce(Routables<?, ?> keysOrRanges, 
Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
+    {
+        switch (keysOrRanges.domain()) {
+            default:
+                throw new AssertionError();
+            case Key:
+                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
+                return keys.stream()
+                           .map(this::commandsForKey)
+                           .map(map)
+                           .reduce(initialValue, reduce);
+            case Range:
+                // TODO: implement
+                throw new UnsupportedOperationException();
+        }
+    }
+
+    private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges slice, 
BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue)

Review Comment:
   personal preference: functions should be at the end of a method for 
readability, we are not consistent and know the JDK doesn't always do this, but 
I personally find the code cleaner to read.
   
   The only caller splits the inputs before/after the lambda which I find is 
harder to read than 
   
   ```
   return mapReduceForKey(keysOrRanges, slice, accumulate, terminalValue, 
(forKey, prev) -> {
   ...
   });
   ```



##########
src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java:
##########
@@ -57,131 +59,94 @@
     private State state = State.INITIALIZED;
     private final AccordCommandStore commandStore;
 
-    private final Iterable<TxnId> txnIds;
-    private final Iterable<PartitionKey> keys;
+    private final List<TxnId> txnIds;
+    private final List<RoutableKey> keys;
 
-    protected Future<?> readFuture;
+    protected AsyncResult<?> readResult;
 
-    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<PartitionKey> keys)
+    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<RoutableKey> keys)
     {
         this.commandStore = commandStore;
-        this.txnIds = txnIds;
-        this.keys = keys;
+        this.txnIds = Lists.newArrayList(txnIds);
+        this.keys = Lists.newArrayList(keys);
     }
 
-    private <K, V extends AccordState<K>> Future<?> referenceAndDispatch(K key,
-                                                                         
AccordStateCache.Instance<K, V> cache,
-                                                                         
Map<K, V> context,
-                                                                         
Function<V, Future<?>> readFunction,
-                                                                         
Object callback)
-    {
-        V item;
-        Future<?> future = cache.getLoadFuture(key);
-        if (future != null)
-        {
-            // if a load future exists for this, it must be present in the 
cache
-            item = cache.getOrNull(key);
-            Preconditions.checkState(item != null);
-            context.put(key, item);
-            if (logger.isTraceEnabled())
-                logger.trace("Existing load future found for {} while loading 
for {}. ({})", item.key(), callback, item);
-            return future;
-        }
-
-        item = cache.getOrCreate(key);
-        context.put(key, item);
-        if (item.isLoaded())
-        {
-            if (logger.isTraceEnabled())
-                logger.trace("Cached item found for {} while loading for {}. 
({})", item.key(), callback, item);
-            return null;
-        }
-
-        future = readFunction.apply(item);
-        cache.setLoadFuture(item.key(), future);
-        if (logger.isTraceEnabled())
-            logger.trace("Loading new item for {} while loading for {}. ({})", 
item.key(), callback, item);
-        return future;
-    }
-
-
-    private <K, V extends AccordState<K>> List<Future<?>> 
referenceAndDispatchReads(Iterable<K> keys,
+    private <K, V extends ImmutableState> List<AsyncChain<Void>> 
referenceAndDispatchReads(Iterable<K> keys,
                                                                                
            AccordStateCache.Instance<K, V> cache,
-                                                                               
            Map<K, V> context,
-                                                                               
            Function<V, Future<?>> readFunction,
-                                                                               
            List<Future<?>> futures,
+                                                                               
            LoadFunction<K, V> loadFunction,
+                                                                               
            List<AsyncChain<Void>> results,
                                                                                
            Object callback)
     {
         for (K key : keys)
         {
-            Future<?> future = referenceAndDispatch(key, cache, context, 
readFunction, callback);
-            if (future == null)
+            AsyncResult<Void> result = cache.referenceAndLoad(key, 
loadFunction);
+            if (result == null)
                 continue;
 
-            if (futures == null)
-                futures = new ArrayList<>();
+            if (results == null)
+                results = new ArrayList<>();
 
-            futures.add(future);
+            results.add(result);
         }
 
-        return futures;
+        return results;
     }
 
     @VisibleForTesting
-    Function<AccordCommand, Future<?>> loadCommandFunction(Object callback)

Review Comment:
   I question if we actually want to have this `callback` and log it... at the 
moment there is a single caller
   
   ```
   loader.load(this::callback)
   ```
   
   so our logs would just say a lambda exists in 
`org.apache.cassandra.service.accord.async.AsyncOperation`, which I don't feel 
is that helpful.
   
   It also adds confusion to readers as we are making a async chain that takes 
a callback, so why are we ignoring this callback?
   
   Removing the callback also means we don't allocate a new lambda for every 
request, so can save the allocation and keep reusing.



##########
src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java:
##########
@@ -57,131 +59,94 @@
     private State state = State.INITIALIZED;
     private final AccordCommandStore commandStore;
 
-    private final Iterable<TxnId> txnIds;
-    private final Iterable<PartitionKey> keys;
+    private final List<TxnId> txnIds;
+    private final List<RoutableKey> keys;
 
-    protected Future<?> readFuture;
+    protected AsyncResult<?> readResult;
 
-    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<PartitionKey> keys)
+    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<RoutableKey> keys)
     {
         this.commandStore = commandStore;
-        this.txnIds = txnIds;
-        this.keys = keys;
+        this.txnIds = Lists.newArrayList(txnIds);
+        this.keys = Lists.newArrayList(keys);
     }
 
-    private <K, V extends AccordState<K>> Future<?> referenceAndDispatch(K key,
-                                                                         
AccordStateCache.Instance<K, V> cache,
-                                                                         
Map<K, V> context,
-                                                                         
Function<V, Future<?>> readFunction,
-                                                                         
Object callback)
-    {
-        V item;
-        Future<?> future = cache.getLoadFuture(key);
-        if (future != null)
-        {
-            // if a load future exists for this, it must be present in the 
cache
-            item = cache.getOrNull(key);
-            Preconditions.checkState(item != null);
-            context.put(key, item);
-            if (logger.isTraceEnabled())
-                logger.trace("Existing load future found for {} while loading 
for {}. ({})", item.key(), callback, item);
-            return future;
-        }
-
-        item = cache.getOrCreate(key);
-        context.put(key, item);
-        if (item.isLoaded())
-        {
-            if (logger.isTraceEnabled())
-                logger.trace("Cached item found for {} while loading for {}. 
({})", item.key(), callback, item);
-            return null;
-        }
-
-        future = readFunction.apply(item);
-        cache.setLoadFuture(item.key(), future);
-        if (logger.isTraceEnabled())
-            logger.trace("Loading new item for {} while loading for {}. ({})", 
item.key(), callback, item);
-        return future;
-    }
-
-
-    private <K, V extends AccordState<K>> List<Future<?>> 
referenceAndDispatchReads(Iterable<K> keys,
+    private <K, V extends ImmutableState> List<AsyncChain<Void>> 
referenceAndDispatchReads(Iterable<K> keys,
                                                                                
            AccordStateCache.Instance<K, V> cache,
-                                                                               
            Map<K, V> context,
-                                                                               
            Function<V, Future<?>> readFunction,
-                                                                               
            List<Future<?>> futures,
+                                                                               
            LoadFunction<K, V> loadFunction,
+                                                                               
            List<AsyncChain<Void>> results,
                                                                                
            Object callback)
     {
         for (K key : keys)
         {
-            Future<?> future = referenceAndDispatch(key, cache, context, 
readFunction, callback);
-            if (future == null)
+            AsyncResult<Void> result = cache.referenceAndLoad(key, 
loadFunction);
+            if (result == null)
                 continue;
 
-            if (futures == null)
-                futures = new ArrayList<>();
+            if (results == null)
+                results = new ArrayList<>();
 
-            futures.add(future);
+            results.add(result);
         }
 
-        return futures;
+        return results;
     }
 
     @VisibleForTesting
-    Function<AccordCommand, Future<?>> loadCommandFunction(Object callback)
+    LoadFunction<TxnId, Command> loadCommandFunction(Object callback)
     {
-        return command -> Stage.READ.submit(() -> {
+        return (txnId, consumer) -> ofRunnable(Stage.READ.executor(), () -> {
             try
             {
-                logger.trace("Starting load of {} for {}", command.txnId(), 
callback);
-                AccordKeyspace.loadCommand(commandStore, command);
-                logger.trace("Completed load of {} for {}", command.txnId(), 
callback);
+                logger.trace("Starting load of {} for {}", txnId, callback);
+                Command command = AccordKeyspace.loadCommand(commandStore, 
txnId);
+                logger.trace("Completed load of {} for {}", txnId, callback);
+                consumer.accept(command);
             }
             catch (Throwable t)
             {
-                logger.error("Exception loading {} for {}", command.txnId(), 
callback, t);
+                logger.error("Exception loading {} for {}", txnId, callback, 
t);
                 throw t;
             }
         });
     }
 
     @VisibleForTesting
-    Function<AccordCommandsForKey, Future<?>> 
loadCommandsPerKeyFunction(Object callback)
+    LoadFunction<RoutableKey, CommandsForKey> 
loadCommandsPerKeyFunction(Object callback)

Review Comment:
   same as above comment, should we remove `callback`?  we currently only use 
it for logging and all it does is say there exists a lambda from 
AsyncOperation, so its not something we really can use for debugging.
   
   Removing also means we can avoid creating a new lambda for each request and 
reuse the same lambda



##########
src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+
+import javax.annotation.Nullable;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.impl.CommandsForKey;
+import accord.impl.CommandsForKeys;
+import accord.local.Command;
+import accord.local.CommandStores.RangesForEpoch;
+import accord.local.NodeTimeService;
+import accord.local.PreExecuteContext;
+import accord.local.SafeCommandStores;
+import accord.local.Status;
+import accord.primitives.AbstractKeys;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.RoutableKey;
+import accord.primitives.Routables;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import 
org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer;
+
+public class AccordSafeCommandStore extends 
SafeCommandStores.AbstractSafeCommandStore
+{
+    private final AccordCommandStore commandStore;
+
+    public AccordSafeCommandStore(PreExecuteContext context, 
AccordCommandStore commandStore)
+    {
+        super(context);
+        this.commandStore = commandStore;
+    }
+
+    @Override
+    protected Command getIfLoaded(TxnId txnId)
+    {
+        return commandStore.commandCache().referenceAndGetIfLoaded(txnId);
+    }
+
+    @Override
+    protected CommandsForKey getIfLoaded(RoutableKey key)
+    {
+        return commandStore.commandsForKeyCache().referenceAndGetIfLoaded(key);
+    }
+
+    @Override
+    public AccordCommandStore commandStore()
+    {
+        return commandStore;
+    }
+
+    @Override
+    public DataStore dataStore()
+    {
+        return commandStore().dataStore();
+    }
+
+    @Override
+    public Agent agent()
+    {
+        return commandStore.agent();
+    }
+
+    @Override
+    public ProgressLog progressLog()
+    {
+        return commandStore().progressLog();
+    }
+
+    @Override
+    public NodeTimeService time()
+    {
+        return commandStore.time();
+    }
+
+    @Override
+    public RangesForEpoch ranges()
+    {
+        return commandStore().ranges();
+    }
+
+    @Override
+    public long latestEpoch()
+    {
+        return commandStore().time().epoch();
+    }
+
+    @Override
+    protected Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice)
+    {
+        // TODO: Seekables
+        // TODO: efficiency
+        return ((Keys)keysOrRanges).stream()
+                           .map(this::maybeCommandsForKey)
+                           .filter(Objects::nonNull)
+                           .map(CommandsForKey::max)
+                           .max(Comparator.naturalOrder())
+                           .orElse(Timestamp.NONE);
+    }
+
+    public <T> T mapReduce(Routables<?, ?> keysOrRanges, 
Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)

Review Comment:
   feel this should be in accord and not C*, this should be generic



##########
src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+
+import javax.annotation.Nullable;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.impl.CommandsForKey;
+import accord.impl.CommandsForKeys;
+import accord.local.Command;
+import accord.local.CommandStores.RangesForEpoch;
+import accord.local.NodeTimeService;
+import accord.local.PreExecuteContext;
+import accord.local.SafeCommandStores;
+import accord.local.Status;
+import accord.primitives.AbstractKeys;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.RoutableKey;
+import accord.primitives.Routables;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import 
org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer;
+
+public class AccordSafeCommandStore extends 
SafeCommandStores.AbstractSafeCommandStore
+{
+    private final AccordCommandStore commandStore;
+
+    public AccordSafeCommandStore(PreExecuteContext context, 
AccordCommandStore commandStore)
+    {
+        super(context);
+        this.commandStore = commandStore;
+    }
+
+    @Override
+    protected Command getIfLoaded(TxnId txnId)
+    {
+        return commandStore.commandCache().referenceAndGetIfLoaded(txnId);
+    }
+
+    @Override
+    protected CommandsForKey getIfLoaded(RoutableKey key)
+    {
+        return commandStore.commandsForKeyCache().referenceAndGetIfLoaded(key);
+    }
+
+    @Override
+    public AccordCommandStore commandStore()
+    {
+        return commandStore;
+    }
+
+    @Override
+    public DataStore dataStore()
+    {
+        return commandStore().dataStore();
+    }
+
+    @Override
+    public Agent agent()
+    {
+        return commandStore.agent();
+    }
+
+    @Override
+    public ProgressLog progressLog()
+    {
+        return commandStore().progressLog();
+    }
+
+    @Override
+    public NodeTimeService time()
+    {
+        return commandStore.time();
+    }
+
+    @Override
+    public RangesForEpoch ranges()
+    {
+        return commandStore().ranges();
+    }
+
+    @Override
+    public long latestEpoch()
+    {
+        return commandStore().time().epoch();
+    }
+
+    @Override
+    protected Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice)
+    {
+        // TODO: Seekables
+        // TODO: efficiency
+        return ((Keys)keysOrRanges).stream()
+                           .map(this::maybeCommandsForKey)
+                           .filter(Objects::nonNull)
+                           .map(CommandsForKey::max)
+                           .max(Comparator.naturalOrder())
+                           .orElse(Timestamp.NONE);
+    }
+
+    public <T> T mapReduce(Routables<?, ?> keysOrRanges, 
Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
+    {
+        switch (keysOrRanges.domain()) {
+            default:
+                throw new AssertionError();
+            case Key:
+                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
+                return keys.stream()
+                           .map(this::commandsForKey)
+                           .map(map)
+                           .reduce(initialValue, reduce);
+            case Range:
+                // TODO: implement
+                throw new UnsupportedOperationException();
+        }
+    }
+
+    private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges slice, 
BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue)
+    {
+        switch (keysOrRanges.domain()) {
+            default:
+                throw new AssertionError();
+            case Key:
+                // TODO: efficiency
+                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
+                for (Key key : keys)
+                {
+                    if (!slice.contains(key)) continue;
+                    CommandsForKey forKey = commandsForKey(key);
+                    accumulate = map.apply(forKey, accumulate);
+                    if (accumulate.equals(terminalValue))
+                        return accumulate;
+                }
+                break;
+            case Range:
+                // TODO (required): implement
+                throw new UnsupportedOperationException();
+        }
+        return accumulate;
+    }
+
+    @Override
+    public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, 
TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep 
testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status 
maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)

Review Comment:
   similar feedback to the other methods, do we need this to live in C*?  I 
don't see anything specific to us, so feel this should be pushed to accord.



##########
src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+
+import javax.annotation.Nullable;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.impl.CommandsForKey;
+import accord.impl.CommandsForKeys;
+import accord.local.Command;
+import accord.local.CommandStores.RangesForEpoch;
+import accord.local.NodeTimeService;
+import accord.local.PreExecuteContext;
+import accord.local.SafeCommandStores;
+import accord.local.Status;
+import accord.primitives.AbstractKeys;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.RoutableKey;
+import accord.primitives.Routables;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import 
org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer;
+
+public class AccordSafeCommandStore extends 
SafeCommandStores.AbstractSafeCommandStore
+{
+    private final AccordCommandStore commandStore;
+
+    public AccordSafeCommandStore(PreExecuteContext context, 
AccordCommandStore commandStore)
+    {
+        super(context);
+        this.commandStore = commandStore;
+    }
+
+    @Override
+    protected Command getIfLoaded(TxnId txnId)
+    {
+        return commandStore.commandCache().referenceAndGetIfLoaded(txnId);
+    }
+
+    @Override
+    protected CommandsForKey getIfLoaded(RoutableKey key)
+    {
+        return commandStore.commandsForKeyCache().referenceAndGetIfLoaded(key);
+    }
+
+    @Override
+    public AccordCommandStore commandStore()
+    {
+        return commandStore;
+    }
+
+    @Override
+    public DataStore dataStore()
+    {
+        return commandStore().dataStore();
+    }
+
+    @Override
+    public Agent agent()
+    {
+        return commandStore.agent();
+    }
+
+    @Override
+    public ProgressLog progressLog()
+    {
+        return commandStore().progressLog();
+    }
+
+    @Override
+    public NodeTimeService time()
+    {
+        return commandStore.time();
+    }
+
+    @Override
+    public RangesForEpoch ranges()
+    {
+        return commandStore().ranges();
+    }
+
+    @Override
+    public long latestEpoch()
+    {
+        return commandStore().time().epoch();
+    }
+
+    @Override
+    protected Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice)
+    {
+        // TODO: Seekables
+        // TODO: efficiency
+        return ((Keys)keysOrRanges).stream()
+                           .map(this::maybeCommandsForKey)
+                           .filter(Objects::nonNull)
+                           .map(CommandsForKey::max)
+                           .max(Comparator.naturalOrder())
+                           .orElse(Timestamp.NONE);
+    }
+
+    public <T> T mapReduce(Routables<?, ?> keysOrRanges, 
Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
+    {
+        switch (keysOrRanges.domain()) {
+            default:
+                throw new AssertionError();
+            case Key:
+                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
+                return keys.stream()
+                           .map(this::commandsForKey)
+                           .map(map)
+                           .reduce(initialValue, reduce);
+            case Range:
+                // TODO: implement
+                throw new UnsupportedOperationException();
+        }
+    }
+
+    private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges slice, 
BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue)
+    {
+        switch (keysOrRanges.domain()) {
+            default:
+                throw new AssertionError();
+            case Key:
+                // TODO: efficiency
+                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
+                for (Key key : keys)
+                {
+                    if (!slice.contains(key)) continue;
+                    CommandsForKey forKey = commandsForKey(key);
+                    accumulate = map.apply(forKey, accumulate);
+                    if (accumulate.equals(terminalValue))
+                        return accumulate;
+                }
+                break;
+            case Range:
+                // TODO (required): implement
+                throw new UnsupportedOperationException();
+        }
+        return accumulate;
+    }
+
+    @Override
+    public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, 
TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep 
testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status 
maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
+    {
+        accumulate = mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> {
+            CommandsForKey.CommandTimeseries<?> timeseries;
+            switch (testTimestamp)
+            {
+                default: throw new AssertionError();
+                case STARTED_AFTER:
+                case STARTED_BEFORE:
+                    timeseries = forKey.byId();
+                    break;
+                case EXECUTES_AFTER:
+                case MAY_EXECUTE_BEFORE:
+                    timeseries = forKey.byExecuteAt();
+            }
+            CommandsForKey.CommandTimeseries.TestTimestamp remapTestTimestamp;
+            switch (testTimestamp)
+            {
+                default: throw new AssertionError();
+                case STARTED_AFTER:
+                case EXECUTES_AFTER:
+                    remapTestTimestamp = 
CommandsForKey.CommandTimeseries.TestTimestamp.AFTER;
+                    break;
+                case STARTED_BEFORE:
+                case MAY_EXECUTE_BEFORE:
+                    remapTestTimestamp = 
CommandsForKey.CommandTimeseries.TestTimestamp.BEFORE;
+            }
+            return timeseries.mapReduce(testKind, remapTestTimestamp, 
timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue);
+        }, accumulate, terminalValue);
+
+        return accumulate;

Review Comment:
   nit, rather than override `accumulate` can we do `return` instead?
   
   ```suggestion
   return mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> {
               CommandsForKey.CommandTimeseries<?> timeseries;
               switch (testTimestamp)
               {
                   default: throw new AssertionError();
                   case STARTED_AFTER:
                   case STARTED_BEFORE:
                       timeseries = forKey.byId();
                       break;
                   case EXECUTES_AFTER:
                   case MAY_EXECUTE_BEFORE:
                       timeseries = forKey.byExecuteAt();
               }
               CommandsForKey.CommandTimeseries.TestTimestamp 
remapTestTimestamp;
               switch (testTimestamp)
               {
                   default: throw new AssertionError();
                   case STARTED_AFTER:
                   case EXECUTES_AFTER:
                       remapTestTimestamp = 
CommandsForKey.CommandTimeseries.TestTimestamp.AFTER;
                       break;
                   case STARTED_BEFORE:
                   case MAY_EXECUTE_BEFORE:
                       remapTestTimestamp = 
CommandsForKey.CommandTimeseries.TestTimestamp.BEFORE;
               }
               return timeseries.mapReduce(testKind, remapTestTimestamp, 
timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue);
           }, accumulate, terminalValue);
   ```



##########
src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+
+import javax.annotation.Nullable;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.impl.CommandsForKey;
+import accord.impl.CommandsForKeys;
+import accord.local.Command;
+import accord.local.CommandStores.RangesForEpoch;
+import accord.local.NodeTimeService;
+import accord.local.PreExecuteContext;
+import accord.local.SafeCommandStores;
+import accord.local.Status;
+import accord.primitives.AbstractKeys;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.RoutableKey;
+import accord.primitives.Routables;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import 
org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer;
+
+public class AccordSafeCommandStore extends 
SafeCommandStores.AbstractSafeCommandStore
+{
+    private final AccordCommandStore commandStore;
+
+    public AccordSafeCommandStore(PreExecuteContext context, 
AccordCommandStore commandStore)
+    {
+        super(context);
+        this.commandStore = commandStore;
+    }
+
+    @Override
+    protected Command getIfLoaded(TxnId txnId)
+    {
+        return commandStore.commandCache().referenceAndGetIfLoaded(txnId);
+    }
+
+    @Override
+    protected CommandsForKey getIfLoaded(RoutableKey key)
+    {
+        return commandStore.commandsForKeyCache().referenceAndGetIfLoaded(key);
+    }
+
+    @Override
+    public AccordCommandStore commandStore()
+    {
+        return commandStore;
+    }
+
+    @Override
+    public DataStore dataStore()
+    {
+        return commandStore().dataStore();
+    }
+
+    @Override
+    public Agent agent()
+    {
+        return commandStore.agent();
+    }
+
+    @Override
+    public ProgressLog progressLog()
+    {
+        return commandStore().progressLog();
+    }
+
+    @Override
+    public NodeTimeService time()
+    {
+        return commandStore.time();
+    }
+
+    @Override
+    public RangesForEpoch ranges()
+    {
+        return commandStore().ranges();
+    }
+
+    @Override
+    public long latestEpoch()
+    {
+        return commandStore().time().epoch();
+    }
+
+    @Override
+    protected Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice)
+    {
+        // TODO: Seekables
+        // TODO: efficiency
+        return ((Keys)keysOrRanges).stream()
+                           .map(this::maybeCommandsForKey)
+                           .filter(Objects::nonNull)
+                           .map(CommandsForKey::max)
+                           .max(Comparator.naturalOrder())
+                           .orElse(Timestamp.NONE);
+    }
+
+    public <T> T mapReduce(Routables<?, ?> keysOrRanges, 
Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
+    {
+        switch (keysOrRanges.domain()) {
+            default:
+                throw new AssertionError();
+            case Key:
+                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
+                return keys.stream()
+                           .map(this::commandsForKey)
+                           .map(map)
+                           .reduce(initialValue, reduce);
+            case Range:
+                // TODO: implement
+                throw new UnsupportedOperationException();
+        }
+    }
+
+    private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges slice, 
BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue)
+    {
+        switch (keysOrRanges.domain()) {
+            default:
+                throw new AssertionError();
+            case Key:
+                // TODO: efficiency
+                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
+                for (Key key : keys)
+                {
+                    if (!slice.contains(key)) continue;
+                    CommandsForKey forKey = commandsForKey(key);
+                    accumulate = map.apply(forKey, accumulate);
+                    if (accumulate.equals(terminalValue))
+                        return accumulate;
+                }
+                break;
+            case Range:
+                // TODO (required): implement
+                throw new UnsupportedOperationException();
+        }
+        return accumulate;
+    }
+
+    @Override
+    public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, 
TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep 
testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status 
maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
+    {
+        accumulate = mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> {
+            CommandsForKey.CommandTimeseries<?> timeseries;
+            switch (testTimestamp)
+            {
+                default: throw new AssertionError();
+                case STARTED_AFTER:
+                case STARTED_BEFORE:
+                    timeseries = forKey.byId();
+                    break;
+                case EXECUTES_AFTER:
+                case MAY_EXECUTE_BEFORE:
+                    timeseries = forKey.byExecuteAt();
+            }
+            CommandsForKey.CommandTimeseries.TestTimestamp remapTestTimestamp;
+            switch (testTimestamp)
+            {
+                default: throw new AssertionError();
+                case STARTED_AFTER:
+                case EXECUTES_AFTER:
+                    remapTestTimestamp = 
CommandsForKey.CommandTimeseries.TestTimestamp.AFTER;
+                    break;
+                case STARTED_BEFORE:
+                case MAY_EXECUTE_BEFORE:
+                    remapTestTimestamp = 
CommandsForKey.CommandTimeseries.TestTimestamp.BEFORE;
+            }
+            return timeseries.mapReduce(testKind, remapTestTimestamp, 
timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue);
+        }, accumulate, terminalValue);
+
+        return accumulate;
+    }
+
+    @Override
+    public void register(Seekables<?, ?> keysOrRanges, Ranges slice, Command 
command)
+    {
+        // TODO (required): support ranges
+        Routables.foldl((Keys)keysOrRanges, slice, (k, v, i) -> { 
CommandsForKeys.register(this, command, k, slice); return v; }, null);
+    }
+
+    @Override
+    public void register(Seekable keyOrRange, Ranges slice, Command command)
+    {
+        // TODO (required): support ranges
+        Key key = (Key) keyOrRange;
+        if (slice.contains(key))
+            CommandsForKeys.register(this, command, key, slice);
+    }

Review Comment:
   can we push these to accord?



##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -51,71 +55,107 @@
 {
     private static final Logger logger = 
LoggerFactory.getLogger(AccordStateCache.class);
 
-    private static class WriteOnlyGroup<K, V extends AccordState<K>>
+    public interface LoadFunction<K, V>
     {
-        private boolean locked = false;
-        private List<AccordState.WriteOnly<K, V>> items = new ArrayList<>();
+        AsyncResult<Void> apply(K key, Consumer<V> valueConsumer);

Review Comment:
   in the spirit of chains vs result, I think this should be
   
   ```
   AsyncChain<V> apply(K key);
   ```
   
   Looking at `PendingLoad` it looks like we are bad about handling the 
exception case...  For example look at 
`org.apache.cassandra.service.accord.AccordStateCache.Node#maybeFinishLoad` it 
just sets the `state = null` when we see an error!  This puts us in a confusing 
state where `isLoaded() == true`, but we are not loaded!  Which I guess is why 
`value()` has to have this check
   
   ```
   Invariants.checkState(isLoaded() && state != null);
   ```
   
   I tried working on a patch to clean this up and found issues with 
`AsyncResult` (need `getCauseNow`, `getNow` methods), and `AsyncResults` 
(`reduce` doesn't handle types properly, causing `List<AsyncChain<?>>` to not 
work)
   
   
   For `accord.utils.async.AsyncResults#reduce` it should be
   
   ```
   <T> AsyncChain<V> reduce(Collection<? extends AsyncChain<? extends V>> 
results, BinaryOperator<V> reducer)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to