belliottsmith commented on code in PR #38:
URL: https://github.com/apache/cassandra-accord/pull/38#discussion_r1176860716


##########
accord-core/src/main/java/accord/utils/async/AsyncChains.java:
##########
@@ -331,6 +331,62 @@ public static <V> AsyncChain<V> failure(Throwable failure)
         return new Immediate<>(failure);
     }
 
+    public static <V, T> AsyncChain<T> map(AsyncChain<V> chain, Function<? 
super V, ? extends T> mapper, Executor executor)
+    {
+        return chain.flatMap(v -> new Head<T>()
+        {
+            @Override
+            protected void start(BiConsumer<? super T, Throwable> callback)
+            {
+                try
+                {
+                    executor.execute(() -> {
+                        try
+                        {
+                            callback.accept(mapper.apply(v), null);
+                        }
+                        catch (Throwable t)
+                        {
+                            callback.accept(null, t);
+                        }
+                    });
+                }
+                catch (Throwable t)
+                {
+                    callback.accept(null, t);
+                }
+            }
+        });
+    }
+
+    public static <V, T> AsyncChain<T> flatMap(AsyncChain<V> chain, Function<? 
super V, ? extends AsyncChain<T>> mapper, Executor executor)
+    {
+        return chain.flatMap(v -> new Head<T>()
+        {
+            @Override
+            protected void start(BiConsumer<? super T, Throwable> callback)
+            {
+                try
+                {
+                    executor.execute(() -> {
+                        try
+                        {
+                            mapper.apply(v).addCallback(callback);
+                        }
+                        catch (Throwable t)
+                        {
+                            callback.accept(null, t);
+                        }
+                    });
+                }
+                catch (Throwable t)
+                {
+                    callback.accept(null, t);

Review Comment:
   This is a good point. I think we can probably fix this by using our own 
Executor interface on which you can submit mutually-exclusive executions in 
case the executor were providing mutual exclusion. i.e., even if the executor 
is shutdown, it may permit waiting until it is terminated before taking a mutex 
and then invoking some continuation.
   
   But let's tag that as a low priority follow-up ticket.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to