belliottsmith commented on code in PR #38:
URL: https://github.com/apache/cassandra-accord/pull/38#discussion_r1176978658
##########
accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java:
##########
@@ -18,26 +18,97 @@
package accord.impl.basic;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
import accord.api.Agent;
import accord.api.DataStore;
import accord.api.ProgressLog;
import accord.impl.InMemoryCommandStore;
import accord.impl.InMemoryCommandStores;
+import accord.local.CommandStore;
import accord.local.CommandStores;
import accord.local.NodeTimeService;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
import accord.local.ShardDistributor;
import accord.utils.RandomSource;
+import accord.utils.async.AsyncChain;
public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
{
- private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore
store, ShardDistributor shardDistributor, ProgressLog.Factory
progressLogFactory, SimulatedDelayedExecutorService executorService)
+ private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore
store, RandomSource random, ShardDistributor shardDistributor,
ProgressLog.Factory progressLogFactory, SimulatedDelayedExecutorService
executorService)
+ {
+ super(time, agent, store, random, shardDistributor,
progressLogFactory, DelayedCommandStore.factory(executorService));
+ }
+
+ public static CommandStores.Factory factory(PendingQueue pending)
{
- super(time, agent, store, shardDistributor, progressLogFactory,
InMemoryCommandStore.SingleThread.factory(executorService));
+ return (time, agent, store, random, shardDistributor,
progressLogFactory) ->
+ new DelayedCommandStores(time, agent, store, random,
shardDistributor, progressLogFactory, new
SimulatedDelayedExecutorService(pending, random));
}
- public static CommandStores.Factory factory(PendingQueue pending,
RandomSource random)
+ public static class DelayedCommandStore extends InMemoryCommandStore
{
- SimulatedDelayedExecutorService executorService = new
SimulatedDelayedExecutorService(pending, random);
- return (time, agent, store, shardDistributor, progressLogFactory) ->
new DelayedCommandStores(time, agent, store, shardDistributor,
progressLogFactory, executorService);
+ private final SimulatedDelayedExecutorService executor;
+ private TaskExecutorService.Task<?> previous = null;
+
+ public DelayedCommandStore(int id, NodeTimeService time, Agent agent,
DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder
rangesForEpochHolder, SimulatedDelayedExecutorService executor)
+ {
+ super(id, time, agent, store, progressLogFactory,
rangesForEpochHolder);
+ this.executor = executor;
+ }
+
+ private static CommandStore.Factory
factory(SimulatedDelayedExecutorService executor)
+ {
+ return (id, time, agent, store, progressLogFactory,
rangesForEpoch) -> new DelayedCommandStore(id, time, agent, store,
progressLogFactory, rangesForEpoch, executor);
+ }
+
+ @Override
+ public boolean inStore()
+ {
+ return CommandStore.Unsafe.maybeCurrent() == this;
+ }
+
+ @Override
+ public AsyncChain<Void> execute(PreLoadContext context, Consumer<?
super SafeCommandStore> consumer)
+ {
+ return submit(context, i -> { consumer.accept(i); return null; });
+ }
+
+ @Override
+ public <T> AsyncChain<T> submit(PreLoadContext context, Function<?
super SafeCommandStore, T> function)
+ {
+ return submit(() -> executeInContext(this, context, function));
+ }
+
+ @Override
+ public <T> AsyncChain<T> submit(Callable<T> fn)
+ {
+ TaskExecutorService.Task<T> task = new
TaskExecutorService.Task<>(() -> Unsafe.runWith(this, fn));
+ task.addCallback(agent()); // used to track unexpected exceptions
and notify simulations
+ if (previous == null || previous.isDone())
+ {
+ executor.execute(task);
+ }
+ else
+ {
+ // SimulatedDelayedExecutorService can interleave tasks and is
global; this violates a requirement for
+ // CommandStore; single threaded with ordered execution! To
simulate this behavior, add the callback
+ // to enqueue once the "previous" task completes.
+ long nowMilis = executor.nowMillis();
Review Comment:
typo
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]