belliottsmith commented on code in PR #7:
URL: https://github.com/apache/cassandra-accord/pull/7#discussion_r1012120167
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStores.java:
##########
@@ -18,93 +18,64 @@
package accord.impl;
-import accord.api.Agent;
-import accord.api.DataStore;
-import accord.api.ProgressLog;
-import accord.local.CommandStore;
-import accord.local.CommandStores;
-import accord.local.Node;
-import accord.primitives.Keys;
+import accord.api.*;
+import accord.local.*;
+import accord.primitives.AbstractKeys;
+import accord.utils.MapReduce;
-import java.util.function.Consumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
-import static java.lang.Boolean.FALSE;
-
-public abstract class InMemoryCommandStores extends CommandStores
+public class InMemoryCommandStores
{
- public InMemoryCommandStores(int num, Node node, Agent agent, DataStore
store,
- ProgressLog.Factory progressLogFactory)
- {
- super(num, node, agent, store, progressLogFactory);
- }
-
- public static InMemoryCommandStores inMemory(Node node)
- {
- return (InMemoryCommandStores) node.commandStores();
- }
-
- public void forEachLocal(Consumer<? super CommandStore> forEach)
- {
- foldl((ranges, o, minEpoch, maxEpoch) -> ranges.all(),
- null, Long.MIN_VALUE, Long.MAX_VALUE,
- (store, f, r, t) -> { f.accept(store); return null; }, forEach,
null, ignore -> FALSE);
- }
-
- public void forEachLocal(Keys keys, long minEpoch, long maxEpoch,
Consumer<? super CommandStore> forEach)
- {
- foldl(ShardedRanges::shards, keys, minEpoch, maxEpoch, (store, f, r,
t) -> { f.accept(store); return null; }, forEach, null, ignore -> FALSE);
- }
-
- public void forEachLocal(Keys keys, long epoch, Consumer<? super
CommandStore> forEach)
- {
- forEachLocal(keys, epoch, epoch, forEach);
- }
-
- public void forEachLocalSince(Keys keys, long epoch, Consumer<? super
CommandStore> forEach)
- {
- forEachLocal(keys, epoch, Long.MAX_VALUE, forEach);
- }
-
- public static class Synchronized extends InMemoryCommandStores
+ public static class Synchronized extends SyncCommandStores
{
public Synchronized(int num, Node node, Agent agent, DataStore store,
ProgressLog.Factory progressLogFactory)
{
- super(num, node, agent, store, progressLogFactory);
+ super(num, node, agent, store, progressLogFactory,
InMemoryCommandStore.Synchronized::new);
}
- @Override
- protected CommandStore createCommandStore(int generation, int index,
int numShards, Node node, Agent agent, DataStore store, ProgressLog.Factory
progressLogFactory, CommandStore.RangesForEpoch rangesForEpoch)
+ public <T> T mapReduce(PreLoadContext context, AbstractKeys<?, ?>
keys, long minEpoch, long maxEpoch, MapReduce<? super SafeCommandStore, T> map)
{
- return new InMemoryCommandStore.Synchronized(generation, index,
numShards, node::uniqueNow, node.topology()::epoch, agent, store,
progressLogFactory, rangesForEpoch);
+ return super.mapReduce(context, keys, minEpoch, maxEpoch, map,
SyncMapReduceAdapter.instance());
+ }
+
+ public <T> T mapReduce(PreLoadContext context, AbstractKeys<?, ?>
keys, long minEpoch, long maxEpoch, Function<? super SafeCommandStore, T> map,
BiFunction<T, T, T> reduce)
+ {
+ return mapReduce(context, keys, minEpoch, maxEpoch, new
MapReduce<SafeCommandStore, T>() {
+ @Override
+ public T apply(SafeCommandStore in)
+ {
+ return map.apply(in);
+ }
+
+ @Override
+ public T reduce(T o1, T o2)
+ {
+ return reduce.apply(o1, o2);
+ }
+ });
}
}
- public static class SingleThread extends InMemoryCommandStores
+ public static class SingleThread extends AsyncCommandStores
{
- public SingleThread(int num, Node node, Agent agent, DataStore store,
ProgressLog.Factory progressLogFactory)
+ public SingleThread(int num, NodeTimeService time, Agent agent,
DataStore store, ProgressLog.Factory progressLogFactory)
{
- super(num, node, agent, store, progressLogFactory);
+ super(num, time, agent, store, progressLogFactory,
InMemoryCommandStore.SingleThread::new);
}
- @Override
- protected CommandStore createCommandStore(int generation, int index,
int numShards, Node node, Agent agent, DataStore store, ProgressLog.Factory
progressLogFactory, CommandStore.RangesForEpoch rangesForEpoch)
+ public SingleThread(int num, NodeTimeService time, Agent agent,
DataStore store, ProgressLog.Factory progressLogFactory, CommandStore.Factory
shardFactory)
{
- return new InMemoryCommandStore.SingleThread(generation, index,
numShards, node.id(), node::uniqueNow, node.topology()::epoch, agent, store,
progressLogFactory, rangesForEpoch);
+ super(num, time, agent, store, progressLogFactory, shardFactory);
}
}
public static class Debug extends InMemoryCommandStores.SingleThread
Review Comment:
Right, but there in order to debug I assume
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]