ifesdjeen commented on code in PR #254:
URL: https://github.com/apache/cassandra-accord/pull/254#discussion_r2355737953


##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -367,19 +369,22 @@ protected void updatedRedundantBefore(SafeCommandStore 
safeStore, RedundantBefor
             }
         }
         TxnId clearProgressLogBefore = 
unsafeGetRedundantBefore().minShardAndLocallyAppliedBefore();
-        List<TxnId> clearing = ((DefaultProgressLog) 
progressLog).activeBefore(clearProgressLogBefore);
-        for (TxnId txnId : clearing)
-        {
-            GlobalCommand globalCommand = commands.get(txnId);
-            Invariants.require(globalCommand != null && 
!globalCommand.isEmpty());
-            Command command = globalCommand.value();
-            StoreParticipants participants = 
command.participants().filter(LOAD, safeStore, txnId, 
command.executeAtIfKnown());
-            Cleanup cleanup = Cleanup.shouldCleanup(FULL, txnId, 
command.executeAtIfKnown(), command.saveStatus(), command.durability(), 
participants, unsafeGetRedundantBefore(), durableBefore());
-            Invariants.require(command.hasBeen(Applied)
-                               || cleanup.compareTo(Cleanup.TRUNCATE) >= 0
-                               || (durableBefore().min(txnId) != Universal &&
-                                      ((command.participants().stillExecutes() 
!= null && command.participants().stillExecutes().isEmpty())
-                                      || 
!Route.isFullRoute(command.route()))));
+        if (progressLog instanceof DefaultProgressLog)

Review Comment:
   Should we maybe at least warn otherwise? That said, probably this is fine 
since that's used for tests only for now.



##########
accord-core/src/main/java/accord/local/Bootstrap.java:
##########
@@ -273,7 +273,7 @@ public void fail(Ranges ranges, Throwable failure)
 
             store.agent().onFailedBootstrap(attempt, "PartialFetch", 
newFailures, () -> {
                 node.scheduler().selfRecurring(() -> {
-                    store.execute((PreLoadContext.Empty) () -> "Restart 
Bootstrap", safeStore -> restart(safeStore, newFailures.slice(allValid, 
Minimal), attempt + 1), store.agent());
+                    store.execute((PreLoadContext.Empty) () -> "Restart 
Bootstrap", safeStore -> { restart(safeStore, newFailures.slice(allValid, 
Minimal), attempt + 1); }, store.agent());

Review Comment:
   Noticed we have to disambiguate between Consumer and Function by adding 
brackets, but at least in Accord the Function overload is never used. Should we 
keep it? 



##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -71,75 +73,81 @@ public class CoordinateSyncPoint<R> extends 
CoordinatePreAccept<R>
 
     final CoordinationAdapter<R> adapter;
 
-    private CoordinateSyncPoint(Node node, SequentialAsyncExecutor executor, 
TxnId txnId, Topologies topologies, Txn txn, FullRoute<?> route, 
SyncPointAdapter<R> adapter, BiConsumer<R, Throwable> callback)
+    private CoordinateSyncPoint(Node node, SequentialAsyncExecutor executor, 
TxnId txnId, Topologies topologies, Txn txn, FullRoute<?> route, 
SyncPointAdapter<R> adapter, BiConsumer<? super R, Throwable> callback)
     {
         super(node, executor, txnId, txn, route, topologies, 
adapter.preacceptTrackerFactory, callback);
         this.adapter = adapter;
     }
 
-    public static <U extends Unseekable> AsyncResult<SyncPoint<U>> 
exclusive(Node node, Unseekables<U> keysOrRanges)
+    public static <U extends Unseekable> AsyncChain<SyncPoint<U>> 
exclusive(Node node, Unseekables<U> keysOrRanges)
     {
         return coordinate(node, ExclusiveSyncPoint, keysOrRanges, 
Adapters.exclusiveSyncPoint());
     }
 
-    public static <U extends Unseekable> AsyncResult<SyncPoint<U>> 
exclusive(Node node, TxnId txnId, Unseekables<U> keysOrRanges)
+    public static <U extends Unseekable> AsyncChain<SyncPoint<U>> 
exclusive(Node node, TxnId txnId, Unseekables<U> keysOrRanges)
     {
         return coordinate(node, txnId, keysOrRanges, 
Adapters.exclusiveSyncPoint());
     }
 
-    public static <U extends Unseekable> AsyncResult<SyncPoint<U>> 
exclusive(Node node, TxnId txnId, FullRoute<U> route)
+    public static <U extends Unseekable> AsyncChain<SyncPoint<U>> 
exclusive(Node node, TxnId txnId, FullRoute<U> route)
     {
         return coordinate(node, txnId, route, Adapters.exclusiveSyncPoint());
     }
 
-    public static <U extends Unseekable> AsyncResult<SyncPoint<U>> 
executeAtQuorum(Node node, Unseekables<U> keysOrRanges)
+    public static <U extends Unseekable> AsyncChain<SyncPoint<U>> 
executeAtQuorum(Node node, Unseekables<U> keysOrRanges)
     {
         return coordinate(node, ExclusiveSyncPoint, keysOrRanges, 
Adapters.exclusiveSyncPoint());
     }
 
-    public static <U extends Unseekable> AsyncResult<SyncPoint<U>> 
executeAtQuorum(Node node, TxnId txnId, FullRoute<U> route)
+    public static <U extends Unseekable> AsyncChain<SyncPoint<U>> 
executeAtQuorum(Node node, TxnId txnId, FullRoute<U> route)
     {
         return coordinate(node, txnId, route, Adapters.exclusiveSyncPoint());
     }
 
-    public static <U extends Unseekable> AsyncResult<SyncPoint<U>> 
coordinate(Node node, Txn.Kind kind, Unseekables<U> keysOrRanges, 
SyncPointAdapter<SyncPoint<U>> adapter)
+    public static <U extends Unseekable> AsyncChain<SyncPoint<U>> 
coordinate(Node node, Txn.Kind kind, Unseekables<U> keysOrRanges, 
SyncPointAdapter<SyncPoint<U>> adapter)
     {
         Invariants.requireArgument(kind.isSyncPoint());
         TxnId txnId = node.nextTxnIdWithDefaultFlags(kind, 
keysOrRanges.domain(), cardinality(keysOrRanges));
-        return node.withEpochExact(txnId.epoch(), null, () -> coordinate(node, 
txnId, keysOrRanges, adapter)).beginAsResult();
+        return node.withEpochExact(txnId.epoch(), null, () -> coordinate(node, 
txnId, keysOrRanges, adapter));
     }
 
-    public static <U extends Unseekable> AsyncResult<SyncPoint<U>> 
coordinate(Node node, Txn.Kind kind, FullRoute<U> route, 
SyncPointAdapter<SyncPoint<U>> adapter)
+    public static <U extends Unseekable> AsyncChain<SyncPoint<U>> 
coordinate(Node node, Txn.Kind kind, FullRoute<U> route, 
SyncPointAdapter<SyncPoint<U>> adapter)
     {
         Invariants.requireArgument(kind.isSyncPoint());
         TxnId txnId = node.nextTxnIdWithDefaultFlags(kind, route.domain(), 
cardinality(route));
-        return node.withEpochExact(txnId.epoch(), null, () -> coordinate(node, 
txnId, route, adapter)).beginAsResult();
+        return node.withEpochExact(txnId.epoch(), null, () -> coordinate(node, 
txnId, route, adapter));
     }
 
-    private static <U extends Unseekable> AsyncResult<SyncPoint<U>> 
coordinate(Node node, TxnId txnId, Unseekables<U> keysOrRanges, 
SyncPointAdapter<SyncPoint<U>> adapter)
+    private static <U extends Unseekable> AsyncChain<SyncPoint<U>> 
coordinate(Node node, TxnId txnId, Unseekables<U> keysOrRanges, 
SyncPointAdapter<SyncPoint<U>> adapter)
     {
         Invariants.requireArgument(txnId.isSyncPoint());
         FullRoute<U> route = (FullRoute<U>) node.computeRoute(txnId, 
keysOrRanges);
         return coordinate(node, txnId, route, adapter);
     }
 
-    private static <U extends Unseekable> AsyncResult<SyncPoint<U>> 
coordinate(Node node, TxnId txnId, FullRoute<U> route, 
SyncPointAdapter<SyncPoint<U>> adapter)
+    private static <U extends Unseekable> AsyncChain<SyncPoint<U>> 
coordinate(Node node, TxnId txnId, FullRoute<U> route, 
SyncPointAdapter<SyncPoint<U>> adapter)
     {
-        Invariants.requireArgument(txnId.isSyncPoint());
-        TopologyMismatch mismatch = 
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(txnId.epoch()),
 txnId, route.homeKey(), route);
-        if (mismatch != null)
-            return AsyncResults.failure(mismatch);
-
         try
         {
-            SettableByCallback<SyncPoint<U>> result = new 
SettableByCallback<>();
-            CoordinateSyncPoint<SyncPoint<U>> coordinate = new 
CoordinateSyncPoint<>(node, node.someSequentialExecutor(), txnId, 
adapter.forDecision(node, route, SHARE, txnId, txnId), 
node.agent().emptySystemTxn(txnId.kind(), txnId.domain()), route, adapter, 
result);
-            coordinate.start();
-            return result;
+            Invariants.requireArgument(txnId.isSyncPoint());
+            TopologyMismatch mismatch = 
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(txnId.epoch()),
 txnId, route.homeKey(), route);
+            if (mismatch != null)
+                throw mismatch;
+
+            return new AsyncChains.Head<>()

Review Comment:
   Are there any advantages of returning `Head` here over just returning a 
completed async chain, i.e. `AsyncChains.success(null)?



##########
accord-core/src/main/java/accord/utils/async/AsyncResult.java:
##########
@@ -18,38 +18,127 @@
 
 package accord.utils.async;
 
+import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import javax.annotation.Nullable;
 
+import accord.api.AsyncExecutor;
+
 import static accord.utils.Invariants.illegalState;
+import static accord.utils.async.AsyncCallbacks.ifSuccess;
 
 /**
  * Handle for async computations that supports multiple listeners and 
registering
  * listeners after the computation has started
  *
  * TODO (expected): by default AsyncResult methods should be started 
immediately; should introduce newChain() for building a chain.
  */
-public interface AsyncResult<V> extends AsyncChain<V>
+public interface AsyncResult<V>
 {
-    @Override
-    AsyncResult<V> invoke(BiConsumer<? super V, Throwable> callback);
-
     boolean isDone();
     boolean isSuccess();
+    AsyncResult<V> invoke(BiConsumer<? super V, Throwable> callback);
+
+    // TODO (expected): see how many calls to this method we can avoid
+    default AsyncChain<V> chain()
+    {
+        return new AsyncChains.Head<>()
+        {
+            @Override
+            protected Cancellable start(BiConsumer<? super V, Throwable> 
callback)
+            {
+                AsyncResult.this.invoke(callback);
+                return null;
+            }
+        };
+    }
+
+    // runs immediately if already done, otherwise submits to the provided 
executor
+    default AsyncChain<V> chainImmediatelyElse(@Nullable AsyncExecutor 
executor)
+    {
+        AsyncChain<V> result = chain();
+        if (!isDone())
+            result = result.withExecutor(executor);
+        return result;
+    }
+
+    default <T> AsyncResult<T> map(Function<? super V, ? extends T> mapper)
+    {
+        return chain().<T>map(mapper).beginAsResult();
+    }
+
+    default <T> AsyncResult<T> map(Function<? super V, ? extends T> mapper, 
AsyncExecutor executor)
+    {
+        return chain().<T>map(mapper, executor).beginAsResult();
+    }
+
+    default <T> AsyncResult<T> flatMap(Function<? super V, ? extends 
AsyncResult<T>> mapper)
+    {
+        return 
chain().flatMap(mapper.andThen(AsyncResult::chain)).beginAsResult();
+    }
+
+    default <T> AsyncResult<T> flatMap(Function<? super V, ? extends 
AsyncResult<T>> mapper, AsyncExecutor executor)
+    {
+        return chain().flatMap(mapper.andThen(AsyncResult::chain), 
executor).beginAsResult();
+    }
+
+    /**
+     * When the chain has failed, this allows the chain to attempt to recover 
if possible.  The provided function may return a {@code null} to represent
+     * that recovery was not possible and that the original exception should 
propgate.
+     * <p/>
+     * This is similiar to {@link 
java.util.concurrent.CompletableFuture#exceptionally(Function)} but with async 
handling; would have the same semantics as the following

Review Comment:
   nit: similar



##########
accord-core/src/main/java/accord/utils/async/AsyncResult.java:
##########
@@ -18,38 +18,127 @@
 
 package accord.utils.async;
 
+import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import javax.annotation.Nullable;
 
+import accord.api.AsyncExecutor;
+
 import static accord.utils.Invariants.illegalState;
+import static accord.utils.async.AsyncCallbacks.ifSuccess;
 
 /**
  * Handle for async computations that supports multiple listeners and 
registering
  * listeners after the computation has started
  *
  * TODO (expected): by default AsyncResult methods should be started 
immediately; should introduce newChain() for building a chain.
  */
-public interface AsyncResult<V> extends AsyncChain<V>
+public interface AsyncResult<V>
 {
-    @Override
-    AsyncResult<V> invoke(BiConsumer<? super V, Throwable> callback);
-
     boolean isDone();
     boolean isSuccess();
+    AsyncResult<V> invoke(BiConsumer<? super V, Throwable> callback);
+
+    // TODO (expected): see how many calls to this method we can avoid
+    default AsyncChain<V> chain()
+    {
+        return new AsyncChains.Head<>()
+        {
+            @Override
+            protected Cancellable start(BiConsumer<? super V, Throwable> 
callback)
+            {
+                AsyncResult.this.invoke(callback);
+                return null;
+            }
+        };
+    }
+
+    // runs immediately if already done, otherwise submits to the provided 
executor
+    default AsyncChain<V> chainImmediatelyElse(@Nullable AsyncExecutor 
executor)
+    {
+        AsyncChain<V> result = chain();
+        if (!isDone())
+            result = result.withExecutor(executor);
+        return result;
+    }
+
+    default <T> AsyncResult<T> map(Function<? super V, ? extends T> mapper)
+    {
+        return chain().<T>map(mapper).beginAsResult();
+    }
+
+    default <T> AsyncResult<T> map(Function<? super V, ? extends T> mapper, 
AsyncExecutor executor)
+    {
+        return chain().<T>map(mapper, executor).beginAsResult();
+    }
+
+    default <T> AsyncResult<T> flatMap(Function<? super V, ? extends 
AsyncResult<T>> mapper)
+    {
+        return 
chain().flatMap(mapper.andThen(AsyncResult::chain)).beginAsResult();
+    }
+
+    default <T> AsyncResult<T> flatMap(Function<? super V, ? extends 
AsyncResult<T>> mapper, AsyncExecutor executor)
+    {
+        return chain().flatMap(mapper.andThen(AsyncResult::chain), 
executor).beginAsResult();
+    }
+
+    /**
+     * When the chain has failed, this allows the chain to attempt to recover 
if possible.  The provided function may return a {@code null} to represent
+     * that recovery was not possible and that the original exception should 
propgate.

Review Comment:
   nit: propgate



##########
accord-core/src/main/java/accord/local/CommandStores.java:
##########
@@ -1062,10 +1064,10 @@ public synchronized Supplier<EpochReady> 
updateTopology(Node node, Topology newT
             return () -> {
                 EpochReady ready = update.bootstrap.get();
                 return new EpochReady(ready.epoch,
-                                      flush.flatMap(ignore -> 
ready.metadata).beginAsResult(),
-                                      flush.flatMap(ignore -> 
ready.coordinate).beginAsResult(),
-                                      flush.flatMap(ignore -> 
ready.data).beginAsResult(),
-                                      flush.flatMap(ignore -> 
ready.reads).beginAsResult());
+                                      flush.flatMap(ignore -> ready.metadata),

Review Comment:
   This is nice!



##########
accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java:
##########
@@ -331,7 +306,7 @@ protected Cancellable start(BiConsumer<? super Integer, 
Throwable> callback)
         topLevel.add(() -> {
             AsyncResult.Settable<Integer> settable = AsyncResults.settable();
             settable.setSuccess(42);
-            return settable;
+            return settable.chain();

Review Comment:
   `Settable` does not implement `AsyncChain`, only `AsyncResult`



##########
accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java:
##########
@@ -267,13 +242,13 @@ protected Cancellable start(BiConsumer<? super Integer, 
Throwable> callback) {
                      .map(i -> i + 4)
                      .map(i -> i + 5);
 
-        Assertions.assertEquals(15, AsyncChains.getBlocking(chain));
+        Assertions.assertEquals(15, AsyncChainUtils.getBlocking(chain));
     }
 
     private static void assertCombinerSize(int size, AsyncChain<?> chain)
     {
-        Assertions.assertTrue(chain instanceof AsyncChains.ReducingAsyncChain, 
() -> String.format("%s is not an instance of AsyncChainCombiner", chain));
-        AsyncChains.ReducingAsyncChain<?> combiner = 
(AsyncChains.ReducingAsyncChain<?>) chain;
+        Assertions.assertTrue(chain instanceof 
AsyncChains.AccumulatingReducer, () -> String.format("%s is not an instance of 
AsyncChainCombiner", chain));

Review Comment:
   nit: renamed to "AsyncChains.AccumulatingReducer" ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to