belliottsmith commented on code in PR #38:
URL: https://github.com/apache/cassandra-accord/pull/38#discussion_r1176978658


##########
accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java:
##########
@@ -18,26 +18,97 @@
 
 package accord.impl.basic;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
 import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.ProgressLog;
 import accord.impl.InMemoryCommandStore;
 import accord.impl.InMemoryCommandStores;
+import accord.local.CommandStore;
 import accord.local.CommandStores;
 import accord.local.NodeTimeService;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
 import accord.local.ShardDistributor;
 import accord.utils.RandomSource;
+import accord.utils.async.AsyncChain;
 
 public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
 {
-    private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore 
store, ShardDistributor shardDistributor, ProgressLog.Factory 
progressLogFactory, SimulatedDelayedExecutorService executorService)
+    private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, SimulatedDelayedExecutorService 
executorService)
+    {
+        super(time, agent, store, random, shardDistributor, 
progressLogFactory, DelayedCommandStore.factory(executorService));
+    }
+
+    public static CommandStores.Factory factory(PendingQueue pending)
     {
-        super(time, agent, store, shardDistributor, progressLogFactory, 
InMemoryCommandStore.SingleThread.factory(executorService));
+        return (time, agent, store, random, shardDistributor, 
progressLogFactory) ->
+               new DelayedCommandStores(time, agent, store, random, 
shardDistributor, progressLogFactory, new 
SimulatedDelayedExecutorService(pending, random));
     }
 
-    public static CommandStores.Factory factory(PendingQueue pending, 
RandomSource random)
+    public static class DelayedCommandStore extends InMemoryCommandStore
     {
-        SimulatedDelayedExecutorService executorService = new 
SimulatedDelayedExecutorService(pending, random);
-        return (time, agent, store, shardDistributor, progressLogFactory) -> 
new DelayedCommandStores(time, agent, store, shardDistributor, 
progressLogFactory, executorService);
+        private final SimulatedDelayedExecutorService executor;
+        private TaskExecutorService.Task<?> previous = null;
+
+        public DelayedCommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder 
rangesForEpochHolder, SimulatedDelayedExecutorService executor)
+        {
+            super(id, time, agent, store, progressLogFactory, 
rangesForEpochHolder);
+            this.executor = executor;
+        }
+
+        private static CommandStore.Factory 
factory(SimulatedDelayedExecutorService executor)
+        {
+            return (id, time, agent, store, progressLogFactory, 
rangesForEpoch) -> new DelayedCommandStore(id, time, agent, store, 
progressLogFactory, rangesForEpoch, executor);
+        }
+
+        @Override
+        public boolean inStore()
+        {
+            return CommandStore.Unsafe.maybeCurrent() == this;
+        }
+
+        @Override
+        public AsyncChain<Void> execute(PreLoadContext context, Consumer<? 
super SafeCommandStore> consumer)
+        {
+            return submit(context, i -> { consumer.accept(i); return null; });
+        }
+
+        @Override
+        public <T> AsyncChain<T> submit(PreLoadContext context, Function<? 
super SafeCommandStore, T> function)
+        {
+            return submit(() -> executeInContext(this, context, function));
+        }
+
+        @Override
+        public <T> AsyncChain<T> submit(Callable<T> fn)
+        {
+            TaskExecutorService.Task<T> task = new 
TaskExecutorService.Task<>(() -> Unsafe.runWith(this, fn));
+            task.addCallback(agent()); // used to track unexpected exceptions 
and notify simulations
+            if (previous == null || previous.isDone())
+            {
+                executor.execute(task);
+            }
+            else
+            {
+                // SimulatedDelayedExecutorService can interleave tasks and is 
global; this violates a requirement for
+                // CommandStore; single threaded with ordered execution!  To 
simulate this behavior, add the callback
+                // to enqueue once the "previous" task completes.
+                long nowMilis = executor.nowMillis();

Review Comment:
   typo



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to