belliottsmith commented on code in PR #10:
URL: https://github.com/apache/cassandra-accord/pull/10#discussion_r979237644


##########
accord-core/src/main/java/accord/local/CommandStores.java:
##########
@@ -356,42 +315,110 @@ public synchronized void shutdown()
                 commandStore.shutdown();
     }
 
-    protected abstract <S> void forEach(Select<S> select, S scope, long 
minEpoch, long maxEpoch, Consumer<? super CommandStore> forEach);
-    protected abstract <S, T> T mapReduce(Select<S> select, S scope, long 
minEpoch, long maxEpoch, Function<? super CommandStore, T> map, BiFunction<T, 
T, T> reduce);
+    private static <T> Fold<TxnOperation, Void, List<Future<T>>> 
mapReduceFold(Function<CommandStore, T> map)
+    {
+        return (store, op, i, t) -> { t.add(store.process(op, map)); return t; 
};
+    }
+
+    private static <T> T reduce(List<Future<T>> futures, BiFunction<T, T, T> 
reduce)
+    {
+        T result = null;
+        for (Future<T> future : futures)
+        {
+            try
+            {
+                T next = future.get();
+                result = result == null ? next : reduce.apply(result, next);
+            }
+            catch (InterruptedException e)
+            {
+                throw new UncheckedInterruptedException(e);
+            }
+            catch (ExecutionException e)
+            {
+                throw new RuntimeException(e.getCause());
+            }
+        }
+        return result;
+    }
+
+    private <F, T> T setup(F f, Fold<F, Void, List<Future<T>>> fold, 
BiFunction<T, T, T> reduce)
+    {
+        List<Future<T>> futures = foldl((s, i, mn, mx) -> s.all(), null, 
Long.MIN_VALUE, Long.MAX_VALUE, fold, f, null, ArrayList::new);
+        return reduce(futures, reduce);
+    }
+
+    private  <S, T> T mapReduce(TxnOperation operation, Select<S> select, S 
scope, long minEpoch, long maxEpoch, Fold<TxnOperation, Void, List<Future<T>>> 
fold, BiFunction<T, T, T> reduce)
+    {
+        List<Future<T>> futures = foldl(select, scope, minEpoch, maxEpoch, 
fold, operation, null, ArrayList::new);
+        if (futures == null)
+            return null;
+        return reduce(futures, reduce);
+    }
+
+    public <T> T mapReduce(TxnOperation operation, Key key, long minEpoch, 
long maxEpoch, Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+    {
+        return mapReduce(operation, ShardedRanges::shard, key, minEpoch, 
maxEpoch, mapReduceFold(map), reduce);
+    }
+
+    public <T> T mapReduce(TxnOperation operation, Key key, long epoch, 
Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+    {
+        return mapReduce(operation, key, epoch, epoch, map, reduce);
+    }
+
+    public <T> T mapReduceSince(TxnOperation operation, Key key, long epoch, 
Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+    {
+        return mapReduce(operation, key, epoch, Long.MAX_VALUE, map, reduce);
+    }
 
-    public void forEach(Consumer<CommandStore> forEach)
+    public <T> T mapReduce(TxnOperation operation, Keys keys, long minEpoch, 
long maxEpoch, Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
     {
-        forEach((s, i, min, max) -> s.all(), null, 0, 0, forEach);
+        // probably need to split txnOperation and scope stuff here
+        return mapReduce(operation, ShardedRanges::shards, keys, minEpoch, 
maxEpoch, mapReduceFold(map), reduce);
     }
 
-    public void forEach(Keys keys, long epoch, Consumer<CommandStore> forEach)
+    public void setup(Consumer<CommandStore> forEach)
     {
-        forEach(keys, epoch, epoch, forEach);
+        setup(forEach, (store, f, i, t) -> { t.add(store.processSetup(f)); 
return t; }, (Void i1, Void i2) -> null);
     }
 
-    public void forEach(Keys keys, long minEpoch, long maxEpoch, 
Consumer<CommandStore> forEach)
+    public <T> T setup(Function<CommandStore, T> map, BiFunction<T, T, T> 
reduce)
     {
-        forEach(ShardedRanges::shards, keys, minEpoch, maxEpoch, forEach);
+        return setup(map, (store, f, i, t) -> { t.add(store.processSetup(f)); 
return t; }, reduce);
     }
 
-    public <T> T mapReduce(Keys keys, long epoch, Function<CommandStore, T> 
map, BiFunction<T, T, T> reduce)
+    private static Fold<TxnOperation, Void, List<Future<Void>>> 
forEachFold(Consumer<CommandStore> forEach)
     {
-        return mapReduce(keys, epoch, epoch, map, reduce);
+        return (store, op, i, t) -> { t.add(store.process(op, forEach)); 
return t; };
     }
 
-    public <T> T mapReduce(Keys keys, long minEpoch, long maxEpoch, 
Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+    public void forEach(TxnOperation operation, Keys keys, long minEpoch, long 
maxEpoch, Consumer<CommandStore> forEach)

Review Comment:
   I think we should rename `TxnOperation` to something like `PreLoadContext` 
to make its purpose clear: it ensures that the state an operation needs is 
loaded and ready before the operation executes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to