dcapwell commented on code in PR #17:
URL: https://github.com/apache/cassandra-accord/pull/17#discussion_r1051209580


##########
accord-core/src/main/java/accord/utils/async/AsyncResults.java:
##########
@@ -0,0 +1,295 @@
+package accord.utils.async;
+
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+public class AsyncResults
+{
+    private AsyncResults() {}
+
+    private static class Result<V>
+    {
+        final V value;
+        final Throwable failure;
+
+        public Result(V value, Throwable failure)
+        {
+            this.value = value;
+            this.failure = failure;
+        }
+    }
+
+    static class AbstractResult<V> implements AsyncResult<V>
+    {
+        private static final AtomicReferenceFieldUpdater<AbstractResult, 
Object> STATE = AtomicReferenceFieldUpdater.newUpdater(AbstractResult.class, 
Object.class, "state");
+
+        private volatile Object state;
+
+        private static class Listener<V>
+        {
+            final BiConsumer<? super V, Throwable> callback;
+            Listener<V> next;
+
+            public Listener(BiConsumer<? super V, Throwable> callback)
+            {
+                this.callback = callback;
+            }
+        }
+
+        private void notify(Listener<V> listener, Result<V> result)
+        {
+            while (listener != null)
+            {
+                listener.callback.accept(result.value, result.failure);

Review Comment:
   With `Future` if the user function throws an exception, are able to set that 
as the future's result.  This logic doesn't seem to handle exceptions so 
`boolean trySetResult(V result, Throwable failure)` may lead to a failure on 
the producer side.



##########
accord-core/src/main/java/accord/utils/async/AsyncResult.java:
##########
@@ -0,0 +1,96 @@
+package accord.utils.async;
+
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+
+/**
+ * Handle for async computations that supports multiple listeners and 
registering
+ * listeners after the computation has started
+ */
+public interface AsyncResult<V>
+{
+    void listen(BiConsumer<? super V, Throwable> callback);
+
+    default void listen(BiConsumer<? super V, Throwable> callback, Executor 
executor)
+    {
+        listen(AsyncCallbacks.inExecutor(callback, executor));
+    }
+
+    default void listen(Runnable runnable)
+    {
+        listen((unused, failure) -> {
+            if (failure == null) runnable.run();
+            else throw new RuntimeException(failure);
+        });
+    }
+
+    default void listen(Runnable runnable, Executor executor)
+    {
+        listen(AsyncCallbacks.inExecutor(runnable, executor));
+    }

Review Comment:
   consistency comment: can we rename to `addCallback`?  `AsyncChain` and C* 
`Future` use `addCallback`, so would be good be match



##########
accord-core/src/main/java/accord/utils/async/AsyncResult.java:
##########
@@ -0,0 +1,96 @@
+package accord.utils.async;
+
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+
+/**
+ * Handle for async computations that supports multiple listeners and 
registering
+ * listeners after the computation has started
+ */
+public interface AsyncResult<V>

Review Comment:
   We spoke in slack but I feel that we should add `map` and `flatMap` here as 
well, so you are not forced to use the chain for single hop cases (which are 
100% of the cases in Accord).
   
   IMO we should replace `.toChain().map(...).complete()` with `.map()` and 
replace `.flatMap(...).flatMap(...).map(...)` with 
`.toChain().flatMap(...).flatMap(...).map(...).complete()`; AKA when you know 
you are a "chain" of calls, you call `toChain` to lower the cost.
   
   I made this change in a feedback branch I can post, but the map/flatMap 
methods are
   
   ```
   // feedback branch made sure null exeuctor works
   default <T> AsyncResult<T> map(Function<? super V, ? extends T> mapper, 
Executor executor)
       {
           AsyncResult.Settable<T> settable = AsyncResults.settable();
           listen((success, failure) -> {
               if (failure != null)
               {
                   settable.setFailure(failure);
                   return;
               }
               try
               {
                   T result = mapper.apply(success);
                   settable.setSuccess(result);
               }
               catch (Throwable t)
               {
                   settable.setFailure(t);
               }
           }, executor);
           return settable;
       }
   
       default <T> AsyncResult<T> map(Function<? super V, ? extends T> mapper)
       {
           return map(mapper, null);
       }
   
       default <T> AsyncResult<T> flatMap(Function<? super V, ? extends 
AsyncResult<T>> mapper, Executor executor)
       {
           AsyncResult.Settable<T> settable = AsyncResults.settable();
           listen((success, failure) -> {
              if (failure != null)
              {
                  settable.setFailure(failure);
                  return;
              }
              try
              {
                  AsyncResult<T> next = mapper.apply(success);
                  next.listen((s2, f2) -> {
                      if (f2 != null)
                      {
                          settable.tryFailure(f2);
                          return;
                      }
                      settable.trySuccess(s2);
                  });
              }
              catch (Throwable t)
              {
                  settable.setFailure(t);
              }
           }, executor);
           return settable;
       }
   
       default <T> AsyncResult<T> flatMap(Function<? super V, ? extends 
AsyncResult<T>> mapper)
       {
           return flatMap(mapper, null);
       }
   ```
   
   I am totally cool with the trampoline (`AsyncChain`) but since the majority 
of cases are single hop we add a slightly higher cost than the raw `map` and 
`flatMap` methods (you can't avoid listening CAS, and the output Result has the 
same cost, so only difference is the cost to run the chain) I don't think we 
should default to use chain until we have at least 2 hops



##########
accord-core/src/main/java/accord/utils/async/AsyncResults.java:
##########
@@ -0,0 +1,295 @@
+package accord.utils.async;
+
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+public class AsyncResults
+{
+    private AsyncResults() {}
+
+    private static class Result<V>
+    {
+        final V value;
+        final Throwable failure;
+
+        public Result(V value, Throwable failure)
+        {
+            this.value = value;
+            this.failure = failure;
+        }
+    }
+
+    static class AbstractResult<V> implements AsyncResult<V>
+    {
+        private static final AtomicReferenceFieldUpdater<AbstractResult, 
Object> STATE = AtomicReferenceFieldUpdater.newUpdater(AbstractResult.class, 
Object.class, "state");
+
+        private volatile Object state;
+
+        private static class Listener<V>
+        {
+            final BiConsumer<? super V, Throwable> callback;
+            Listener<V> next;
+
+            public Listener(BiConsumer<? super V, Throwable> callback)
+            {
+                this.callback = callback;
+            }
+        }
+
+        private void notify(Listener<V> listener, Result<V> result)
+        {
+            while (listener != null)
+            {
+                listener.callback.accept(result.value, result.failure);

Review Comment:
   Looking closer these `Listeners` are more closely to 
`org.apache.cassandra.utils.concurrent.ListenerList.CallbackLambdaListener#run` 
which isn't explicit about the handling, so in both cases we depend on 
`java.lang.Thread#getUncaughtExceptionHandler`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to