dcapwell commented on code in PR #2057:
URL: https://github.com/apache/cassandra/pull/2057#discussion_r1053800181


##########
src/java/org/apache/cassandra/service/accord/AccordCommand.java:
##########
@@ -652,59 +654,66 @@ private boolean canApplyWithCurrentScope(SafeCommandStore 
safeStore)
         return true;
     }
 
-    private Future<Void> applyWithCorrectScope(CommandStore unsafeStore)
+    private AsyncResult<Void> applyWithCorrectScope(CommandStore unsafeStore)

Review Comment:
   The usage is `AsyncChain`, we can simplify to be
   
   ```
   private AsyncChain<Void> applyWithCorrectScope(CommandStore unsafeStore)
       {
           TxnId txnId = txnId();
           return unsafeStore.submit(this, safeStore -> {
               AccordCommand command = (AccordCommand) safeStore.command(txnId);
               return command.applyChain(safeStore, false);
           }).flatMap(a -> a);
       }
   ```
   
   I don't like `.flatMap(a -> a);` but feel this is sadly a java thing... we 
can have `.flatMap(AsyncChains.flatten())` but that's more verbose...  
   
   If this returns a `AsyncResult` because we are trying to control execution 
order, then can just add `.beginAsResult()` to the end of the `flatMap`



##########
src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java:
##########
@@ -208,6 +221,14 @@ public void run()
         }
     }
 
+    @Override
+    public void begin(BiConsumer<? super R, Throwable> callback)

Review Comment:
   Here is a quick test to show this example
   
   ```
   @Test
       void beginMultipleTimes()
       {
           final AtomicBoolean called = new AtomicBoolean(false);
           Supplier<Integer> task = () -> {
               if (!called.compareAndSet(false, true))
                   throw new AssertionError("Attempted to invoke multiple 
times");
               return 5;
           };
           // randomly generate different possible cases where we could take a 
callable
           Gen<AsyncChain<Integer>> gen = rnd -> {
               called.set(false);
               if (rnd.nextBoolean())
                   return 
AsyncChains.ofCallable(MoreExecutors.directExecutor(), () -> task.get());
               return new AsyncChains.Head<>() {
                   @Override
                   public void begin(BiConsumer<? super Integer, Throwable> 
callback) {
                       callback.accept(task.get(), null);
                   }
               };
           };
           qt().forAll(gen).check(chain -> {
               Assertions.assertEquals(5, Result.from(chain).get());
   
               chain = chain.map(i -> i + 1);
   
               AsyncChain<String> a = chain.map(Object::toString);
   
               Assertions.assertEquals("6", Result.from(a).get());
               Assertions.assertEquals(7, 
Result.from(a.map(Integer::parseInt).map(i -> i + 1)).get());
           });
       }
   ```
   
   this fails with the stack trace
   
   ```
   Caused by: java.lang.AssertionError: Attempted to invoke multiple times
        at 
accord.utils.async.AsyncChainsTest.lambda$test$4(AsyncChainsTest.java:101)
        at 
accord.utils.async.AsyncChainsTest.lambda$test$5(AsyncChainsTest.java:107)
        at 
accord.utils.async.AsyncChains.lambda$encapsulate$0(AsyncChains.java:272)
        at 
com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
        at accord.utils.async.AsyncChains$1.begin(AsyncChains.java:314)
        at accord.utils.async.AsyncChains$Head.begin(AsyncChains.java:96)
        at accord.utils.async.AsyncChains$Link.begin(AsyncChains.java:121)
        at accord.utils.async.AsyncChains$Map.begin(AsyncChains.java:125)
        at 
accord.utils.async.AsyncChainsTest$Result.from(AsyncChainsTest.java:151)
   ```
   
   Now, if you comment out the first statement `Assertions.assertEquals(5, 
Result.from(chain).get());` then the test fails as you attempt to `map` after 
you extracted the result, this is handled here
   
   ```
   Caused by: java.lang.IllegalStateException
        at 
com.google.common.base.Preconditions.checkState(Preconditions.java:491)
        at accord.utils.async.AsyncChains.add(AsyncChains.java:260)
        at accord.utils.async.AsyncChains.map(AsyncChains.java:232)
        at 
accord.utils.async.AsyncChainsTest.lambda$beginMultipleTimes$7(AsyncChainsTest.java:107)
   ```
   
   which is this check: `Preconditions.checkState(next instanceof Head<?>);`



##########
src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java:
##########
@@ -208,6 +221,14 @@ public void run()
         }
     }
 
+    @Override
+    public void begin(BiConsumer<? super R, Throwable> callback)

Review Comment:
   `AsyncChains.Head` doesn't have exactly-once semantics, which can put us 
into a situation where we double invoke.  Before this patch 
`org.apache.cassandra.service.accord.AccordCommandStore#submit` controlled this 
behavior by always returning the result (Future), but now any caller to 
`submit` will get a lazy `AsyncChain` and if those callers touch `begin` twice 
this could become an issue.
   
   I think most usages of `AsyncChains.Head` would benefit from adding 
exactly-once semantics.



##########
src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java:
##########
@@ -32,11 +33,12 @@
 import accord.local.SafeCommandStore;
 import accord.primitives.Seekables;
 import accord.primitives.TxnId;
+import accord.utils.async.AsyncChains;
 import org.apache.cassandra.service.accord.AccordCommandStore;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;

Review Comment:
   unused import



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to